There are two basic paradigms for building a data processing pipeline: Extract-Transform-Load (ETL) and Extract-Load-Transform (ELT). ETL is still the default approach, but it has a lot of drawbacks, and it’s becoming obvious that building an ELT pipeline is the better choice.

First of all, there’s actually no such thing as a pure ETL pipeline. There will always have to be another Transform step after the data is loaded into the data warehouse. You’ll end up having an ETLT process or two ETL pipelines joined together.

ETL pipelines are tricky to build correctly. There are subtleties with each integration that, if done wrong, can be costly. At best, you’ll lose time and money rebuilding it. At worst, you’ll lose data and produce incorrect analyses.

ETL pipelines are even trickier to operate. You need to test not just the code, but also the data. You need to set up a good deployment and monitoring process. You want to log both success and error metrics. Don’t forget about alerting. Do your data engineers want to be on-call? The list goes on and on.

ETL pipelines are also inherently inflexible. They need to be rigid to give the “most correct” data possible, but this also makes them more difficult to adapt. And adapt they must, as the world around them keeps changing all the time. Whether it’s a new API version or a new business requirement, you’ll need to incorporate the change. To do so, a data engineer and a data analyst need to work in tandem.

Which leads to another problem with ETL, this time around organizational design. Regardless of how you structure your data team, a data engineer will always have less skin in the game – they’ll never directly take the blame and lose credibility for wrong data in a BI dashboard. They’ll feel less responsible, and hence less interested, in doing the meticulous work necessary. Also, needing a data engineer to change a pipeline leads to a slower pace of development overall, a huge competitive disadvantage in today’s world.

All of this makes building and running an ETL process a slow, expensive, and complex undertaking. The truth is, the Extract and Load steps are undifferentiated heavy lifting – they are not specific to any company, yet every company needs to do them to have at least a chance of getting insights from its data. So why do it at all when there’s a better alternative in the form of ELT?

Let someone with way more experience and expertise handle the EL so you can focus on the T.

You’ll get your data sooner, faster, and more reliably. You’ll save money on hiring extra data engineers (my guesstimate is that with ETL, the data engineer to data analyst ratio is around 1:2, whereas with ELT, it’s closer to 1:5). You’ll make your data analysts faster, more independent, and happier.

Standard ETL has been around for a long time, but its time has passed. With modern tools, there’s no reason not to do ELT. Ask yourself this – if you had to choose between a slow, error-prone, expensive way of achieving a goal and a fast, reliable, cheaper alternative, which one would you go for?

Hot-reloading node.js and TypeScript

TL;DR: Use tsc-watch

I’ve spent some time researching how to get hot-reloading working with Node & TypeScript. Due to the mess that is the JS dev ecosystem, finding a solid solution took me longer than I expected for this kind of task. Hopefully this post will save you that effort.

The best solution I found is using tsc-watch. Install it as a dev dependency and set tsc-watch --onSuccess 'node .' as your start script.
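For reference, a minimal package.json wiring might look something like this (the entry point and version numbers are assumptions, not prescriptions):

    {
      "main": "dist/index.js",
      "scripts": {
        "start": "tsc-watch --onSuccess \"node .\""
      },
      "devDependencies": {
        "tsc-watch": "^4.0.0",
        "typescript": "^3.9.0"
      }
    }

Every time the compiler finishes a successful build, tsc-watch runs node . again, which gives you the hot-reloading loop.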

That’s it, happy hacking.

Two tips for writing CloudFormation templates

Here are two tips for writing more readable CloudFormation templates.

1) Use dot notation to access attributes

The Fn::GetAtt intrinsic function supports the use of dot notation to refer to a resource’s attribute. It works with both the long Fn::GetAtt and short !GetAtt forms of the syntax. Instead of the long, enumerated, YAML array syntax:

    Value:
      Fn::GetAtt:
        - MyResource
        - Arn

Use the one-liner version:

Value: !GetAtt MyResource.Arn

2) Use !Sub instead of !Join

When creating a string, using !Sub is often a much better option than using !Join. Here’s an example of building an ECR repository URI:

    Value: !Join
      - '.'
      - - !Ref 'AWS::AccountId'
        - dkr
        - ecr
        - !Ref 'AWS::Region'
        - !Join
          - '/'
          - - amazonaws.com
            - !Ref 'ServiceName'

With this approach, it’s hard to grasp what the final string looks like, it’s hard to write correctly, and it’s hard to debug. Now compare it to using !Sub:

Value: !Sub '${AWS::AccountId}.dkr.ecr.${AWS::Region}.amazonaws.com/${ServiceName}'

A cool thing to notice is the implicit !Ref on pseudo-parameters and template parameters – inside !Sub, ${AWS::Region} is equivalent to !Ref 'AWS::Region'. This makes the whole construction so much nicer. If you need to access an attribute, you can use the dot notation as mentioned above.

I hope you find these tips useful and apply them in your practice.

DynamoDB, NodeJS vs. Python and persistent connections

Recently, Yan Cui wrote an enlightening blog post about using keep-alive HTTP connections to significantly speed up DynamoDB operations. He gave an example of how to do it in NodeJS. I was curious how to do it in Python.

To my surprise, I found out I did not have to do anything at all. DynamoDB keeps the connection open. See for yourself – using the CLI, run aws dynamodb list-tables --debug. Notice the response headers section, which looks something like this:

 Response headers:
 {'Server': 'Server', 
  'Date': 'Thu, 07 Mar 2019 19:42:55 GMT', 
  'Content-Type': 'application/x-amz-json-1.0', 
  'Content-Length': '328', 
  'Connection': 'keep-alive', 
  'x-amzn-RequestId': '38N9IJV176MACH027DNIRT5C53VV4KQNSO5AEMVJF66Q9ASUAAJG', 
  'x-amz-crc32': '2150813651'}

The Connection: keep-alive header is set by DynamoDB. Unless the client explicitly sets the header to close, the connection will stay open. Yet closing it is exactly what NodeJS does by default. Thank you to Stefano Buliani for providing additional visibility into this. This behaviour is inherited by the aws-js-sdk. I think that’s a mistake, so I’ve opened a bug in the GitHub repo. Until it’s fixed, if you’re writing code in JS, be sure to follow Yan’s recommendation.

Connection: keep-alive vs. close in Python

I was still curious if I could replicate Yan’s findings in Python. Here’s a log of repeatedly running a simple putItem operation using a vanilla boto3 DynamoDB client:


Except for the first one, most of them are sub 10 ms, since the connection is kept open.

However, when I explicitly added the Connection: close header, things looked a lot different:


Operations took at least 50 ms, often longer. This is in line with Yan’s findings.

Granted, my approach was not very rigorous. For the sake of replicability, here’s the code I used. Feel free to run your own experiments and let me know what you find.
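A minimal version of such an experiment might look something like this sketch – the table name is hypothetical, and the Connection: close variant is wired in through a botocore before-send event hook:

    import time

    import boto3

    ddb = boto3.client('dynamodb')


    def close_connection(request, **kwargs):
        # Force a new TCP + TLS handshake on every request
        request.headers['Connection'] = 'close'


    # Uncomment to run the Connection: close variant
    # ddb.meta.events.register('before-send.dynamodb.PutItem', close_connection)

    for i in range(20):
        start = time.perf_counter()
        ddb.put_item(TableName='keep-alive-test',
                     Item={'id': {'S': f'item-{i}'}})
        print(f'put_item #{i}: {(time.perf_counter() - start) * 1000:.1f} ms')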

Uselatest – a CloudFormation macro to always use the latest version of a Lambda Layer

One of the drawbacks of using a Lambda Layer is that you must declare it by its full versioned ARN. This is a hassle, as every time you update a Layer, you need to update its declaration in every stack to get the latest updates. It would be much better if one could specify it only by its name (similar to the FunctionName when declaring an event source mapping). That is, instead of arn:aws:lambda:us-east-1:123456789012:layer:my-layer:24, just use my-layer.

I made a CloudFormation macro to do just that.

Uselatest scans through a CloudFormation template and replaces occurrences of Lambda Layers that are not fully qualified with the ARN of the latest available version of that Layer. This way you don’t have to think about updating a template after updating a Layer. The latest version will automatically get picked up during stack deployment. Magic. ✨

The macro works in all the places where you can declare a Layer. Check the Example section for more.
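To illustrate, a function declaration like this (the resource and its properties are made up for the example):

    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        Handler: index.handler
        Runtime: python3.7
        Layers:
          - my-layer

would be deployed as if you had written:

    Layers:
      - arn:aws:lambda:us-east-1:123456789012:layer:my-layer:24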

I wanted to make it available in the Serverless App Repo, but sadly, a CloudFormation Macro is not a supported resource. You’ll have to build, package and deploy it yourself if you want to use it.

Unit testing AWS services in Python

Consider the following piece of code:

    import boto3

    Table = boto3.resource('dynamodb').Table('foo')


    def get_user(user_id):
        ddb_response = Table.get_item(Key={'id': user_id})
        return ddb_response.get('Item')

It’s a contrived example that just reads an item of data from a DynamoDB table. How would you write a unit test for the get_user function?

My favourite way to do so is to combine pytest fixtures and botocore’s Stubber:

    from botocore.stub import Stubber, ANY
    import pytest

    import models


    @pytest.fixture(scope='function')
    def ddb_stubber():
        # A fresh Stubber per test, wrapping the client used
        # by the Table resource in models.py
        ddb_stubber = Stubber(models.Table.meta.client)
        ddb_stubber.activate()
        yield ddb_stubber
        ddb_stubber.assert_no_pending_responses()
        ddb_stubber.deactivate()


    def test_user_exists(ddb_stubber):
        user_id = 'user123'
        get_item_params = {'TableName': ANY,
                           'Key': {'id': user_id}}
        get_item_response = {'Item': {'id': {'S': user_id},
                                      'name': {'S': 'Spam'}}}
        ddb_stubber.add_response('get_item', get_item_response, get_item_params)
        result = models.get_user(user_id)
        assert result.get('id') == user_id


    def test_user_missing(ddb_stubber):
        user_id = 'user123'
        get_item_params = {'TableName': ANY,
                           'Key': {'id': user_id}}
        get_item_response = {}
        ddb_stubber.add_response('get_item', get_item_response, get_item_params)
        result = models.get_user(user_id)
        assert result is None

There are a couple of things to note here.

First, I’m using the wonderful scope functionality of pytest fixtures. This allows me to create a new fixture instance for every test function execution, which is necessary for the Stubber to work correctly.

The Stubber needs to be created with the correct client. Since I’m using a DynamoDB Table instance in models.py, I have to access its client when creating the Stubber instance.

Notice also the “verbose” get_item_response structure in the first test. That’s because of how the DynamoDB client interacts with the DynamoDB API (needless to say, this is DynamoDB specific). The Table is a layer of abstraction on top of this: it converts between DynamoDB types and Python types. However, it still uses the client underneath, so it expects this structure nevertheless.

Finally, it’s good practice to call assert_no_pending_responses – here it happens in the fixture teardown – to make sure the tested code actually did make the call to the AWS service.

I really like this combination of pytest and Stubber. It’s a great match for writing correct and compact tests.

Does Lambda need timeout and memory size parameters?

Following my previous post on judging the serverlessness of a technology, I apply this criterion to AWS Lambda. I argue that the timeout and memory size configuration parameters are non-essential and should be made optional. The need to think about them makes Lambda less serverless than it could be.

On timeout

The way you naturally write a function is to have it finish as soon as possible. That’s just good engineering and good for business. Why then artificially limit its execution time?

The most common case I hear about using timeout is when a Lambda calls some external API. In this scenario, it is used as a fail-safe in case the API takes too long to respond. A better approach is to implement a timeout on the API call itself, in code, and fail the Lambda gracefully if it does not respond in time instead of relying on the runtime to terminate your function. That’s also good engineering.
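As a sketch of what that can look like in Python (the URL and the fallback behaviour are made up for illustration):

    import requests


    def handler(event, context):
        try:
            # Enforce the timeout in code rather than relying on the
            # Lambda runtime to kill the whole function
            response = requests.get('https://api.example.com/data', timeout=3)
            response.raise_for_status()
        except requests.exceptions.Timeout:
            # Fail gracefully: return a fallback, enqueue a retry, etc.
            return {'status': 'upstream timed out'}
        return {'status': 'ok', 'data': response.json()}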

So here’s my first #awswishlist entry: Make timeout optional and let functions run as long as they need to.

On memory size

I have two issues with the memory size parameter.

First of all, it’s a leaky abstraction of the underlying system. You don’t just specify how much memory your function gets, but also its CPU power. There’s a threshold where the Lambda container is assigned 2 vCPUs instead of 1. Last time I checked, this was at 1024 MB, but there’s no way of knowing this unless you experiment with the platform. Since Lambda does not offer specialized CPU instances like EC2 does (yet?), it might not matter much, but I worked on a data processing application where this came into play. Why not allow us to configure this directly? What if I need less memory but more vCPUs?

However, a more serious point of contention for me is that setting the memory size is an exercise in capacity planning. That’s something that should have gone away in the serverless world. You have to set it for the worst possible scenario, as there’s no “auto-scaling”. It really sucks when your application starts failing because a Lambda function suddenly needs 135 MB of memory to finish.

Hence here’s my second #awswishlist entry: Make memory size optional. Or provide “burst capacity” for those times a Lambda crosses the threshold.

Now, I won’t pretend I understand all the complexities behind operating the Lambda platform, and I imagine this is an impossible request, but one can dream.

And while I’m at it, a third #awswishlist item is: Publish memory consumed by a Lambda function as a metric to CloudWatch.

Closing remarks

I do see value in setting either of these parameters, but I think those are specialized cases. For the vast majority of code deployed on Lambda, the platform should take care of “just doing the right thing” and allow us, developers, to think less about the ops side.

Thinking less about servers

Even though serverless has been around for a couple of years now, there is no clear definition of what the term actually means. Leaving aside that it’s a misnomer to begin with, I think part of the confusion stems from the fact that it is being applied in two different ways. Serverless can either describe a quality of a technology (DynamoDB) or it can refer to an approach to building IT systems (a serverless chat-bot).

My way of judging the former is this:

The less you have to think about servers, the more serverless a technology is. Furthermore, serverless is not a binary value but a spectrum.

Let me give an example. On a completely arbitrary scale from 1 to 10, I would rate DynamoDB with provisioned capacity as 8/10 serverless. It’s not fully serverless because I still need to think deeply about data access patterns, predict read and write load, and monitor utilization once my system is operational. However, with the recent announcement of on-demand pricing, I would rate DynamoDB 10/10. I don’t need to think about any of these aforementioned idiosyncrasies (burdens, really) of using the technology.

The second aspect of a serverless technology (and by proxy also a system) is that you don’t pay for idle, except for data storage. Once again, if you need to think about something even when it’s not running (and clearly you’re going to think about your credit card bill), it is not serverless.

This is the promise of serverless. Once you start combining these technologies into systems, you can focus on building value and leave the operational cost to the technology provider.

Streaming upload to S3

Here’s a short recipe for transmitting files from an external source to an S3 bucket without downloading the whole source first and hence unnecessarily allocating memory:

    from botocore.vendored import requests
    import boto3


    def main(event, context):
        s3 = boto3.resource('s3')
        bucket = s3.Bucket('mybucket')
        destination = bucket.Object('path/to/destination')
        url = 'https://foobar.com'
        with requests.get(url, stream=True) as response:
            # Stream the response body straight to S3 in chunks,
            # without ever holding the whole file in memory
            destination.upload_fileobj(response.raw)

It’s taking advantage of requests’ streaming capability together with boto3’s upload_fileobj, which uploads a file-like object in chunks.

Even with files over 2 GB in size, the Lambda container consumed only about 120 MB of memory. Pretty sweet. Of course, this approach is applicable to any platform, not just Lambda.

Redshift workload management basics

Configuring workload management (WLM) for a Redshift cluster is one of the most impactful things you can do to improve the overall performance of your queries.

The goal is, roughly speaking, to have as few slots per queue as possible, with as little wait time – ideally none – in each queue as possible. This will ensure that queries have the most memory available (which helps with query execution speed, as intermediate results don’t have to be written to disk) while, at the same time, they start executing immediately.

There’s no golden rule on how to configure WLM queues, as it is really use-case specific. I recommend starting very simple. By default, there’s a single queue with a concurrency level of 5. This is most probably insufficient – queries won’t be executed immediately, but will be waiting for a slot to free up. Increase it (say, to 15) and monitor the wait time over the next few days.

You can use the v_check_wlm_query_trend_hourly admin view from the tremendously useful amazon-redshift-utils repository and plot the results on a graph.
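Assuming you have the admin views installed (the repository creates them in an admin schema), checking the trend is a one-liner:

    SELECT * FROM admin.v_check_wlm_query_trend_hourly;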


You are only interested in rows with a service_class > 5, as the first five service classes are internal and you cannot change their configuration.

In the graph above, you can see that there’s pretty much no wait time on the queue, which is a good thing. In such a case, you can experiment with reducing the concurrency level to increase the memory-per-slot of a queue. Use this query to inspect the memory allocation and concurrency level of your queues:

SELECT service_class, query_working_mem as mem_mb_per_slot, num_query_tasks as concurrency_level
FROM stv_wlm_service_class_config
WHERE service_class > 5

Finally, make sure to set a query timeout (the maximum time a query can run) on your WLM queues. A runaway query can bring your cluster to a halt.
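To make the timeout concrete: the WLM setup is a JSON parameter, and a single-queue configuration with a 10-minute timeout could look roughly like this (all values are illustrative):

    [
      {
        "query_concurrency": 10,
        "memory_percent_to_use": 100,
        "max_execution_time": 600000,
        "query_group": [],
        "user_group": []
      }
    ]

Note that max_execution_time is specified in milliseconds.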

Figuring out the sweet spot for your WLM setup takes a while, and you should revisit it regularly as your system evolves. The great thing is that tweaking the properties of a queue does not require a cluster reboot, so you won’t disrupt the work of your colleagues by experimenting with the setup.

There are a lot of fine-grained parameters you can adjust and tons more to learn about WLM (my favourite gem is wlm_query_slot_count). Yet even a very basic setup will help with the overall cluster performance. It is absolutely worth the effort to understand and implement WLM.
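To illustrate that gem: wlm_query_slot_count lets a single session temporarily claim several slots of its queue for a memory-hungry statement. A sketch:

    -- Claim the equivalent of 3 slots' worth of memory
    SET wlm_query_slot_count TO 3;
    -- Run the heavy statement, e.g. a big VACUUM
    VACUUM;
    -- Return to the default of one slot per query
    RESET wlm_query_slot_count;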

Overseeing a long running process with Lambda and Step Functions

In my day job, we’re using Lambda and Step Functions to create data processing pipelines. This combo works great for a lot of our use cases. However, for some specific long running tasks (e.g. web scrapers), we “outsource” the computing from Lambda to Fargate.

This poses an issue – how to plug that part of the pipeline into the Step Function orchestrating it. Using an Activity does not work when the processing is distributed among multiple workers.

A solution I came up with is creating a gatekeeper loop in the Step Function, overseeing the progress of the workers with a Lambda function. This is how it looks:


    class NotReadyError(Exception):
        """Custom exception signaling to the Step Function
        that it cannot move to the next state. Instead,
        the Retry block is triggered, pausing the process.

        You can pass details about the progress as a
        string when initializing it. It will be shown
        in the SF console.
        """


    def is_processing_done():
        """Function checking if whatever external work the
        pipeline is waiting on has finished already.
        """
        return False


    def main(event, context):
        # The force parameter is set by the ForceGatekeeperState,
        # in which case the pipeline moves on regardless
        if event.get('force'):
            return
        if not is_processing_done():
            raise NotReadyError()
"Comment": "Step Function demonstrating the gatekeeper loop",
"StartAt": "GatekeeperState",
"States": {
"GatekeeperState": {
"Type": "Task",
"Resource": "",
"Next": "DoneState",
"ResultPath": null,
"Retry": [{
"ErrorEquals": ["NotReadyError"],
"IntervalSeconds": 30,
"BackoffRate": 1.0,
"MaxAttempts": 50
"Catch": [{
"ErrorEquals": ["NotReadyError"],
"ResultPath": "$.exception",
"Next": "ForceGatekeeperState"
"ForceGatekeeperState": {
"Type": "Pass",
"Result": true,
"ResultPath": "$.force",
"Next": "GatekeeperState"
"DoneState": {
"Type": "Task",
"Resource": "",
"End": true

The gatekeeper function (triggered by the GatekeeperState) checks if the external workers have finished yet. This can be done by waiting until an SQS queue is empty, counting the number of objects in an S3 bucket, or any other way of indicating that the processing can move on to the next state. An SQS-based check might look like the sketch below.
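A minimal is_processing_done along those lines (the queue URL is hypothetical):

    import boto3

    sqs = boto3.client('sqs')
    QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/work-queue'


    def is_processing_done():
        # Consider the work done once no messages are queued or in flight
        attrs = sqs.get_queue_attributes(
            QueueUrl=QUEUE_URL,
            AttributeNames=['ApproximateNumberOfMessages',
                            'ApproximateNumberOfMessagesNotVisible'])
        return all(int(v) == 0 for v in attrs['Attributes'].values())

Keep in mind these attributes are approximate, so for critical pipelines you may want a stronger completion signal.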

If the processing is not done yet, the gatekeeper function raises a NotReadyError. This is caught by the Retry block in the Step Function, pausing the execution for a certain period of time, as defined by its parameters. Afterwards, the gatekeeper is called again.

Eventually, if the work is not done even after MaxAttempts retries, the ForceGatekeeperState is triggered. It adds a "force": true parameter to the invocation event and calls the gatekeeper right back again. Notice that the gatekeeper function checks for this force parameter as the very first thing when executed. Since it’s set by the ForceGatekeeperState, the function returns immediately and the Step Function moves on to the DoneState.

For our use case, it was better to have partial results than no results at all. That’s why the ForceGatekeeperState is present. You can also leave it out altogether and have the Step Function execution fail after MaxAttempts retries of the gatekeeper.

A better way to package Python functions for AWS Lambda

The default way of creating a zip package that’s to be deployed to AWS Lambda is to place everything – your source code and any libraries you are using – in the service root directory and compress it. I don’t like this approach: due to the flat hierarchy, it can lead to naming conflicts, it is harder to manage packaging of isolated functions, and it creates a mess in the source directory.

What I do instead is install all dependencies into a lib directory (which is as simple as a pip install -r requirements.txt -t lib step in the deployment pipeline) and set the PYTHONPATH environment variable to /var/runtime:/var/task/lib when deploying the Lambda functions.
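In a SAM template, for instance, setting the variable could look like the following sketch (the function’s other properties are made up):

    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        Handler: src/handler.main
        Runtime: python3.7
        CodeUri: .
        Environment:
          Variables:
            PYTHONPATH: /var/runtime:/var/task/lib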

This works because the zip package is extracted into /var/task in the Lambda container. While it might seem like an unstable solution, I’ve been using it for over a year now without any problems.