In my day job, we’re using Lambda and Step Functions to create data processing pipelines. This combo works great for a lot of our use cases. However, for some specific long-running tasks (e.g. web scrapers), we “outsource” the computing from Lambda to Fargate.
This poses an issue: how to plug that part of the pipeline into the Step Function orchestrating it. Using an Activity does not work when the processing is distributed among multiple workers.
A solution I came up with is a gatekeeper loop in the Step Function, in which a Lambda function oversees the progress of the workers. This is how it looks, first the gatekeeper Lambda function, then the Step Function definition:

class NotReadyError(Exception):
    """
    Custom exception signaling to the Step Function
    that it cannot move to the next state. Instead,
    the Retry block is triggered, pausing the process.
    You can pass details about the progress as a
    string when initializing it. It will be shown
    in the SF console.
    """
    pass


def is_processing_done():
    """
    Function checking if whatever external work the
    pipeline is waiting on has finished already.
    """
    return False


def main(event, context):
    if event.get('force'):
        return
    if not is_processing_done():
        raise NotReadyError()

{
  "Comment": "Step Function demonstrating the gatekeeper loop",
  "StartAt": "GatekeeperState",
  "States": {
    "GatekeeperState": {
      "Type": "Task",
      "Resource": "",
      "Next": "DoneState",
      "ResultPath": null,
      "Retry": [{
        "ErrorEquals": ["NotReadyError"],
        "IntervalSeconds": 30,
        "BackoffRate": 1.0,
        "MaxAttempts": 50
      }],
      "Catch": [{
        "ErrorEquals": ["NotReadyError"],
        "ResultPath": "$.exception",
        "Next": "ForceGatekeeperState"
      }]
    },
    "ForceGatekeeperState": {
      "Type": "Pass",
      "Result": true,
      "ResultPath": "$.force",
      "Next": "GatekeeperState"
    },
    "DoneState": {
      "Type": "Task",
      "Resource": "",
      "End": true
    }
  }
}

The gatekeeper function (triggered by the GatekeeperState) checks whether the external workers have finished yet. This can be done by checking whether an SQS queue is empty, counting the number of objects in an S3 bucket, or any other check indicating that the processing can move on to the next state.
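To make the SQS variant a bit more concrete, here is a minimal sketch of what is_processing_done could look like. It is not the code we run in production. It assumes the workers delete each message once they have finished with it, and that the function receives a boto3 SQS client and a queue URL (both parameters are my additions for illustration). Keep in mind that the counts SQS reports are approximate.

```python
def is_processing_done(sqs_client, queue_url):
    """
    Treat the work as done when the queue holds no waiting
    and no in-flight messages. Both parameters are assumptions
    of this sketch: sqs_client would be boto3.client("sqs"),
    queue_url the URL of the queue feeding the workers.
    """
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",            # waiting to be picked up
            "ApproximateNumberOfMessagesNotVisible",  # picked up, not yet deleted
        ],
    )
    attributes = response["Attributes"]
    # SQS returns the counts as strings, so convert before comparing.
    waiting = int(attributes["ApproximateNumberOfMessages"])
    in_flight = int(attributes["ApproximateNumberOfMessagesNotVisible"])
    return waiting == 0 and in_flight == 0
```

Because the counts are only approximate, you may want to require a couple of consecutive "empty" readings before trusting the result.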
If the processing is not done yet, the gatekeeper function raises a NotReadyError. This is caught by the Retry block in the Step Function, pausing the execution for a certain period of time, as defined by its parameters. Afterwards, the gatekeeper is called again.
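If you want to know how long the Retry block will keep waiting before it gives up, you can add up the pauses. The sketch below does that for the parameters used above. The helper name retry_delays is made up for this post. It relies on the retry rule that the first pause is IntervalSeconds and each following pause is multiplied by BackoffRate, and it ignores the run time of the gatekeeper Lambda itself.

```python
def retry_delays(interval_seconds, backoff_rate, max_attempts):
    """
    Pauses (in seconds) before each retry: the first one is
    interval_seconds, every later one is the previous pause
    multiplied by backoff_rate.
    """
    return [interval_seconds * backoff_rate ** retry
            for retry in range(max_attempts)]


# Parameters from the Retry block of the GatekeeperState.
delays = retry_delays(interval_seconds=30, backoff_rate=1.0, max_attempts=50)
print(sum(delays) / 60)  # 25.0 minutes of waiting in total
```

With a BackoffRate of 1.0 the gatekeeper simply polls every 30 seconds for about 25 minutes. A higher rate gives you fewer Lambda invocations for the same total wait, at the cost of reacting more slowly near the end.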
Eventually, if the work is not done even after MaxAttempts retries, the Catch block takes over and the ForceGatekeeperState is triggered. It adds a "force: true" parameter to the invocation event and calls the gatekeeper right back again. Notice that the gatekeeper function checks for this force parameter as the very first thing when executed. Since it’s present from the ForceGatekeeperState, it returns immediately and the Step Function moves on to the DoneState.
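The whole loop can be traced locally without deploying anything. The following is only a rough stand-in for the state machine, written to show the control flow. The function run_gatekeeper_loop is hypothetical, it does not sleep between retries, and the shape of the stored exception is simplified.

```python
class NotReadyError(Exception):
    """Raised by the gatekeeper while the external work is unfinished."""


def run_gatekeeper_loop(handler, event, max_attempts=50):
    """Local stand-in for the state machine; returns (event, handler_calls)."""
    event = dict(event)
    calls = 0
    while True:
        # GatekeeperState: one initial attempt plus up to max_attempts retries.
        for _ in range(max_attempts + 1):
            calls += 1
            try:
                handler(event, None)
            except NotReadyError as error:
                last_error = error
                continue  # Retry block: the real thing pauses here
            # ResultPath is null, so the input moves on to DoneState unchanged.
            return event, calls
        # Catch block: retries are used up, keep the error under $.exception.
        event["exception"] = {"Error": "NotReadyError", "Cause": str(last_error)}
        # ForceGatekeeperState: set $.force and go back to the gatekeeper.
        event["force"] = True


def main(event, context):
    """Gatekeeper whose workers never finish, to exercise the forced path."""
    if event.get("force"):
        return
    raise NotReadyError("workers still running")


event, calls = run_gatekeeper_loop(main, {"job": "demo"})
print(calls, event["force"])  # 52 True
```

The 52 calls are the first attempt, the 50 retries, and the final forced call. A gatekeeper that ignored the force parameter would keep this loop going forever, which is why the check sits at the very top of the function.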
For our use case, it was better to have partial results than no results at all. That’s why the ForceGatekeeperState is present. You can also leave it out altogether, together with the Catch block that points to it, and have the Step Function execution fail after MaxAttempts retries of the gatekeeper.
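If you want to switch between the two behaviours, one option is to generate the definition instead of writing it by hand. Here is a small sketch of that idea. The build_definition function and its force_after_retries flag are invented for this example, and the Resource fields are left blank just like in the definition above.

```python
import json


def build_definition(force_after_retries=True):
    """Build the state machine definition, with or without the forced path."""
    gatekeeper = {
        "Type": "Task",
        "Resource": "",  # left blank, as in the definition above
        "Next": "DoneState",
        "ResultPath": None,  # serialized as null
        "Retry": [{
            "ErrorEquals": ["NotReadyError"],
            "IntervalSeconds": 30,
            "BackoffRate": 1.0,
            "MaxAttempts": 50,
        }],
    }
    states = {
        "GatekeeperState": gatekeeper,
        "DoneState": {"Type": "Task", "Resource": "", "End": True},
    }
    if force_after_retries:
        # Partial results are acceptable: catch the error and force the way through.
        gatekeeper["Catch"] = [{
            "ErrorEquals": ["NotReadyError"],
            "ResultPath": "$.exception",
            "Next": "ForceGatekeeperState",
        }]
        states["ForceGatekeeperState"] = {
            "Type": "Pass",
            "Result": True,
            "ResultPath": "$.force",
            "Next": "GatekeeperState",
        }
    return {
        "Comment": "Step Function demonstrating the gatekeeper loop",
        "StartAt": "GatekeeperState",
        "States": states,
    }


if __name__ == "__main__":
    # Strict variant: no Catch block, so the execution fails once retries run out.
    print(json.dumps(build_definition(force_after_retries=False), indent=2))
```

In the strict variant nothing catches the NotReadyError after the last retry, so the execution ends as failed with that error.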