Ad-hoc querying on AWS: Connecting BI tools to Athena

In a previous post, we discussed using Lambda, Glue and Athena to set up queries of events that are logged by our real-time bidding system. Here, we will build on that foundation, and show how to make this even friendlier to business users by connecting BI tools to this setup.

Luckily, Athena supports both JDBC and ODBC, so any BI tool that uses either of these connection methods can query Athena!

First, we need to create an IAM user. The minimum policies required are:

  • AmazonAthenaFullAccess
  • Writing to a bucket for Athena query output (use an existing one or create a new one). For the sake of example, let’s call it s3://athena.out
  • Reading from our s3://logbucket bucket, which is where the logs are stored (a minimal policy sketch for the S3 permissions is shown below)
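For the two S3 bullets, an inline policy along the following lines should do. This is just a minimal sketch using the example bucket names above; adjust the resources (and tighten the actions) for your setup:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::athena.out", "arn:aws:s3:::logbucket"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": ["arn:aws:s3:::logbucket/*"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": ["arn:aws:s3:::athena.out/*"]
    }
  ]
}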

Now we’ll need the access key and secret key for that user in order to use it with the various tools below.

JDBC

The JDBC driver (com.simba.athena.jdbc.Driver) can be downloaded here.

The JDBC URL is constructed as follows:

jdbc:awsathena://AwsRegion=us-east-1;User=<aws-access-key>;Password=<aws-secret-key>;S3OutputLocation=s3://athena.out;

Here’s a sample Java program that shows it in action:

import java.sql.Connection;
import java.sql.DriverManager;

public class Main {
  public static void main(String[] args) throws Throwable {
    // Load the Simba Athena JDBC driver
    Class.forName("com.simba.athena.jdbc.Driver");
    String accessKey = "...";
    String secretKey = "...";
    String bucket = "athena.out";
    String url = "jdbc:awsathena://AwsRegion=us-east-1;User=" + accessKey + ";Password=" + secretKey
        + ";S3OutputLocation=s3://" + bucket + ";";
    // Open (and automatically close) the connection
    try (Connection connection = DriverManager.getConnection(url)) {
      System.out.println("Successfully connected to\n\t" + url);
    }
  }
}

Example using JDBC: DbVisualizer

  1. If you haven’t already, download the JDBC driver to some folder.
  2. Open the Driver Manager (Tools → Driver Manager).
  3. Press the green + to create a new driver.
  4. Press the folder icon on the right …
  5. … and browse to the folder where you saved the JDBC driver and select it:
  6. Leave the URL Format field blank and pick com.simba.athena.jdbc.Driver for Driver Class:
  7. Close the Driver Manager, and let’s create a Connection:
  8. We’ll use the “No Wizard” option. Pick Athena from the dropdown in the Driver (JDBC) field and enter the JDBC URL from above in the Database URL field:
  9. Press “Connect” and observe DbVisualizer read the metadata information from Athena (well, Glue, really), including tables and views.

ODBC (on OSX)

  1. Download and run the ODBC driver installer.
  2. Create or edit /Library/ODBC/odbcinst.ini to add the following information:
    [ODBC Drivers]
    Simba Athena ODBC Driver=Installed
    [Simba Athena ODBC Driver]
    Driver = /Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib

    If the odbcinst.ini file already has entries, put new entries into the appropriate sections; e.g., if it was
    [ODBC Drivers]
    PostgreSQL Unicode = Installed
    [PostgreSQL Unicode]
    Description = PostgreSQL ODBC driver
    Driver = /usr/local/lib/psqlodbcw.so

    Then it becomes
    [ODBC Drivers]
    PostgreSQL Unicode = Installed
    Simba Athena ODBC Driver=Installed
    [PostgreSQL Unicode]
    Description = PostgreSQL ODBC driver
    Driver = /usr/local/lib/psqlodbcw.so
    [Simba Athena ODBC Driver]
    Driver = /Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib
  3. Create or edit, in a similar fashion, /Library/ODBC/odbc.ini to include the following information:
    [AthenaDSN]
    Driver=/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib
    AwsRegion=us-east-1
    S3OutputLocation=s3://athena.out
    AuthenticationType=IAM Credentials
    UID=AWS_ACCESS_KEY
    PWD=AWS_SECRET_KEY
  4. If you wish to test, download and run ODBC Manager. You should see that it successfully recognizes the DSN:

Example using ODBC: Excel

  1. Switch to the Data tab, and under New Database Query select From Database:
  2. In the iODBC Data Source Chooser window, select the AthenaDSN we configured above and hit OK.
  3. Annoyingly, despite having configured it, you will be asked for credentials again. Enter the access and secret key.
  4. You should see a Microsoft Query window.

Success!

Helpful links

Ad-hoc querying on AWS: Lambda, Glue, Athena

Introduction

If you give different engineers the same problem, they will usually produce reasonably similar solutions (mutatis mutandis). For example, when I first came across a reference implementation of an RTB platform using AWS, I was amused by how close it was to what we had implemented in one of my previous projects (OpenRTB).

So it was not much of a surprise that the next RTB system used a similar pattern: logs are written to files, pushed to S3, and then aggregated in Hadoop, from where the reports are run.

But there were a few problems in the way… 

Log partitioning

The current log partitioning in S3 is by server ID. This is really useful for debugging and fine for some aggregations, but it has drawbacks: it is hard to narrow queries by date, which results in large scans, and joins become even harder. Large scans in tools like Athena also translate into larger bills. In short, Hive-style date-based partitioning would be better.

To that end, I’ve created a Lambda function, repartition, which is triggered whenever a new log file is uploaded to the s3://logbucket bucket:

import boto3
from gzip import GzipFile
from io import BytesIO
import json
import urllib.parse

s3 = boto3.client('s3')

SUFFIX = '.txt.gz'

V = "v8"

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        s3obj = s3.get_object(Bucket=bucket, Key=key)
        src = { 'Bucket': bucket, 'Key': key }
        (node, orig_name) = key.split("/")
        (_, _, node_id) = node.split("_")
        name = orig_name.replace(SUFFIX, "")
        (evt, dt0, hhmmss) = name.split("-")
        hr = hhmmss[0:2]
        # date-hour
        dthr = f"year=20{dt0[0:2]}/month={dt0[2:4]}/day={dt0[4:6]}/hour={hr}"
        
        schema = f"{V}/{evt}/{dthr}"
        dest = f"{schema}/{name}-{node_id}{SUFFIX}"
        print(f"Copying {key} to {dest}")
        s3.copy_object(Bucket=bucket, Key=dest, CopySource=src)

        return "OK"
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

if __name__ == "__main__":
    # Wrapper to run from CLI for now
    s3entry = {'bucket' : {'name' : 'logbucket'},
               'object' : {'key'  : 'server/requests-200420-003740.txt.gz'}}
    event = {'Records' : [{'s3' : s3entry}] }
    lambda_handler(event, None)
    

When the function runs, the log is copied to a new path under the v8 prefix, with the following pattern:

<event>/year=<year>/month=<month>/day=<day>/hour=<hour>/<filename>-<nodeID>. For example, 

s3://logbucket/server1234/wins-200421-000003.txt.gz

is copied to 

s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz

(The “v8” prefix is there because I have arrived at this schema having tried several versions — 7, to be exact).
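As an aside, the trigger itself is just an S3 event notification on the bucket, pointed at the repartition function. A minimal sketch of such a configuration (the Lambda ARN, account ID, and suffix filter below are placeholders, not our actual values); it can be applied with aws s3api put-bucket-notification-configuration:

{
  "LambdaFunctionConfigurations": [
    {
      "Id": "repartition-on-upload",
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:repartition",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "suffix", "Value": ".txt.gz" }
          ]
        }
      }
    }
  ]
}

In practice we also want to make sure the copies written under the v8/ prefix do not re-trigger the function, e.g., by having the handler ignore keys that already start with v8/.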

What about storage cost?

  • An additional benefit of using date-based partitioning is that we can easily automate changing the storage class to Glacier for prefixes older than some specified time, which will save S3 storage costs on the duplicated data (a sketch of such a lifecycle rule is shown below).
  • In the cloud, storage costs are not the main thing to worry about; outgoing traffic and compute are where the problem is. So move the compute to the data, not the other way around.
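A minimal sketch of such a lifecycle rule, assuming we archive the repartitioned copies under the v8/ prefix after 90 days (both the prefix and the threshold are just examples); it can be applied with aws s3api put-bucket-lifecycle-configuration:

{
  "Rules": [
    {
      "ID": "archive-old-repartitioned-logs",
      "Filter": { "Prefix": "v8/" },
      "Status": "Enabled",
      "Transitions": [
        { "Days": 90, "StorageClass": "GLACIER" }
      ]
    }
  ]
}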

NOTE: Partitions do not equal timestamps

Partitioning is based on the file name. Records inside a file may have timestamps whose hour is one greater or one less than the partition hour (events logged near the top of the hour can land in the adjacent file). Thus, partitions are there to reduce the number of scanned records, but care should be taken when querying not to assume that the timestamps under year=2020/month=04/day=21/hour=00 are all in the 0th hour of 2020-04-21.
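For example, to get exactly the records whose timestamps fall within that hour, a query might scan the partition and its neighbors and then filter on the actual timestamp. A rough sketch, assuming a hypothetical event_ts timestamp column and the string-typed partition columns created by the crawler:

SELECT COUNT(*)
FROM wins
-- prune to the target partition plus its neighbors...
WHERE year = '2020' AND month = '04'
  AND ((day = '21' AND hour IN ('00', '01')) OR (day = '20' AND hour = '23'))
-- ...then filter on the actual record timestamp
  AND event_ts >= TIMESTAMP '2020-04-21 00:00:00'
  AND event_ts < TIMESTAMP '2020-04-21 01:00:00'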

Discover metadata in Glue

Glue is an ETL service on AWS. One of the great features of Glue is crawlers that attempt to glean metadata from the logs. This is really convenient because it saves us the tedious step of defining the metadata manually.

So we set up a crawler. For an explanation of how to do it, see the “Tutorial” link on the left-hand side of this page:

However, it takes some time to get the configuration correct. 

  1. We would want to exclude some logs because we know their format is not good for discovery (they are not straight-up JSON or CSV, etc; and, at the moment, custom SerDes are not supported by Athena) — but see below for exceptions. This is done in the “Data store” part of crawler configuration:
  2. We want Glue to treat new files of the same type as being different partitions of the same table, not create new ones. For example, given the partitioning convention we created above, these two paths:
  • s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz
  • s3://logbucket/v8/wins/year=2020/month=04/day=22/hour=11/wins-200422-000004-4321.txt.gz

  should be treated as partitions of the table “wins”, not as two different tables. We do this in the “Output” section of the crawler configuration as follows:

Once the crawler runs, we will see a list of tables created in Glue.

If we see tables here that look like parts of the partitioned path (e.g., year=2020, or names ending with _txt_gz), it means the crawler got confused when discovering their structure. We add those to the crawler’s exclusion list and will create their metadata manually. Fortunately, there are not that many such logs, and we can exclude them one by one.

Of course, while the crawler can recognize the file structure, it doesn’t know what to name the fields. So we go and name them manually. While this is a tedious process, we don’t have to do it all at once – just on an as-needed basis.

We will want to keep the crawler running hourly so that new partitions (which get created every hour) are picked up. This can also be done manually from Athena – or Hive – by issuing the MSCK REPAIR TABLE command.
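For example, to load newly added hourly partitions of the wins table defined above, one can run:

MSCK REPAIR TABLE wins;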

First useful Athena query

Looking now at Athena, we see that metadata from Glue is visible and we can run queries:

Woohoo! It works! I can do nice ad hoc queries. We’re done, right?

Almost. Unfortunately, for historical reasons, our logs are not always formatted in a way that works ideally with this setup.

We could identify two key cases:

  1. Mostly CSV files:
    • There are event prefixes preceding the event ID, even though the event itself is already identified by the log name. For example, bid:<BID_ID>, like so:
      bid:0000-1111-2222-3333
    • A single CSV field really contains two values. E.g., a log that is comma-separated into two fields, timestamp and “message”, where the message includes a “Win: ” prefix before the bid ID, and then – with no comma! – “price:” followed by the price. Like so:
      04/21/2020 00:59:59.722,Win: a750a866-8b1c-49c9-8a30-34071167374e_200421-00__302 price:0.93
      However, what we want to join on is the ID. So in these cases, we can use Athena views. For example, in these two respective cases we can use:
      CREATE OR REPLACE VIEW bids2 AS
      SELECT "year", "month", "day", "hour",
      "SUBSTRING"("bid_colon_bid_id", ("length"('bid:') + 1)) "bid_id"
      FROM bids

      and

      CREATE OR REPLACE VIEW wins2 AS
      SELECT "year", "month", "day", "hour",
      "SPLIT_PART"("message", ' ', 3) "bid_id"
      FROM wins

      Now joins can be done on the bid_id column, which makes for a more readable query.
  2. The other case is a log that has the following format: a timestamp, followed by a comma, followed by JSON (an OpenRTB request wrapped in one more layer of our own JSON that augments it with other data). This makes it neither CSV nor JSON, and the Glue crawler gets confused. The solution is to use a Regex SerDe, as follows (a rough sketch of such a table definition is given at the end of this section):

    And then we can use Athena’s JSON functions to deal with the JSON column, for example, to see distribution of requests by country:
     SELECT JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country') country,
    COUNT(*) cnt
    FROM requests
    GROUP BY
    JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country')
    ORDER BY cnt DESC
Success! We can now use SQL to easily query our event logs.
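For reference, here is a rough sketch of what the Regex SerDe table definition mentioned in case 2 above could look like. The regex, column names, and S3 location are illustrative assumptions rather than the exact ones we use; the first capture group becomes the timestamp column and the second the JSON payload:

-- Sketch only: split "timestamp,JSON" lines into two columns
CREATE EXTERNAL TABLE requests (
  ts      string,
  request string
)
PARTITIONED BY (year string, month string, day string, hour string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "^([^,]*),(.*)$"
)
STORED AS TEXTFILE
LOCATION 's3://logbucket/v8/requests/'

Once the partitions are loaded (by the crawler, or via MSCK REPAIR TABLE requests), the JSON_EXTRACT_SCALAR query above can be run against the request column.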

Helpful links

Rethinking data gravity

At some point I remember having a short chat with Werner Vogels about taking spot instances to the extreme, in a genuine market in which compute power can be traded. His response was “what about data gravity?”, to which my counter was: by making data transfer into S3 free (and, later, making true the adage about not underestimating the bandwidth of a truck full of tape), you, while understanding the gravity idea, also provide incentives not to make it an issue. As in: why don’t I make things redundant? Why don’t I just push data to multiple S3 regions and have my compute follow the sun in terms of cost? Sure, it doesn’t work at huge scale, but it may work perfectly fine at some medium scale, and this is what we used for implementing our DMP at OpenDSP.

Later on, I dabbled a bit in the compute-cost arbitrage space. I still think compute cost arbitrage will be a thing; 6fusion did some interesting work there; ClusterK got acquired by Amazon for their ability to save cost even when running heavy data-gravity workloads such as EMR; and ultimately, isn’t compute arbitrage just an arbitrage of electricity? But I digress. Or do I? Oh yes.

In a way, this is not really anything new — it is just another way to surface the same idea as Hadoop.