Ad-hoc querying on AWS: Lambda, Glue, Athena

Introduction

If you give different engineers the same problem, they will usually produce reasonably similar solutions (mutatis mutandis). For example, when I first came across a reference implementation of an RTB platform on AWS, I was amused by how close it was to what we had implemented in one of my previous projects (OpenRTB).

So it was not much of a surprise that the next RTB system used a similar pattern: logs are written to files, pushed to S3, and then aggregated in Hadoop, from where the reports are run.

But there were a few problems along the way…

Log partitioning

The current log partitioning in S3 is by server ID. This is really useful for debugging and fine for some aggregations, but poorly suited for analytics: it is hard to narrow a query down by date, which results in large scans, and that in turn makes joins even harder. Large scans in tools like Athena also translate directly into larger bills. In short, Hive-style partitioning (by date and hour) would be much better.

To that end, I’ve created a Lambda function, repartition, which is triggered when a new log file is uploaded to the s3://logbucket/ bucket:

import boto3
from gzip import GzipFile
from io import BytesIO
import json
import urllib.parse

s3 = boto3.client('s3')

SUFFIX = '.txt.gz'

V = "v8"

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the bucket and object key from the S3 event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        # Sanity check that the object is actually there (raises if not)
        s3obj = s3.get_object(Bucket=bucket, Key=key)
        src = { 'Bucket': bucket, 'Key': key }
        # Key layout: <node>/<event>-<YYMMDD>-<HHMMSS>.txt.gz, where the node ID
        # is the third "_"-separated component of <node>
        (node, orig_name) = key.split("/")
        (_, _, node_id) = node.split("_")
        name = orig_name.replace(SUFFIX, "")
        (evt, dt0, hhmmss) = name.split("-")
        hr = hhmmss[0:2]
        # Hive-style date-hour partition path
        dthr = f"year=20{dt0[0:2]}/month={dt0[2:4]}/day={dt0[4:6]}/hour={hr}"
        
        schema = f"{V}/{evt}/{dthr}"
        dest = f"{schema}/{name}-{node_id}{SUFFIX}"
        print(f"Copying {key} to {dest}")
        s3.copy_object(Bucket=bucket, Key=dest, CopySource=src)

        return "OK"
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

if __name__ == "__main__":
    # Wrapper to run from CLI for now
    s3entry = {'bucket' : {'name' : 'logbucket'},
               'object' : {'key'  : 'server/requests-200420-003740.txt.gz'}}
    event = {'Records' : [{'s3' : s3entry}] }
    lambda_handler(event, None)
    

The log is then copied to a new path under the v8 prefix, with the following pattern:

<event>/year=<year>/month=<month>/day=<day>/hour=<hour>/<filename>-<nodeID>.txt.gz. For example, 

s3://logbucket/server1234/wins-200421-000003.txt.gz

is copied to 

s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz

(The “v8” prefix is there because I have arrived at this schema having tried several versions — 7, to be exact).
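For completeness, the trigger itself is just an S3 event notification on the log bucket. It can be set up from the console; programmatically it would look roughly like the sketch below (the function ARN is a placeholder, and the Lambda’s resource policy must already allow invocation from S3):

import boto3

s3 = boto3.client('s3')

# A sketch only: subscribe the repartition Lambda to object-created events.
# In practice a prefix filter (or a separate destination bucket) is advisable,
# so that the function is not re-invoked for the copies it writes under v8/.
s3.put_bucket_notification_configuration(
    Bucket='logbucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'repartition-on-upload',
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:repartition',
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'suffix', 'Value': '.txt.gz'}
            ]}}
        }]
    }
)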

What about storage cost?

  • An additional benefit of date-based partitioning is that we can easily automate transitioning folders older than some specified age to Glacier, which will save S3 storage costs on the duplicated data (see the sketch after this list).
  • In the cloud, storage costs are not the main thing to worry about; outgoing traffic and compute are where the money goes. So move the compute to the data, not the other way around.
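For instance, a lifecycle rule on the v8/ prefix can take care of the archival automatically. A minimal sketch, assuming a 90-day cutoff (pick whatever retention period makes sense):

import boto3

s3 = boto3.client('s3')

# Transition the repartitioned copies to Glacier after 90 days (example value)
s3.put_bucket_lifecycle_configuration(
    Bucket='logbucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-partitioned-logs',
            'Filter': {'Prefix': 'v8/'},
            'Status': 'Enabled',
            'Transitions': [{'Days': 90, 'StorageClass': 'GLACIER'}]
        }]
    }
)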

NOTE: Partitions do not equal timestamps

Partitioning is based on the file name. Records inside a file may have timestamps whose hour is one greater or one less than the partition hour, for obvious reasons (files are named by when they are written, so events near the hour boundary can land in the adjacent file). Thus, partitions are there to reduce the number of scanned records, but care should be taken when querying not to assume that the timestamps under year=2020/month=04/day=21/hour=00 all fall in the 0th hour of 2020-04-21.
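For instance, once the data is queryable in Athena (see below), pulling everything for hour 01 of 2020-04-21 is best done by widening the partition filter by an hour on each side and then filtering on the record timestamp itself. A sketch (ts is a hypothetical name for the timestamp column; substitute the real one):

SELECT COUNT(*) AS cnt
FROM wins
WHERE year = '2020' AND month = '04' AND day = '21'
  AND hour IN ('00', '01', '02')   -- partition pruning, widened by one hour each way
  AND ts LIKE '04/21/2020 01:%'    -- the actual record-level filter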

Discover metadata in Glue

Glue is an ETL service on AWS. One of the great features of Glue is crawlers that attempt to glean metadata from the logs. This is really convenient because it saves us the tedious step of defining the metadata manually.

So we set up a crawler; for an explanation of how to do that, see the “Tutorial” link on the left-hand side of the Glue console.

However, it takes some time to get the configuration correct. 

  1. We want to exclude some logs because we know their format is not suitable for automatic discovery (they are not straight-up JSON or CSV, and custom SerDes are not supported by Athena at the moment), though see below for exceptions. This is done in the “Data store” part of the crawler configuration.
  2. We want Glue to treat new files of the same type as different partitions of the same table, not create new tables for them. For example, given the partitioning convention we created above, these two paths:
  • s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz
  • s3://logbucket/v8/wins/year=2020/month=04/day=22/hour=11/wins-200422-000004-4321.txt.gz

 should be treated as partitions of the table “wins”, not as two different tables. We do this in the “Output” section of the crawler configuration; a programmatic sketch of both settings is shown below.
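For reference, roughly the same setup expressed through the Glue API (a sketch only: the crawler name, role, database and exclusion pattern are placeholders and examples):

import boto3
import json

glue = boto3.client('glue')

glue.create_crawler(
    Name='logs-crawler',
    Role='AWSGlueServiceRole-logs',      # an existing IAM role for Glue
    DatabaseName='logs',
    Targets={'S3Targets': [{
        'Path': 's3://logbucket/v8/',
        'Exclusions': ['requests/**']    # example: a log excluded from discovery
    }]},
    Schedule='cron(0 * * * ? *)',        # hourly, so new partitions are picked up
    SchemaChangePolicy={
        'UpdateBehavior': 'UPDATE_IN_DATABASE',
        'DeleteBehavior': 'LOG'
    },
    # Corresponds to the "Create a single schema for each S3 path" grouping
    # option in the console's "Output" section
    Configuration=json.dumps({
        'Version': 1.0,
        'Grouping': {'TableGroupingPolicy': 'CombineCompatibleSchemas'}
    })
)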

Once the crawler runs, we will see a list of tables created in Glue.

If we see tables here that look like parts of the partitioned path (e.g., year=2020, or names ending with _txt_gz), it means the crawler got confused while discovering their structure. We add those to the crawler’s exclusion list and will create their metadata manually. Fortunately, there are not that many such logs, so we can exclude them one by one.

Of course, while the crawler can recognize the file structure, it doesn’t know what to name the fields, so we go and name them manually. While this is a tedious process, we don’t have to do it all at once – just on an as-needed basis.

We will want to keep the crawler running hourly so that new partitions (which get created every hour) are picked up. (This can also be done manually from Athena – or Hive – by issuing the MSCK REPAIR TABLE command.)
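For example, after new hourly prefixes for a table have landed in S3:

MSCK REPAIR TABLE wins

-- or, to add a single known partition explicitly:
ALTER TABLE wins ADD IF NOT EXISTS
PARTITION (year = '2020', month = '04', day = '21', hour = '00')
LOCATION 's3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/'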

First useful Athena query

Looking now at Athena, we see that the metadata from Glue is visible and we can run queries.

Woohoo! It works! I can do nice ad hoc queries. We’re done, right?

Almost. Unfortunately, for historical reasons, our logs are not always formatted to work ideally with this setup.

We could identify two key cases:

  1. Mostly CSV files:
    • An event prefix precedes the event ID, even though the event type is already defined by the log name; for example, bid:<BID_ID>:
      bid:0000-1111-2222-3333
    • A single CSV field really contains two values. For example, a log that is comma-separated into two fields, timestamp and “message”, where the message includes a “Win: ” prefix before the bid ID and then – with no comma! – “price:” followed by the price. Like so:
      04/21/2020 00:59:59.722,Win: a750a866-8b1c-49c9-8a30-34071167374e_200421-00__302 price:0.93
      However, what we want to join on is the ID alone, so in these cases we can use Athena views. For the two cases above, respectively:
       CREATE OR REPLACE VIEW bids2 AS
      SELECT "year", "month", "day", "hour",
      "SUBSTRING"("bid_colon_bid_id", ("length"('bid:') + 1)) "bid_id"
      FROM bids

      and
       CREATE OR REPLACE VIEW wins2 AS
      SELECT "year", "month", "day", "hour",
      "SPLIT_PART"("message", ' ', 3) "bid_id"
      FROM wins

      Now joins can be done on the bid_id column, which makes for a more readable query (see the join sketch at the end of this section).
  2. The other case is a log with the following format: a timestamp, followed by a comma, followed by JSON (an OpenRTB request wrapped in one more layer of our own JSON that augments it with other data). This makes it neither CSV nor JSON, and the Glue crawler gets confused. The solution is to define the table manually with a RegEx SerDe, along these lines:
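    (A hedged sketch of such a table definition; ts is an assumed column name, and the regex simply splits each line at the first comma.)

     CREATE EXTERNAL TABLE requests (
       ts string,
       request string
     )
     PARTITIONED BY (year string, month string, day string, hour string)
     ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
     WITH SERDEPROPERTIES (
       'input.regex' = '^([^,]*),(.*)$'
     )
     LOCATION 's3://logbucket/v8/requests/'

    After creating it, MSCK REPAIR TABLE requests loads the existing partitions.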

    And then we can use Athena’s JSON functions to deal with the JSON column – for example, to see the distribution of requests by country:
     SELECT JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country') country,
    COUNT(*) cnt
    FROM requests
    GROUP BY
    JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country')
    ORDER BY cnt DESC
    Success! We can now use SQL to easily query our event logs.
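As a final illustration, the views from case 1 let joins be written directly on the cleaned-up IDs. A hypothetical example, assuming the IDs extracted by the two views actually line up:

SELECT COUNT(*) AS matched_wins
FROM bids2 b
JOIN wins2 w ON w.bid_id = b.bid_id
WHERE b.year = '2020' AND b.month = '04' AND b.day = '21'
  AND w.year = '2020' AND w.month = '04' AND w.day = '21'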
