---
title: Batch
description: Utility
---
import Note from "../../src/components/Note"
One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external event sources. Some of these event providers allow a feature called "batch processing", in which a predefined number of events is sent to the Lambda function at once.
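For illustration, a batched SQS event delivers several messages in a single invocation under the `Records` key. The sketch below shows a trimmed-down shape of such an event; all field values are placeholders, not taken from a real queue:

```python
# Trimmed-down sketch of a batched SQS event: several messages arrive
# together under "Records". All values below are placeholders.
sqs_batch_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "first message",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP1bqqhAqk5sV2g9...",
            "body": "second message",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        },
    ]
}
```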
The batch utility provides an abstraction to handle partial failures during a batch execution from an SQS event source, as well as a base class (`BasePartialProcessor`) that allows you to create your own batch processor.
## Key Features
- Removal of successful messages from an AWS SQS batch in case of partial failure;
- Build your own batch processor using the base classes.
## IAM Permissions
This utility requires additional permissions to work as expected. See the following table:
Processor | Function/Method | IAM Permission |
---|---|---|
`PartialSQSProcessor` | `_clean` | `sqs:DeleteMessageBatch` |
SQS integration with Lambda is one of the most well-established ones and is pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with fewer invocations.
As with any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the whole batch to the queue, reprocessing every record, including the successful ones. This cycle can happen multiple times, depending on your configuration, until the whole batch succeeds or the maximum number of attempts is reached. Your application may run into problems with such behavior, especially if the processing is not idempotent.
A naive approach to solving this problem is to delete the successful records from the queue before the redrive phase. The `PartialSQSProcessor` class offers this solution both as a context manager and as middleware, removing all successful messages from the queue in case one or more failures occur during the Lambda execution. Two examples are provided below, displaying the behavior of this class.
Examples:
```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
    return record["body"]

def lambda_handler(event, context):
    records = event["Records"]

    processor = PartialSQSProcessor()

    # highlight-start
    with processor(records, record_handler):
        result = processor.process()
    # highlight-end

    return result
```
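The context manager variant gives you direct access to the processed results, since `process()` returns the status of each record; the middleware variant below wraps the whole handler and calls `process()` for you.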
```python
from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor

def record_handler(record):
    return record["body"]

# highlight-start
@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
# highlight-end
def lambda_handler(event, context):
    return {"statusCode": 200}
```
You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
All processing logic is handled by `_process_record()`, whilst `_prepare()` and `_clean()` take care of the setup/teardown of the processor, being called at the start/end of the processor's execution, respectively.
Example:
```python
from random import randint

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
import boto3

def record_handler(record):
    return randint(0, 100)

class MyPartialProcessor(BasePartialProcessor):
    """
    Process a record and store successful results in a DynamoDB table

    Parameters
    ----------
    table_name: str
        Table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name
        super().__init__()

    def _prepare(self):
        # Called once, *before* processing.
        # Creates the table resource and cleans previous results.
        # E.g.:
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # Called once, *after* processing all records (when the context manager closes).
        # Here we're sending all successful messages to a DynamoDB table at once.
        # E.g.:
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        # Defines how each record is processed.
        # Here we're keeping the status of each run.
        # E.g.:
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def success_handler(self, record, result):
        entry = ("success", result, record)
        message = {"age": result}
        self.success_messages.append(message)
        return entry

@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
def lambda_handler(event, context):
    return {"statusCode": 200}
```