Streaming upload to S3

Here’s a short recipe for transmitting files from an external source to an S3 bucket without downloading the whole source first and thus allocating memory unnecessarily:

from botocore.vendored import requests
import boto3


def main(event, context):
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('mybucket')
    destination = bucket.Object('path/to/destination')

    url = 'https://foobar.com'
    # Stream the response body straight into S3 without buffering
    # the whole file in memory.
    with requests.get(url, stream=True) as response:
        destination.upload_fileobj(response.raw)

It takes advantage of requests’ streaming capability.

Even with files over 2 GB in size, the Lambda container consumed only about 120 MB of memory. Pretty sweet. Of course, this approach is applicable to any platform, not just Lambda.

Overseeing a long running process with Lambda and Step Functions

In my day job, we’re using Lambda and Step Functions to create data processing pipelines. This combo works great for a lot of our use cases. However, for some specific long-running tasks (e.g. web scrapers), we “outsource” the computing from Lambda to Fargate.

This poses an issue – how to plug that part of the pipeline into the Step Function orchestrating it. Using an Activity does not work when the processing is distributed among multiple workers.

A solution I came up with is creating a gatekeeper loop in the Step Function, overseeing the progress of the workers with a Lambda function. This is how it looks:

(Diagram: the gatekeeper loop in the Step Function)

class NotReadyError(Exception):
    """
    Custom exception signaling to the Step Function
    that it cannot move to the next state. Instead,
    the Retry block is triggered, pausing the process.
    You can pass details about the progress as a
    string when initializing it. It will be shown
    in the SF console.
    """
    pass


def is_processing_done():
    """
    Function checking if whatever external work the
    pipeline is waiting on has finished already.
    """
    return False


def main(event, context):
    if event.get('force'):
        return
    if not is_processing_done():
        raise NotReadyError()
{
  "Comment": "Step Function demonstrating the gatekeeper loop",
  "StartAt": "GatekeeperState",
  "States": {
    "GatekeeperState": {
      "Type": "Task",
      "Resource": "",
      "Next": "DoneState",
      "ResultPath": null,
      "Retry": [{
        "ErrorEquals": ["NotReadyError"],
        "IntervalSeconds": 30,
        "BackoffRate": 1.0,
        "MaxAttempts": 50
      }],
      "Catch": [{
        "ErrorEquals": ["NotReadyError"],
        "ResultPath": "$.exception",
        "Next": "ForceGatekeeperState"
      }]
    },
    "ForceGatekeeperState": {
      "Type": "Pass",
      "Result": true,
      "ResultPath": "$.force",
      "Next": "GatekeeperState"
    },
    "DoneState": {
      "Type": "Task",
      "Resource": "",
      "End": true
    }
  }
}

The gatekeeper function (triggered by the GatekeeperState) checks if the external workers have finished yet. This can be done by waiting until an SQS queue is empty, counting the number of objects in an S3 bucket, or in any other way that indicates the processing can move on to the next state.
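
For illustration, here is a minimal sketch of such a check based on an SQS queue; the queue URL is a made-up placeholder and treating an empty queue as “done” is just one possible heuristic:

import boto3

sqs = boto3.client('sqs')
# Hypothetical queue holding the scrapers' work items.
QUEUE_URL = 'https://sqs.eu-west-1.amazonaws.com/123456789012/scraper-work'


def is_processing_done():
    """Treat the work as finished once the queue has no visible
    and no in-flight messages left."""
    attrs = sqs.get_queue_attributes(
        QueueUrl=QUEUE_URL,
        AttributeNames=[
            'ApproximateNumberOfMessages',
            'ApproximateNumberOfMessagesNotVisible',
        ],
    )['Attributes']
    return (int(attrs['ApproximateNumberOfMessages']) == 0 and
            int(attrs['ApproximateNumberOfMessagesNotVisible']) == 0)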

If the processing is not done yet, the gatekeeper function raises a NotReadyError. This is caught by the Retry block in the Step Function, pausing the execution for a certain period of time, as defined by its parameters. Afterwards, the gatekeeper is called again.

Eventually, if the work is not done even after MaxAttempts retries, the ForceGatekeeperState is triggered. It adds a "force: true" parameter to the invocation event and calls the gatekeeper right back again. Notice that the gatekeeper function checks for this force parameter as the very first thing when executed. Since it’s present from the ForceGatekeeperState, it returns immediately and the Step Function moves on to the DoneState.

For our use case, it was better to have partial results than no results at all. That’s why the ForceGatekeeperState is present. You can also leave it out altogether and have the Step Function execution fail after MaxAttempts retries of the gatekeeper.

A better way to package Python functions for AWS Lambda

The default way of creating a zip package that’s to be deployed to AWS Lambda is to place everything – your source code and any libraries you are using – in the service root directory and compress it. I don’t like this approach: due to the flat hierarchy it can lead to naming conflicts, it makes packaging isolated functions harder to manage, and it creates a mess in the source directory.

What I do instead is install all dependencies into a lib directory (which is as simple as a pip install -r requirements.txt -t lib step in the deployment pipeline) and set the PYTHONPATH environment variable to /var/runtime:/var/task/lib when deploying the Lambda functions.

This works because the zip package is extracted into /var/task in the Lambda container. While it might seem like an unstable solution, I’ve been using it for over a year now without any problems.
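
How you set the variable depends on your deployment tooling; as a rough sketch, assuming you update the function with boto3 (the function name is a placeholder), it could look like this:

import boto3

lambda_client = boto3.client('lambda')

# Point Python at the runtime itself and at the bundled lib/ directory.
lambda_client.update_function_configuration(
    FunctionName='my-function',  # placeholder name
    Environment={
        'Variables': {
            'PYTHONPATH': '/var/runtime:/var/task/lib',
        }
    },
)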

Python pre-commit hook

This is a pre-commit hook I use in my Python projects.

https://gist.github.com/3849310

Never mind my weak bash-fu; in the end the script does what I want it to – the following three things:

  • First, it checks if I haven’t forgotten to add a new module to the requirements.txt file. Most of the time this works like a charm with virtualenv and pip. The only drawback is installing modules in local experimental branches – these modules are not necessary in upstream branches and so they don’t belong to requirements.txt yet. When you switch back and want to commit in an upstream branch, the pre-commit hook fails. However, this is easily avoidable by using the --no-verify option of git commit.
  • Second, it runs pyflakes on all the .py files in the repository. If there’s something pyflakes doesn’t like, the pre-commit hook fails and shows the output of pyflakes. There’s one case which is ignored, and that is using the _ (underscore) function from the gettext module, as gettext.install() makes it available everywhere. Pyflakes’ documentation is non-existent and I guess there’s no way to create a configuration profile for it, so I had to resort to this hack.
  • Finally, since I’ve deployed code with forgotten set_trace() calls a couple of times, the third thing the script does is check for these and print out the file and line number if it encounters any (a rough Python equivalent of this check is sketched below).
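
The actual hook is a bash script; purely for illustration, the set_trace() check could look roughly like this if it were written in Python instead:

#!/usr/bin/env python
"""Refuse to commit Python files that still contain a forgotten
set_trace() call. A sketch, not the original bash hook."""
import subprocess
import sys


def main():
    # List the files staged for this commit.
    staged = subprocess.check_output(
        ['git', 'diff', '--cached', '--name-only', '--diff-filter=ACM']
    ).decode().splitlines()
    failed = False
    for path in staged:
        if not path.endswith('.py'):
            continue
        with open(path) as f:
            for lineno, line in enumerate(f, start=1):
                if 'set_trace()' in line:
                    print('%s:%d: forgotten set_trace() call' % (path, lineno))
                    failed = True
    sys.exit(1 if failed else 0)


if __name__ == '__main__':
    main()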

I keep this file as a part of the repository, making a symbolic link to it in .git/hooks/pre-commit. Make sure the file is executable.

    Do you have similar stuff in your VCS hooks? Is there anything I could improve in mine? I’ll be glad to see your tips in the comments.

    Server-side verification of Google Play subscriptions

    TL;DR To programmatically verify Google Play subscriptions, you have to use the OAuth 2.0 for web server applications auth flow, not service accounts. This is a massive FUBAR on Google’s side and makes developers’ lives very painful.

    Lately, I’ve been working on the backend part of an upcoming app we’re developing for one of our clients. This app offers monthly and yearly subscriptions, so I had to implement a check that the recurring payment happened, the credit card got billed and the app provider got the money. Of course, for multiple reasons, this has to be done server-side, completely automatically and without any intervention from the app user or provider.

    Google provides an API called android-publisher for this. To use any API from Google, you first have to enable it in the Console and then authenticate with it. The authentication is done via OAuth 2.0. As Google offers API access to many of their services, which are used on different occasions, they also offer different OAuth 2.0 authentication flows.

    The flow/mechanism for server-to-server communication is called Service accounts in Google terminology. This is precisely what I needed. However, for reasons beyond my understanding, this is not the one used for the android-publisher API. Instead, they chose the Web server applications flow, which for this use case is absurd.

    (Sidenote: When we started to build the aforementioned app, recurring transactions were not even available for Android. We planned to use PayPal, as we did for the BlackBerry version. However, during development, Google introduced subscriptions for Android, which made us happy.

    I started reading the docs and implementing the whole auth and check code, but it didn’t work; I was getting a “This developer account does not own the application.” HTTP 401 error. Googling for it didn’t help – at that time, the only search results were two questions on Stack Overflow that were just a couple of hours old. I would swear the docs at that time said to use Service accounts for authentication and Google changed it later. I had to re-read the docs from the beginning to debug this infuriating error.)

    Using the Web server applications flow is ridiculous because human interaction is involved. At least once, you (in this case our client!) need to press an “Allow” button in your web browser. Palm, meet face.

    Here are the instructions you need to follow to achieve automated subscription verification. The code is in Python but it’s easy to adapt.

    First of all, in the Console, you need to create a Client ID for Web applications. You can use http://localhost as the redirect hostname. As you’ll see in a minute, it doesn’t matter much. You mostly need the Client ID and Client secret.

    Next, fire up the Python REPL and enter this:

    https://gist.github.com/3450666

    Use the Client ID and Client secret you obtained from the Console. This piece of code will give you an authentication URL; by default, it will contain the access_type=offline parameter. This is very important – make sure it’s there. Open the URL in your browser and log in with the Google account that you will be using for publishing the Android application. After a successful login and authorization, you’ll be redirected to localhost in your browser. Unless you’re running a webserver locally, this will probably fail, but it doesn’t matter. The address you are redirected to will contain a code parameter. Copy its value and go back to the REPL again:

    https://gist.github.com/3450785
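
    Roughly, the two REPL steps look like this (a sketch assuming the oauth2client library; the client ID, secret and code values are placeholders):

from oauth2client.client import OAuth2WebServerFlow

CLIENT_ID = 'your-client-id.apps.googleusercontent.com'  # placeholder
CLIENT_SECRET = 'your-client-secret'                     # placeholder

flow = OAuth2WebServerFlow(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    scope='https://www.googleapis.com/auth/androidpublisher',
    redirect_uri='http://localhost',
)

# Step 1: open this URL in a browser and authorize the application.
# Check that it contains access_type=offline.
print(flow.step1_get_authorize_url())

# Step 2: paste the value of the `code` parameter from the redirect URL.
credentials = flow.step2_exchange('code-from-the-redirect')  # placeholder
print(credentials.refresh_token)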

    Finally, you’ve got an instance of the oauth2client.client.OAuth2Credentials class. It contains a couple of properties, but the only one that’s really interesting is the refresh_token. Store the refresh token in your server configuration; you can use it forever, meaning until someone revokes the access to the API. Then you would have to go through this whole process again.

    Basically, thanks to this refresh token you will be able to obtain a new access token on each call to the API. To do that, you create an instance of OAuth2Credentials and use that to authorize an httplib2.Http object:

    https://gist.github.com/3451039

    You can now build a service and make the get purchases API call.
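
    Put together, this part might look roughly as follows (a sketch using the apiclient/oauth2client libraries of that era; the package name, subscription ID, purchase token and credentials are placeholders, and the API version and resource layout differ in newer androidpublisher releases):

import httplib2
from apiclient.discovery import build
from oauth2client.client import OAuth2Credentials

CLIENT_ID = 'your-client-id.apps.googleusercontent.com'  # placeholder
CLIENT_SECRET = 'your-client-secret'                     # placeholder
REFRESH_TOKEN = 'refresh-token-from-server-config'       # placeholder

credentials = OAuth2Credentials(
    access_token=None,  # fetched automatically via the refresh token
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    refresh_token=REFRESH_TOKEN,
    token_expiry=None,
    token_uri='https://accounts.google.com/o/oauth2/token',
    user_agent=None,
)

http = credentials.authorize(httplib2.Http())
service = build('androidpublisher', 'v1.1', http=http)

# Check the status of a single subscription purchase.
purchase = service.purchases().get(
    packageName='com.example.app',          # placeholder
    subscriptionId='monthly_subscription',  # placeholder
    token='purchase-token-from-the-app',    # placeholder
).execute()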

    The following gist summarizes the whole blogpost:

    https://gist.github.com/3451509

    As long as the API access is not revoked, you should be fine using this method.

    Programming for iOS from the point of view of a python developer

    I gave a talk at Prague’s python user group meetup. It was about my experience of learning and using Obj-C to develop iOS apps as a python developer. You can check out the recorded video below. Slides are on Speakerdeck (I tried to embed them but Posterous doesn’t play nicely with Speakerdeck).

    A huge thanks to Jiří, Aleš and Jakub for inviting me, and to everyone else who attended.

    Introducing Myngo: A web admin for MongoDB

    Myngo is a web administration interface for MongoDB. It is written in
    Python, runs on Tornado and uses jQuery on the front-end. It is a
    fresh, new project, so there’s no package yet. If you want to try it
    out, follow the instructions.

    So far, Myngo can display a list of available databases, collections
    and some server info. You can also do some actions with the DBs and
    collections. Check the screenshots for more details.

    (Screenshots: databases, collections, server info, modal dialog)

    There are a lot of features planned for Myngo, the most significant being:
    * querying (or an interactive console)
    * user auth and permission system
    * slick UI
    * some kind of test suite

    I hope you’ll like what you see. Please report bugs and feature
    requests to the project’s bug tracker or directly to me. I’d also
    greatly appreciate any kind of help (especially with design and layout),
    so feel free to fork and hack away.

    If you find Myngo valuable, please consider donating so I can spend
    more time improving it.

     

    Benchmarking Tornado’s sessions

    Tornado, being otherwise great, lacks session support, which makes building even slightly complex websites a hurdle. I created a fork which adds session support to Tornado. So far it supports 6 different storage engines for sessions. I was curious how fast they are and how they would affect Tornado’s overall performance (requests per second served).

    Methodology

    I carried out a simple benchmark. All possible configurations were left in their default state. I used two servers, both from Rackspace and both located in the same datacenter. The first was used to run Apache Bench to simulate server load. It was run with 300 concurrent requests, up to 10 000 requests total.

    The second server was used to run the sample Tornado app. Nginx served as a load balancer to four different Tornado processes (one for each core). This is the recommended way of deploying a Tornado app. The server hardware was a 2.2 GHz quad core AMD Opteron with 2GB of RAM.

    I always ran ab to prepopulate the storage with data before doing the measured run. This simulated older, stored sessions. The Tornado app was a simple request handler which stored data from a GET parameter in the session.
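
    For illustration, the handler did roughly this (a sketch assuming the session fork exposes a dict-like self.session on the handler; the exact API may differ):

import tornado.web


class BenchmarkHandler(tornado.web.RequestHandler):
    def get(self):
        # Store the value of the `data` GET parameter in the session.
        self.session['data'] = self.get_argument('data', default='')
        self.write('ok')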

    No session performance

    To have a baseline of Tornado’s performance on the machine, I checked out the source code and installed it. The handler script was slightly altered, because, obviously, no sessions were available. Tornado scored 1626 req/s.

    File based sessions

    I didn’t expect much of file based sessions. It’s a naive implementation where all sessions are stored in a single file. It’s not suitable for production, but it’s OK when developing and testing. Due to poor performance I had to change the ab parameters to 10 concurrent requests and 1000 total to get at least some results. The first run ended with approx. 160 req/s, but the next batch dropped to 32 req/s and the third didn’t even finish. As I said, this way of storing sessions is good for testing purposes, at most.

    Directory based sessions

    If you want your sessions to be stored in files, use directory based sessions. This solution offers the traditional approach of having one file per session. Of course, with a lot of users there will be a lot of sessions (and a lot of files), but then again, modern filesystems can handle a large number of small files easily. Plus, with hundreds of thousands of users you probably wouldn’t be using this solution anyway.

    Directory based sessions performed reasonably well. The best run was 869 req/s, or 53.4% of the baseline performance. However, over time, as the number of session files in the directory grew, it fell to 608 req/s. The filesystem was ext3. I suspect reiserfs or JFS would score better.

    MySQL based sessions

    Tornado ships with a simple MySQL layer on top of the MySQLdb module, and because it is a popular choice among many web developers, I implemented support for MySQL session storage. Furthermore, it is nice to see how it compares to its NoSQL cousins.

    I used MySQL server v5.1.37 with the default configuration. The results were a bit unstable. Apache Bench reported 1171, 1216 and 1353 req/s in three consecutive runs. That’s 83 % of the baseline when counting the best run. I didn’t investigate the root of the inconsistent performance. Test runs showed something between 1200 and 1300 req/s, with the mysqld process often consuming the whole capacity of one core.

    Memcached based sessions

    Being a non-persistent key-value store, Memcached has an obvious advantage over MySQL – at least on paper. I used Memcached 1.4.4 built from source. The best result was 1473 req/s (90.6 %), but the other two measured runs clocked in at 1106 and 1202 req/s. Again, I don’t know why the big difference occurred. The code uses pylibmc, which is the fastest Python lib for interacting with memcached.

    Redis based sessions

    If you want persistency with session data, you can use Redis instead of Memcached. Redis is a simple, fast key-value store with advanced features.

    I used v1.2.1 built from source. Redis scored very well with three consistent runs at around 1410 req/s; the best one showed 1418 req/s (87.2 %).

    MongoDB based sessions

    MongoDB is the last supported storage engine and it was a shock. I don’t know how the 10gen gals and guys do it, but MongoDB is FAST. All measured runs returned over 1500 req/s (1520, 1577 and 1582 req/s), which is a) supersonic and b) stable. That is 97.4 % of the original, no-sessions Tornado performance.

    To be honest, MongoDB scored 960 req/s in one of the test runs. It was because of the way it works – it allocates hard disk space by creating zero-filled files, starting at 2 MB and going up to 2 GB per file as the database needs more space. This one time the allocation happened during the test run (it was recorded in the mongod output log, so it wasn’t hard to find the reason), hence the bad performance. However, the space allocation is infrequent and in the real world it would rarely be a problem.

    Graph and data

    I put the benchmark data along with the Tornado handler up on GitHub. The graph shows the worst and best runs for easy comparison.

    (Graph: worst and best runs for each session storage engine)

    Conclusion

    Assuming you want to store sessions along with your app’s other data, I would recommend using Redis or MongoDB; it depends on your use case. Redis is fast, easy to set up and work with, and offers persistence, so it wins over Memcached. If you’re building something more complex, MongoDB is the way to go. It’s fast, fun and addictive, with great support from the authors. For developers seeking the traditional SQL approach, only MySQL is available at the moment. I may add PostgreSQL and SQLite support in the future. Get in touch if you need it or watch the repo to be aware of the latest changes.

    Case-insensitive regexp matching of national characters in Python

    The goal is to have a positive match on strings ‘Šimánek’ and ‘šimánek’ when doing a case-insensitive comparison. Sounds like an easy task, right? It turns out it’s not that easy due to the ‘Š/š’ national characters at the beginning of the strings. A simple:

    >>> re.match(u'šimánek', u'Šimánek', re.I)

    returns None. Setting the right locale or using the re.L flag doesn’t help either. After a couple of experiments, I found a way to match these strings:

    >>> re.match('šimánek'.decode('utf-8'), 'Šimánek'.decode('utf-8'), re.I | re.U)

    <_sre.SRE_Match object at 0x8d82480>
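
    For completeness, here is the same comparison as a small self-contained script (Python 2, assuming a UTF-8-encoded source file):

# -*- coding: utf-8 -*-
import re

# Decoding both byte strings to unicode and passing re.UNICODE makes the
# case-insensitive match work for the national characters as well.
pattern = 'šimánek'.decode('utf-8')
text = 'Šimánek'.decode('utf-8')

print(re.match(pattern, text, re.IGNORECASE | re.UNICODE) is not None)  # True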

    Hope this helps.

    Django micro-frameworks

    Recently I was searching for Django micro-frameworks (or frameworks
    built on top of/out of Django, if you will). I knew there were some, I
    just couldn’t find any. Thanks to a bit of luck and my excellent
    Google-fu, I found four. These are, to my knowledge, all Django
    micro-frameworks available:

    I haven’t had time to play with any of them yet. If you have any
    experience with one of these micro-frameworks, please let me know in
    the comments.