Ad-hoc querying on AWS: Lambda, Glue, Athena

Introduction

If you give different engineers the same problem, they will usually produce reasonably similar solutions (mutatis mutandis). For example, when I first came across a reference implementation of an RTB platform using AWS, I was amused by how close it was to what we had implemented in one of my previous projects (OpenRTB).

So it should come as no surprise that in the next RTB system a similar pattern was used: logs are written to files, pushed to S3, then aggregated in Hadoop, from where the reports are run.

But there were a few problems in the way… 

Log partitioning

The current log partitioning in S3 is by server ID. This is really useful for debugging and is fine for some aggregations, but it has drawbacks: it is hard to narrow things down by date, which results in large scans and makes joins even harder. Large scans in tools like Athena also translate into larger bills. In short, Hive-like partitioning would be good.

To that end, I’ve created a Lambda function, repartition, which is triggered when a new log file is uploaded to the s3://logbucket/ bucket:

import boto3
from gzip import GzipFile
from io import BytesIO
import json
import urllib.parse

s3 = boto3.client('s3')

SUFFIX = '.txt.gz'

V = "v8"

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the bucket name and object key from the S3 event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        s3obj = s3.get_object(Bucket=bucket, Key=key)
        src = { 'Bucket': bucket, 'Key': key }
        (node, orig_name) = key.split("/")
        (_, _, node_id) = node.split("_")
        name = orig_name.replace(SUFFIX, "")
        (evt, dt0, hhmmss) = name.split("-")
        hr = hhmmss[0:2]
        # date-hour
        dthr = f"year=20{dt0[0:2]}/month={dt0[2:4]}/day={dt0[4:6]}/hour={hr}"
        
        schema = f"{V}/{evt}/{dthr}"
        dest = f"{schema}/{name}-{node_id}{SUFFIX}"
        print(f"Copying {key} to {dest}")
        s3.copy_object(Bucket=bucket, Key=dest, CopySource=src)

        return "OK"
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

if __name__ == "__main__":
    # Wrapper to run from CLI for now
    s3entry = {'bucket' : {'name' : 'logbucket'},
               'object' : {'key'  : 'server/requests-200420-003740.txt.gz'}}
    event = {'Records' : [{'s3' : s3entry}] }
    lambda_handler(event, None)
    

At that time, the log is copied to a new path under the v8 prefix, with the following pattern:

<event>/year=<year>/month=<month>/day=<day>/hour=<hour>/<filename>-<nodeID>. For example, 

s3://logbucket/server1234/wins-200421-000003.txt.gz

is copied to 

s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz

(The “v8” prefix is there because I have arrived at this schema having tried several versions — 7, to be exact).
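As a sanity check of the mapping above, here is a minimal sketch of the key transformation as a pure function, separate from the Lambda itself. It assumes, as in the example path, that the node folder name ends in the numeric node ID; partitioned_key is a hypothetical helper name, not part of the deployed function.

```python
import re

SUFFIX = ".txt.gz"
VERSION = "v8"


def partitioned_key(key: str) -> str:
    """Map '<node>/<event>-<yymmdd>-<hhmmss>.txt.gz' to its Hive-style key."""
    node, file_name = key.split("/")
    # Assumption: the node ID is the trailing run of digits in the folder name.
    match = re.search(r"(\d+)$", node)
    if match is None or not file_name.endswith(SUFFIX):
        raise ValueError(f"Unexpected key layout: {key}")
    node_id = match.group(1)
    name = file_name[: -len(SUFFIX)]
    event, yymmdd, hhmmss = name.split("-")
    partition = (
        f"year=20{yymmdd[0:2]}/month={yymmdd[2:4]}"
        f"/day={yymmdd[4:6]}/hour={hhmmss[0:2]}"
    )
    return f"{VERSION}/{event}/{partition}/{name}-{node_id}{SUFFIX}"


print(partitioned_key("server1234/wins-200421-000003.txt.gz"))
```

Keeping the path logic in a function like this makes it easy to test new schema versions locally before touching the bucket.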

What about storage cost?

  • An additional benefit of using date-based partitioning is that we can easily automate changing storage type to Glacier for folders older than some specified time, which will save S3 storage costs of the duplicate data.
  • In the cloud, the storage costs are not something to worry about; the outgoing traffic and compute is where the problem is at. So move the compute to the data, not the other way around.
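One way to automate the Glacier transition is an S3 lifecycle rule. The sketch below only builds the rule and applies it through boto3's put_bucket_lifecycle_configuration; the prefix, the 90-day threshold, and the helper names are assumptions for illustration. Note that lifecycle rules act on object age under a prefix, not on the date encoded in the path, and that this call replaces any lifecycle configuration already on the bucket.

```python
def glacier_lifecycle(prefix: str, days: int) -> dict:
    """Build a lifecycle configuration that moves objects under prefix to Glacier."""
    rule_id = f"glacier-{prefix.strip('/').replace('/', '-')}-{days}d"
    return {
        "Rules": [
            {
                "ID": rule_id,
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Transitions": [{"Days": days, "StorageClass": "GLACIER"}],
            }
        ]
    }


def apply_lifecycle(bucket: str, prefix: str, days: int) -> None:
    """Apply the rule; this overwrites the bucket's existing lifecycle rules."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=glacier_lifecycle(prefix, days),
    )


# Hypothetical usage: apply_lifecycle("logbucket", "v8/", 90)
```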

NOTE: Partitions do not equal timestamps

Partitioning is based on the file name. Records inside the file may have timestamps whose hour is one greater or one less than the partition hour, because a file can span an hour boundary. Thus, partitions are there to reduce the number of scanned records, but care should be taken when querying not to assume that the timestamps under year=2020/month=04/day=21/hour=00 are all in the 0th hour of 2020-04-21.
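To make this concrete, here is a small sketch that lists the partitions a query should cover for a given time window, padded by one hour on each side. It is not part of the pipeline; partitions_for_window and its slack_hours parameter are assumptions for illustration. The actual record timestamps still need to be filtered in the query itself.

```python
from datetime import datetime, timedelta


def partitions_for_window(start: datetime, end: datetime, slack_hours: int = 1):
    """Return hourly partition paths covering [start, end] plus slack on both sides."""
    current = start.replace(minute=0, second=0, microsecond=0) - timedelta(hours=slack_hours)
    last = end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=slack_hours)
    partitions = []
    while current <= last:
        partitions.append(
            f"year={current:%Y}/month={current:%m}/day={current:%d}/hour={current:%H}"
        )
        current += timedelta(hours=1)
    return partitions


# Records from 00:10 to 00:50 on 2020-04-21 may live in any of these partitions.
for path in partitions_for_window(datetime(2020, 4, 21, 0, 10), datetime(2020, 4, 21, 0, 50)):
    print(path)
```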

Discover metadata in Glue

Glue is an ETL service on AWS. One of the great features of Glue is crawlers that attempt to glean metadata from the logs. This is really convenient because it saves us the tedious step of defining the metadata manually.

So we set up a crawler. For an explanation of how to do it, see the “Tutorial” link on the left hand side of this page:

However, it takes some time to get the configuration correct. 

  1. We would want to exclude some logs because we know their format is not good for discovery (they are not straight-up JSON or CSV, etc; and, at the moment, custom SerDes are not supported by Athena) — but see below for exceptions. This is done in the “Data store” part of crawler configuration:
  2. We want Glue to treat new files of the same type as being different partitions of the same table, not create new ones. For example, given the partitioning convention we created above, these two paths:
  • s3://logbucket/v8/wins/year=2020/month=04/day=21/hour=00/wins-200421-000003-1234.txt.gz
  • s3://logbucket/v8/wins/year=2020/month=04/day=22/hour=11/wins-200422-000004-4321.txt.gz

 should be treated as partitions of table “wins”, not two different tables. We do this on the “Output” section of the crawler configuration as follows:
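The same two settings can also be expressed through the API rather than the console. The sketch below builds the arguments for boto3's Glue create_crawler call: exclusion patterns go on the S3 target, and the "CombineCompatibleSchemas" grouping policy asks the crawler to keep compatible paths in a single table. The crawler name, IAM role, database, and exclusion pattern are all hypothetical placeholders.

```python
import json


def crawler_definition(name, role, database, s3_path, exclusions):
    """Build keyword arguments for glue.create_crawler(**kwargs)."""
    configuration = {
        "Version": 1.0,
        # Treat compatible schemas under the path as partitions of one table.
        "Grouping": {"TableGroupingPolicy": "CombineCompatibleSchemas"},
    }
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {
            "S3Targets": [{"Path": s3_path, "Exclusions": list(exclusions)}]
        },
        "Configuration": json.dumps(configuration),
    }


# Hypothetical values; "requests/**" stands in for a log the crawler cannot parse.
definition = crawler_definition(
    "logs-crawler", "GlueCrawlerRole", "logs_db", "s3://logbucket/v8/", ["requests/**"]
)
print(json.dumps(definition, indent=2))

# With AWS credentials configured, this could then be applied as:
#   import boto3
#   boto3.client("glue").create_crawler(**definition)
```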

Once the crawler runs, we will see a list of tables created in Glue.

If we see tables here that look like parts of the partitioned path (e.g., year=2020, or names ending with _txt_gz), it means the crawler got confused when discovering their structure. We add those to the crawler’s exclusion list and create their metadata manually. Fortunately, there are not that many such logs, and we can exclude them one by one.

Of course, while the crawler can recognize the file structure, it doesn’t know what to name the fields, so we can go and name them manually. While this is a tedious process, we don’t have to do it all at once; we can do it on an as-needed basis.

We will want to keep the crawler running hourly so that new partitions (which get created every hour) are picked up. (This can also be done manually from Athena, or Hive, by issuing the MSCK REPAIR TABLE command.)
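Both options can be scripted. The sketch below takes boto3 clients as arguments (boto3.client("glue") and boto3.client("athena")), so it can be exercised without AWS access. The crawler name, database, table, and results location are hypothetical, and the cron expression is my assumption of a top-of-the-hour schedule.

```python
def schedule_hourly(glue, crawler_name):
    """Set an existing Glue crawler to run at minute 0 of every hour."""
    glue.update_crawler(Name=crawler_name, Schedule="cron(0 * * * ? *)")


def repair_partitions(athena, database, table, output_location):
    """Ask Athena to discover new partitions; returns the query execution ID."""
    response = athena.start_query_execution(
        QueryString=f"MSCK REPAIR TABLE {table}",
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


# Hypothetical usage with real clients:
#   import boto3
#   schedule_hourly(boto3.client("glue"), "logs-crawler")
#   repair_partitions(boto3.client("athena"), "logs_db", "wins",
#                     "s3://logbucket/athena-results/")
```

The query runs asynchronously, so the returned ID would still need to be polled (for example with get_query_execution) before relying on the new partitions.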

First useful Athena query

Looking now at Athena, we see that metadata from Glue is visible and we can run queries:

Woohoo! It works! I can do nice ad hoc queries. We’re done, right?

Almost. Unfortunately, for historical reasons, our logs are not always formatted to work with this setup ideally.

We could identify two key cases:

  1. Mostly CSV files:
    • There are event prefixes preceding the event ID, even though the event itself is already defined by the log name. For example, bid:<BID_ID>, e.g.:
      bid:0000-1111-2222-3333
    • A CSV field in itself contains really two values. E.g., a log that is comma-separated into two fields: timestamp and “message”, which includes “Win: ” prefix before bid ID, and then – with no comma! – “price: ” followed by price. Like so:
      04/21/2020 00:59:59.722,Win: a750a866-8b1c-49c9-8a30-34071167374e_200421-00__302 price:0.93
      However, what we want to join on is the ID. So in these cases, we can use Athena views. For example, in these respective cases we can use:
       CREATE OR REPLACE VIEW bids2 AS
      SELECT "year", "month", "day", "hour",
      "SUBSTRING"("bid_colon_bid_id", ("length"('bid:') + 1)) "bid_id"
      FROM bids

      and
       CREATE OR REPLACE VIEW wins2 AS
      SELECT "year", "month", "day", "hour",
      "SPLIT_PART"("message", ' ', 2) "bid_id"
      FROM wins

      Now joins can be done on bid_id column, which makes for a more readable query.
  2. The other case is a log that has the following format: a timestamp, followed by a comma, followed by JSON (an OpenRTB request wrapped in one more layer of our own JSON that augments it with other data). This makes it neither CSV nor JSON, so the Glue crawler gets confused. The solution is to use the RegEx SerDe, as follows:

    And then we can use Athena’s JSON functions to deal with the JSON column, for example, to see distribution of requests by country:
     SELECT JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country') country,
    COUNT(*) cnt
    FROM requests
    GROUP BY
    JSON_EXTRACT_SCALAR(request, '$.bidrequest.device.geo.country')
    ORDER BY cnt DESC
    Success! We can now use SQL to easily query our event logs.
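For readers who want to see the shape of the second case outside Athena, here is a rough Python equivalent of the idea: one regular expression splits each line at the first comma into a timestamp and a JSON payload, and the payload is then queried by path. The regex, the function names, and the sample lines are my own illustration, not the actual SerDe definition or real log data.

```python
import json
import re
from collections import Counter

# Everything before the first comma is the timestamp; the rest is JSON.
LINE_RE = re.compile(r"^([^,]+),(.*)$")


def parse_line(line):
    """Split a 'timestamp,{json}' line into (timestamp, parsed request)."""
    match = LINE_RE.match(line.rstrip("\n"))
    if match is None:
        raise ValueError(f"Unrecognized line: {line!r}")
    timestamp, payload = match.groups()
    return timestamp, json.loads(payload)


def requests_by_country(lines):
    """Count requests per $.bidrequest.device.geo.country, most frequent first."""
    counts = Counter()
    for line in lines:
        _, request = parse_line(line)
        geo = request.get("bidrequest", {}).get("device", {}).get("geo", {})
        counts[geo.get("country")] += 1
    return counts.most_common()


# Made-up sample lines in the same layout as the log described above.
sample = [
    '04/21/2020 00:59:59.722,{"bidrequest": {"device": {"geo": {"country": "USA"}}}}',
    '04/21/2020 00:59:59.801,{"bidrequest": {"device": {"geo": {"country": "CAN"}}}}',
    '04/21/2020 00:59:59.950,{"bidrequest": {"device": {"geo": {"country": "USA"}}}}',
]
print(requests_by_country(sample))
```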

OpenDSP’s DMP: Nanoput

Here I will describe the Nanoput project, which comprises a large part of OpenDSP’s DMP (Data Management Platform). There are, of course, other pieces — the entire picture will be painted under the DMP tag.

The overall DMP stack is as follows:

  • Nanoput proper: NGINX with Lua to handle incoming requests. I’ll confess that at the moment this is a bit of a pet (as in not cattle). We are considering using OpenResty instead of rolling our own build, which uses parts of OpenResty. But no matter: here I will present some features that can be achieved with this setup, and one instance is capable of handling all this.
  • Redis for storing and manipulating user sets — ZSET is great
  • MySQL for storing metadata — will be described in a separate post
  • PHP/JS for a simple Web interface to define the said metadata
  • Python for translating metadata into the configuration for NGINX
  • AWS S3 for storing raw logs — pre-partitioned so that EMR can be used easily.

Conceptual introduction

Conceptually, let’s consider the idea of an “event”. An impression, a conversion, a video tracking event, a site visit, etc, is an event — anything that fires a request to our Nanoput is. You may recognize a similarity with Snowplow — and that is because we are solving a similar problem.

To proceed further, I must ask for a little indulgence; please bear with me. As a technologist, it really grated on me to hear terms like “Javascript pixel”, but I have learned to stop worrying and love the bomb. Therefore, Javascript pixel it is, when it is JS code that fires some GET URLs. Now, then, the requests are assumed to be fired as HTTP GET requests (they do not have to be, but that is the primary idea behind Nanoput per se; management of metadata to ingest things via other means, like FTP upload, will be addressed separately) and can originate, for example, from:

  • Exchanges or DMPs, as an exchange-initiated cookie-sync: see below.
  • Regular user behavior — impressions, in case of video, video-tracking events; conversions
  • Calls as if initiated by exchanges or DMPs to Nanoput but in reality are, heh, Javascript pixels

Now, let us also consider the idea of a “user segment”. If you think about it, a segment is just a set of users. Thus, we may as well consider a user that produced a certain event as belonging to some segment. These may be explicitly asked for, such as “users we want to retarget”, or “users who converted”, etc. But there is no reason why any event cannot be defined as a segment-defining one.

Segments, here, are a special case of the data collections concept discussed in a different post.

Given that, we can now dive into the Nanoput implementation.

General data acquisition idea

Here, we simply leverage basic NGINX functionality, namely logging. To that end, we split the main config file into sections, included from it, that deal with log format, location, and behavior.

Static data acquisition URLs

By “static”, here we mean common use cases that are just part of Nanoput (hence the “man” subdirectory you will notice in examples; like the Unix man command, it stands for “manual”). Here we have:

  • Site events (essentially, those are an extension of retargeting concept).
  • Standard event tracking — by which we mean, standard events that happen in Ad world.

Notice that we also augment the information available from NGINX (HTTP headers, etc.) with geo data using the GeoIP module, and with user-agent/device/OS information.

Dynamic (metadata-driven) data acquisition URLs

Dynamic data acquisition works simply: a process reads the metadata table and creates appropriate entries in the NGINX configs that define log format and location and behavior.
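A minimal sketch of that translation step might look like the following. The metadata row layout (path, log_file, log_format), the URL, and the file locations are hypothetical; the generated block assumes a log format with that name is already declared with log_format elsewhere in the NGINX configuration, and uses empty_gif to answer the pixel request.

```python
def render_location(row):
    """Render one NGINX location block from a metadata row."""
    return (
        f"location = {row['path']} {{\n"
        f"    access_log {row['log_file']} {row['log_format']};\n"
        f"    empty_gif;\n"
        f"}}\n"
    )


def render_config(rows):
    """Render all rows into one snippet suitable for an include file."""
    return "\n".join(render_location(row) for row in rows)


# Hypothetical rows, as they might come back from the metadata table.
rows = [
    {
        "path": "/dyn/signup",
        "log_file": "/var/log/nginx/signup.log",
        "log_format": "nanoput",
    },
]
print(render_config(rows))
```

Writing the output to a file that the main config includes, followed by an NGINX reload, would complete the loop; that part is left out here.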

Creating “segments”

On every “event”, a script runs that uses Redis’s awesome Sorted Set functionality, inserting things twice. The key idea here, again, is a variation on dealing with data gravity concerns by just duplicating storage. We create two sorted sets for each key, the “score” being the first time and the last time we have seen the user, respectively. The reasoning for this is that:

  • First-seen: we can write batch scripts to get rid of users we have last seen over X days ago (expiring targeting).
  • Last-seen: helps us with conversion attribution (yes, this assumes naive last-click attribution or variants).

Duplication is not just for every user — it is for every set. The key here is the set (or segment) name, and the value is the set of users.

An added benefit of this is that new segments can be created by using various Redis set operations (union, intersection) easily.
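The double insert can be sketched in Python as follows. This is an illustration rather than the actual Nanoput script (which runs inside NGINX): it assumes a redis-py style client passed in as redis_client, and the ":first" / ":last" key suffixes and function names are hypothetical.

```python
def record_event(redis_client, segment, user_id, ts):
    """Add a user to both sorted sets of a segment, scored by timestamp."""
    # NX adds the member only if it is absent, so the score stays first-seen.
    redis_client.zadd(f"{segment}:first", {user_id: ts}, nx=True)
    # A plain ZADD overwrites the score, so it tracks the latest event.
    redis_client.zadd(f"{segment}:last", {user_id: ts})


def intersect_segments(redis_client, dest, segment_a, segment_b):
    """Create a new segment of users present in both source segments."""
    redis_client.zinterstore(
        f"{dest}:last",
        [f"{segment_a}:last", f"{segment_b}:last"],
        aggregate="MAX",  # keep the most recent last-seen time of the two
    )


# Hypothetical usage with a real server:
#   import redis
#   client = redis.Redis()
#   record_event(client, "converted", "user-123", 1587427199)
```

Passing the client in, rather than creating it inside the functions, keeps the logic testable with a stand-in object.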

Some useful shortcuts for a DMP

  • Getting OS/browser info without necessarily using WURFL (though that can easily be fronted by NGINX too, actually).

Exchange cookie sync

In the display world, there is a need for cookie syncing between DSP and a third-party DMP or an exchange/SSP, and that can be either exchange, DMP or DSP-initiated, or both. Some exchanges may allow the redirect chain to proceed further, some may not. Nanoput provides this functionality for exchanges we deal with as well as a template for doing it for other partners — at the speed that NGINX provides. Here are the moving parts:

Storing for further analysis

Raw logs, formatted as above, are uploaded to S3. Notice that they are stored twice, with different partitioning schemas. This is one of the key ideas in Nanoput: storage is cheap, so we duplicate the storage this way and then use one or another partitioning schema depending on the use case.

OpenDSP – stay tuned

Every marketer, it seems, wants to participate in real-time bidding (RTB). But what is it that they really want?

They want the ability to price (price, not target!) a particular impression in real time, based on their secret-sauce business logic and data science. Fair enough.

But that secret sauce, part of their core competence, is just the tip of the iceberg — and the submerged part is all that is required to keep that tip above water. To wit:

  • Designing, developing, testing and maintaining actual code for the UI for targeting, the bidder for bidding, reporting, and data management
  • Scaling and deploying such code in some infrastructure (own data center, clouds like AWS, GCE, Azure, etc.)
  • Integrating with all exchanges of interest, including the following steps:
    • Code: passing functional tests (understanding the exchange’s requirements for parsing request and sending response)
    • Infrastructure: ensuring the response is being sent to the exchange within the double-digit-millisecond limit
    • Scaling: As above, but under real load (hundreds of thousands of queries per second)
    • Business: Paperwork to ensure seat on the exchange, including credit agreements when necessary
  • Operations: Ongoing monitoring of the operations, including technical (increased latency) and business (low fill level, high disapproval level) concerns (whether these concerns are triggered by clients or exchange partners or, ideally, proactively addressed internally).

None of which is their core competence. We propose to address the underwater part. It’ll be exciting.

Enter OpenDSP. We got something cool coming up here. Stay tuned.

Watch this space, and by this space I mean this blog and this GitHub account.