---
title: Batch Processing
description: Utility
---
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
- Reports batch item failures to reduce number of retries for a record upon errors
- Simple interface to process each batch record
- Integrates with Event Source Data Classes and Parser (Pydantic) for self-documenting record schema
- Build your own batch processor by extending primitives
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of these conditions happens first: a) your Lambda function returns a successful response, b) the record reaches maximum retry attempts, or c) the records expire.
With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place:
- `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties
- A specific response is returned so Lambda knows which records should not be deleted during partial responses (see the example below)
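To illustrate the second point, the response Lambda expects looks roughly like the sketch below; the item identifier value is made up for the example.

```python
# Shape of the partial-failure response Lambda expects when ReportBatchItemFailures
# is enabled: only the listed records return to the queue/stream for retry.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "<message ID or sequence number of a failed record>"},
    ]
}
```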
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
    We recommend implementing processing logic in an idempotent manner wherever possible.
You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
Regardless of whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`.
You do not need any additional IAM permissions to use this utility, except for what each event source requires.
The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the IAM permissions and a Dead Letter Queue where batch records will be sent after 2 retries are attempted.
=== "SQS"
```yaml title="template.yaml" hl_lines="30-31"
--8<-- "examples/batch_processing/sam/sqs_batch_processing.yaml"
```
=== "Kinesis Data Streams"
```yaml title="template.yaml" hl_lines="44-45"
--8<-- "examples/batch_processing/sam/kinesis_batch_processing.yaml"
```
=== "DynamoDB Streams"
```yaml title="template.yaml" hl_lines="43-44"
--8<-- "examples/batch_processing/sam/dynamodb_batch_processing.yaml"
```
Processing batches from SQS works in three stages:
1. Instantiate `BatchProcessor` and choose `EventType.SQS` for the event type
2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion
3. Use `process_partial_response` to kick off processing
???+ info
    This code example optionally uses Tracer and Logger for completion.
=== "Recommended"
```python hl_lines="4-9 12 18 28"
--8<-- "examples/batch_processing/src/getting_started_sqs.py"
```
=== "As a context manager"
```python hl_lines="4-5 8 14 25-26 29"
--8<-- "examples/batch_processing/src/getting_started_sqs_context_manager.py"
```
=== "As a decorator (legacy)"
```python hl_lines="4-9 12 18 27 29"
--8<-- "examples/batch_processing/src/getting_started_sqs_decorator.py"
```
=== "Sample response"
The second record failed to be processed, therefore the processor added its message ID in the response.
```json
--8<-- "examples/batch_processing/src/getting_started_response.json"
```
=== "Sample event"
```json
--8<-- "examples/batch_processing/src/getting_started_event.json"
```
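Putting the three stages together, a minimal handler looks roughly like the sketch below. It mirrors what the "Recommended" tab demonstrates but omits Tracer and Logger; treat it as an outline rather than the full example.

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)  # stage 1: choose the event type


def record_handler(record: SQSRecord):  # stage 2: process a single record
    payload: str = record.body
    ...


def lambda_handler(event: dict, context: LambdaContext):
    # stage 3: kick off processing and return the partial-failure response
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
```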
When using SQS FIFO queues, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. This helps preserve the ordering of messages in your queue.
=== "Recommended"
```python hl_lines="5-6 11 27"
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py"
```
=== "As a context manager"
```python hl_lines="4 8"
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py"
```
=== "As a decorator (legacy)"
```python hl_lines="5-6 11 26"
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
```
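The only code change from the standard SQS flow is the processor class. A minimal sketch, assuming the `SqsFifoPartialProcessor` class used in the tabs above:

```python
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# Stops at the first failure and reports remaining records as failed too,
# preserving FIFO ordering guarantees.
processor = SqsFifoPartialProcessor()


def record_handler(record: SQSRecord):
    ...


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```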
Processing batches from Kinesis works in three stages:
1. Instantiate `BatchProcessor` and choose `EventType.KinesisDataStreams` for the event type
2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion
3. Use `process_partial_response` to kick off processing
???+ info
    This code example optionally uses Tracer and Logger for completion.
=== "Recommended"
```python hl_lines="2-9 12 18 27"
--8<-- "examples/batch_processing/src/getting_started_kinesis.py"
```
=== "As a context manager"
```python hl_lines="3-5 8 14 23-25 28"
--8<-- "examples/batch_processing/src/getting_started_kinesis_context_manager.py"
```
=== "As a decorator (legacy)"
```python hl_lines="2-9 12 18 26"
--8<-- "examples/batch_processing/src/getting_started_kinesis_decorator.py"
```
=== "Sample response"
The second record failed to be processed, therefore the processor added its sequence number in the response.
```json
--8<-- "examples/batch_processing/src/getting_started_kinesis_response.json"
```
=== "Sample event"
```json
--8<-- "examples/batch_processing/src/getting_started_kinesis_event.json"
```
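The flow is the same as SQS; what changes is the event type and how you read each record's payload. A short sketch, assuming the `data_as_text()` helper on the Kinesis record data class:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    # Kinesis payloads are base64-encoded; the data class decodes them for you
    payload: str = record.kinesis.data_as_text()
    ...


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```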
Processing batches from DynamoDB Streams works in three stages:
1. Instantiate `BatchProcessor` and choose `EventType.DynamoDBStreams` for the event type
2. Define your function to handle each batch record, and use the `DynamoDBRecord` type annotation for autocompletion
3. Use `process_partial_response` to kick off processing
???+ info
    This code example optionally uses Tracer and Logger for completion.
=== "Recommended"
```python hl_lines="4-11 14 20 32"
--8<-- "examples/batch_processing/src/getting_started_dynamodb.py"
```
=== "As a context manager"
```python hl_lines="5-7 10 16 28-30 33"
--8<-- "examples/batch_processing/src/getting_started_dynamodb_context_manager.py"
```
=== "As a decorator (legacy)"
```python hl_lines="4-11 14 20 31"
--8<-- "examples/batch_processing/src/getting_started_dynamodb_decorator.py"
```
=== "Sample response"
The second record failed to be processed, therefore the processor added its sequence number in the response.
```json
--8<-- "examples/batch_processing/src/getting_started_dynamodb_response.json"
```
=== "Sample event"
```json
--8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
```
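As with Kinesis, only the event type and the record accessors change. A sketch, assuming the `new_image` accessor on the DynamoDB record data class:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)


def record_handler(record: DynamoDBRecord):
    # Whether NewImage is present depends on your stream's StreamViewType setting
    if record.dynamodb and record.dynamodb.new_image:
        new_image = record.dynamodb.new_image
        ...


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```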
All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:

- All records successfully processed. We will return an empty list of item failures `{'batchItemFailures': []}`
- Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing
- All records failed to be processed. We will raise a `BatchProcessingError` exception with a list of all exceptions raised when processing
!!! tip "New to AsyncIO? Read this comprehensive guide first."
You can use the `AsyncBatchProcessor` class and the `async_process_partial_response` function to process messages concurrently.
???+ question "When is this useful?"
    Your use case might be able to process multiple records at the same time without conflicting with one another.

    For example, imagine you need to process multiple loyalty points and incrementally save them in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.

    The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
```python
--8<-- "examples/batch_processing/src/getting_started_async.py"
```
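As a rough sketch of the shape of that example, an async handler swaps in `AsyncBatchProcessor` and an `async def` record handler:

```python
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = AsyncBatchProcessor(event_type=EventType.SQS)


async def async_record_handler(record: SQSRecord):
    # Await I/O-bound work here so other records can progress concurrently
    ...


def lambda_handler(event: dict, context: LambdaContext):
    return async_process_partial_response(
        event=event, record_handler=async_record_handler, processor=processor, context=context
    )
```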
???+ warning "Using tracer?"
    `AsyncBatchProcessor` uses `asyncio.gather`, which can cause side effects and reach trace limits at high concurrency.

    See [Tracing concurrent asynchronous functions](../core/tracer.md#concurrent-asynchronous-functions).
You can bring your own Pydantic models via the `model` parameter when inheriting from `SqsRecordModel`, `KinesisDataStreamRecord`, or `DynamoDBStreamRecordModel`.

Inheritance is important because we need to access message IDs and sequence numbers from these records in the event of failure. Mypy is fully integrated with this utility, so it should identify whether you're passing the incorrect model.
=== "SQS"
```python hl_lines="8 17 27 34"
--8<-- "examples/batch_processing/src/pydantic_sqs.py"
```
=== "Kinesis Data Streams"
```python hl_lines="9 10 20 28 34 41"
--8<-- "examples/batch_processing/src/pydantic_kinesis.py"
```
=== "DynamoDB Streams"
```python hl_lines="12 13 22 32 37 41 47 55"
--8<-- "examples/batch_processing/src/pydantic_dynamodb.py"
```
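As an illustration of the SQS case, here is a sketch along the lines of the tab above; the `Order` payload model and the use of pydantic's `Json` type to deserialize the message body are assumptions for the example.

```python
from pydantic import BaseModel, Json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):  # hypothetical business payload carried in the message body
    item: dict


class OrderSqsRecord(SqsRecordModel):
    # Json[...] tells Pydantic to parse the JSON string in `body` into an Order
    body: Json[Order]


processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)


def record_handler(record: OrderSqsRecord):
    return record.body.item


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```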
Use the context manager to access a list of all returned values from your `record_handler` function.

- When successful. We will include a tuple with `success`, the result of `record_handler`, and the batch record
- When failed. We will include a tuple with `fail`, the exception as a string, and the batch record
```python
--8<-- "examples/batch_processing/src/context_manager_access.py"
```
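A sketch of what that access pattern looks like; the tuple layout follows the description above:

```python
from typing import List, Tuple

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    return record.body


def lambda_handler(event: dict, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processed_messages: List[Tuple] = processor.process()

    for status, result_or_error, record in processed_messages:
        # status is "success" or "fail"; result_or_error is the handler's return
        # value on success, or the exception rendered as a string on failure
        ...

    return processor.response()
```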
Within your `record_handler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the Lambda context into your `record_handler` if your function signature has a parameter named `lambda_context`. When using a context manager, you also need to pass the Lambda context object, like in the example below.
=== "Recommended"
```python hl_lines="19"
--8<-- "examples/batch_processing/src/advanced_accessing_lambda_context.py"
```
=== "As a decorator (legacy)"
```python hl_lines="18"
--8<-- "examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py"
```
=== "As a context manager"
```python hl_lines="14 24"
--8<-- "examples/batch_processing/src/advanced_accessing_lambda_context_manager.py"
```
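A minimal sketch of the injection, assuming the handler only needs the remaining invocation time:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord, lambda_context: LambdaContext = None):
    # Injected automatically because the parameter is named `lambda_context`
    if lambda_context is not None:
        remaining_ms = lambda_context.get_remaining_time_in_millis()
        ...


def lambda_handler(event: dict, context: LambdaContext):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```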
You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.

For these scenarios, you can subclass `BatchProcessor` and quickly override the `success_handler` and `failure_handler` methods:

- `success_handler()` – Keeps track of successful batch records
- `failure_handler()` – Keeps track of failed batch records
???+ example
    Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing.
```python
--8<-- "examples/batch_processing/src/extending_failure.py"
```
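A sketch of that idea; the metric name comes from the example above, while the Metrics namespace and service are illustrative:

```python
from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType

metrics = Metrics(namespace="MyApp", service="orders")  # illustrative namespace/service


class MyProcessor(BatchProcessor):
    def failure_handler(self, record, exception):
        # Emit one count per failed record, then defer to the default behaviour
        metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
        return super().failure_handler(record, exception)


processor = MyProcessor(event_type=EventType.SQS)
```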
You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
- `_process_record()` – handles all processing logic for each individual message of a batch, including calling the `record_handler` (`self.handler`)
- `_prepare()` – called once as part of the processor initialization
- `_clean()` – teardown logic called once after `_process_record` completes
You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
```python
--8<-- "examples/batch_processing/src/custom_partial_processor.py"
```
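A rough sketch of such a subclass. The print statement stands in for real side effects, and depending on your Powertools version you may also need to implement an async counterpart (e.g. `_async_process_record`); check the example above for the complete version.

```python
import sys

from aws_lambda_powertools.utilities.batch import BasePartialProcessor


class MyPartialProcessor(BasePartialProcessor):
    def _prepare(self):
        # Called once before processing starts, e.g. open connections, reset state
        self.results = []

    def _clean(self):
        # Called once after all records were processed, e.g. flush buffers
        print(f"processed {len(self.results)} records")

    def _process_record(self, record: dict):
        # Called for every record; delegate to the record handler and
        # report the outcome back to the base class
        try:
            result = self.handler(record)
            self.results.append(result)
            return self.success_handler(record, result)
        except Exception:
            return self.failure_handler(record, sys.exc_info())
```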
When using Tracer to capture responses for each batch record processing, you might exceed the 64K tracing data limit depending on what you return from your `record_handler` function, or how big your batch size is.

If that's the case, you can configure Tracer to disable response auto-capturing.
```python
--8<-- "examples/batch_processing/src/disable_tracing.py"
```
As there are no external calls, you can unit test your code with `BatchProcessor` quite easily.

Example: given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.
=== "test_app.py"
```python
--8<-- "examples/batch_processing/testing/test_app.py"
```
=== "src/app.py"
```python
--8<-- "examples/batch_processing/testing/src/app.py"
```
=== "Sample SQS event"
```json title="events/sqs_event.json"
--8<-- "examples/batch_processing/testing/events/sqs_event.json"
```
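A sketch of such a test. The fake Lambda context, the module path `src.app`, and the event file location are assumptions for illustration; the tabs above show the full version.

```python
import json
from dataclasses import dataclass
from pathlib import Path

import pytest

from src.app import lambda_handler  # hypothetical handler module


@dataclass
class FakeLambdaContext:
    function_name: str = "test-func"
    memory_limit_in_mb: int = 128
    invoked_function_arn: str = "arn:aws:lambda:eu-west-1:123456789012:function:test-func"
    aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"


@pytest.fixture
def sqs_event() -> dict:
    return json.loads(Path("events/sqs_event.json").read_text())


def test_single_failure_is_reported(sqs_event):
    result = lambda_handler(sqs_event, FakeLambdaContext())

    # First record succeeds, second fails: exactly one item failure reported
    assert len(result["batchItemFailures"]) == 1
```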
Use the context manager when you want access to the processed messages, or to handle the `BatchProcessingError` exception when all records within the batch fail to be processed.
The `batch_processor` and `async_batch_processor` decorators are now considered legacy. Historically, they were kept for backwards compatibility and to minimize code changes between V1 and V2.

As of 2.12.0, `process_partial_response` and `async_process_partial_response` are recommended instead. They reduce boilerplate and memory/CPU cycles, and they are less error prone - e.g., the decorators required an additional return.
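A side-by-side sketch of the difference; the handler names are arbitrary and only used to show both styles in one file:

```python
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    batch_processor,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    ...


# Legacy decorator: note the extra return of processor.response() you had to remember
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler_legacy(event, context):
    return processor.response()


# Recommended: a single call that processes the batch and builds the response
def lambda_handler(event, context):
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
```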
When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception with the Sentry SDK:
Credits to Charles-Axel Dein
```python
--8<-- "examples/batch_processing/src/sentry_error_tracking.py"
```