diff --git a/Makefile b/Makefile index 0ee0ee76fbd..f030e4289ee 100644 --- a/Makefile +++ b/Makefile @@ -90,3 +90,11 @@ changelog: mypy: poetry run mypy --pretty aws_lambda_powertools + +format-examples: + poetry run isort docs/examples + poetry run black docs/examples/*/*/*.py + +lint-examples: + poetry run python3 -m py_compile docs/examples/*/*/*.py + cfn-lint docs/examples/*/*/*.yml diff --git a/docs/examples/utilities/batch/caveats_tracer_response_auto_capture.py b/docs/examples/utilities/batch/caveats_tracer_response_auto_capture.py new file mode 100644 index 00000000000..f579580cd4b --- /dev/null +++ b/docs/examples/utilities/batch/caveats_tracer_response_auto_capture.py @@ -0,0 +1,25 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method(capture_response=False) +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/custom_batch_processor.py b/docs/examples/utilities/batch/custom_batch_processor.py new file mode 100644 index 00000000000..f3944f3be86 --- /dev/null +++ b/docs/examples/utilities/batch/custom_batch_processor.py @@ -0,0 +1,62 @@ +import os +from random import randint + +import boto3 + +from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor + +table_name = os.getenv("TABLE_NAME", "table_not_found") + + +class MyPartialProcessor(BasePartialProcessor): + """ + Process a record and store successful results in an Amazon DynamoDB table + + Parameters + ---------- + table_name: str + DynamoDB table name to write results to + """ + + def __init__(self, table_name: str): + self.table_name = table_name + + super().__init__() + + def _prepare(self): + # It's called once, *before* processing + # Creates the table resource and cleans previous results + self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) + self.success_messages.clear() + + def _clean(self): + # It's called once, *after* all records are processed (when the context manager exits) + # Here we're sending, at once, all successful messages to a ddb table + with self.ddb_table.batch_writer() as batch: + for result in self.success_messages: + batch.put_item(Item=result) + + def _process_record(self, record): + # It handles how your record is processed + # Here we're keeping the status of each run + # where self.handler is the record_handler function passed as an argument + try: + result = self.handler(record) # record_handler passed to decorator/context manager + return self.success_handler(record, result) + except Exception as exc: + return self.failure_handler(record, exc) + + def success_handler(self, record, result): + entry = ("success", result, record) + message = {"age": result} + self.success_messages.append(message) + return entry + + +def record_handler(record): + return randint(0, 100) + + +@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
+def lambda_handler(event, context): + return {"statusCode": 200} diff --git a/docs/examples/utilities/batch/custom_boto3_session_context_manager.py b/docs/examples/utilities/batch/custom_boto3_session_context_manager.py new file mode 100644 index 00000000000..8644a726bcb --- /dev/null +++ b/docs/examples/utilities/batch/custom_boto3_session_context_manager.py @@ -0,0 +1,23 @@ +import boto3 + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + +session = boto3.session.Session() + + +def record_handler(record): + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + + +def lambda_handler(event, context): + records = event["Records"] + + processor = PartialSQSProcessor(boto3_session=session) + + with processor(records, record_handler): + result = processor.process() + + return result diff --git a/docs/examples/utilities/batch/custom_boto3_session_decorator.py b/docs/examples/utilities/batch/custom_boto3_session_decorator.py new file mode 100644 index 00000000000..f7c6b41ca3f --- /dev/null +++ b/docs/examples/utilities/batch/custom_boto3_session_decorator.py @@ -0,0 +1,17 @@ +import boto3 + +from aws_lambda_powertools.utilities.batch import sqs_batch_processor + +session = boto3.session.Session() + + +def record_handler(record): + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + + +@sqs_batch_processor(record_handler=record_handler, boto3_session=session) +def lambda_handler(event, context): + return {"statusCode": 200} diff --git a/docs/examples/utilities/batch/custom_config_context_manager.py b/docs/examples/utilities/batch/custom_config_context_manager.py new file mode 100644 index 00000000000..8f080d02b5e --- /dev/null +++ b/docs/examples/utilities/batch/custom_config_context_manager.py @@ -0,0 +1,23 @@ +from botocore.config import Config + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + +config = Config(region_name="us-east-1") + + +def record_handler(record): + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + + +def lambda_handler(event, context): + records = event["Records"] + + processor = PartialSQSProcessor(config=config) + + with processor(records, record_handler): + result = processor.process() + + return result diff --git a/docs/examples/utilities/batch/custom_config_decorator.py b/docs/examples/utilities/batch/custom_config_decorator.py new file mode 100644 index 00000000000..fb71355c0c1 --- /dev/null +++ b/docs/examples/utilities/batch/custom_config_decorator.py @@ -0,0 +1,17 @@ +from botocore.config import Config + +from aws_lambda_powertools.utilities.batch import sqs_batch_processor + +config = Config(region_name="us-east-1") + + +def record_handler(record): + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + + +@sqs_batch_processor(record_handler=record_handler, config=config) +def lambda_handler(event, context): + return {"statusCode": 200} diff --git 
a/docs/examples/utilities/batch/dynamodb_streams_context_manager.py b/docs/examples/utilities/batch/dynamodb_streams_context_manager.py new file mode 100644 index 00000000000..1fae494cfa8 --- /dev/null +++ b/docs/examples/utilities/batch/dynamodb_streams_context_manager.py @@ -0,0 +1,30 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: DynamoDBRecord): + logger.info(record.dynamodb.new_image) + payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value) + # alternatively: + # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image + # payload = changes.get("Message").raw_event -> {"S": ""} + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + + return processor.response() diff --git a/docs/examples/utilities/batch/dynamodb_streams_decorator.py b/docs/examples/utilities/batch/dynamodb_streams_decorator.py new file mode 100644 index 00000000000..ea63d834b88 --- /dev/null +++ b/docs/examples/utilities/batch/dynamodb_streams_decorator.py @@ -0,0 +1,27 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: DynamoDBRecord): + logger.info(record.dynamodb.new_image) + payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value) + # alternatively: + # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image + # payload = changes.get("Message").raw_event -> {"S": ""} + ...
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/dynamodb_streams_pydantic_inheritance.py b/docs/examples/utilities/batch/dynamodb_streams_pydantic_inheritance.py new file mode 100644 index 00000000000..f518bc65dd1 --- /dev/null +++ b/docs/examples/utilities/batch/dynamodb_streams_pydantic_inheritance.py @@ -0,0 +1,48 @@ +import json +from typing import Dict, Literal, Optional + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.parser import BaseModel, validator +from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel +from aws_lambda_powertools.utilities.typing import LambdaContext + + +class Order(BaseModel): + item: dict + + +class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + +class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + +class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord + + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: OrderDynamoDBRecord): + return record.dynamodb.NewImage.Message.item + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/dynamodb_streams_template.yml b/docs/examples/utilities/batch/dynamodb_streams_template.yml new file mode 100644 index 00000000000..39449b32628 --- /dev/null +++ b/docs/examples/utilities/batch/dynamodb_streams_template.yml @@ -0,0 +1,66 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + DynamoDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt SampleTable.StreamArn + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleTable: + Type: AWS::DynamoDB::Table + Properties: + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + - AttributeName: sk + AttributeType: S + 
KeySchema: + - AttributeName: pk + KeyType: HASH + - AttributeName: sk + KeyType: RANGE + SSESpecification: + SSEEnabled: yes + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES diff --git a/docs/examples/utilities/batch/kinesis_data_streams_context_manager.py b/docs/examples/utilities/batch/kinesis_data_streams_context_manager.py new file mode 100644 index 00000000000..1e4a580b34b --- /dev/null +++ b/docs/examples/utilities/batch/kinesis_data_streams_context_manager.py @@ -0,0 +1,27 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.KinesisDataStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: KinesisStreamRecord): + logger.info(record.kinesis.data_as_text) + payload: dict = record.kinesis.data_as_json() + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + + return processor.response() diff --git a/docs/examples/utilities/batch/kinesis_data_streams_decorator.py b/docs/examples/utilities/batch/kinesis_data_streams_decorator.py new file mode 100644 index 00000000000..6996c2a033c --- /dev/null +++ b/docs/examples/utilities/batch/kinesis_data_streams_decorator.py @@ -0,0 +1,24 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.KinesisDataStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: KinesisStreamRecord): + logger.info(record.kinesis.data_as_text) + payload: dict = record.kinesis.data_as_json() + ... 
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/kinesis_data_streams_pydantic_inheritance.py b/docs/examples/utilities/batch/kinesis_data_streams_pydantic_inheritance.py new file mode 100644 index 00000000000..0c80478b918 --- /dev/null +++ b/docs/examples/utilities/batch/kinesis_data_streams_pydantic_inheritance.py @@ -0,0 +1,43 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.parser import BaseModel, validator +from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord, KinesisDataStreamRecordPayload +from aws_lambda_powertools.utilities.typing import LambdaContext + + +class Order(BaseModel): + item: dict + + +class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("data", pre=True) + def transform_message_to_dict(cls, value: str): + # Powertools KinesisDataStreamRecordPayload already decodes b64 to str here + return json.loads(value) + + +class OrderKinesisRecord(KinesisDataStreamRecord): + kinesis: OrderKinesisPayloadRecord + + +processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: OrderKinesisRecord): + return record.kinesis.data.item + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/kinesis_data_streams_template.yml b/docs/examples/utilities/batch/kinesis_data_streams_template.yml new file mode 100644 index 00000000000..6acb7c9ec32 --- /dev/null +++ b/docs/examples/utilities/batch/kinesis_data_streams_template.yml @@ -0,0 +1,53 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records to DLQ from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + KinesisStream: + Type: Kinesis + Properties: + Stream: !GetAtt SampleStream.Arn + BatchSize: 100 + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 diff --git a/docs/examples/utilities/batch/migration_context_manager_after.py b/docs/examples/utilities/batch/migration_context_manager_after.py new file mode 100644 index
00000000000..466554165af --- /dev/null +++ b/docs/examples/utilities/batch/migration_context_manager_after.py @@ -0,0 +1,17 @@ +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + + +def record_handler(record): + return_value = do_something_with(record["body"]) + return return_value + + +def lambda_handler(event, context): + records = event["Records"] + + processor = BatchProcessor(event_type=EventType.SQS) + + with processor(records, record_handler): + result = processor.process() + + return processor.response() diff --git a/docs/examples/utilities/batch/migration_context_manager_before.py b/docs/examples/utilities/batch/migration_context_manager_before.py new file mode 100644 index 00000000000..d1420838c06 --- /dev/null +++ b/docs/examples/utilities/batch/migration_context_manager_before.py @@ -0,0 +1,21 @@ +from botocore.config import Config + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + +config = Config(region_name="us-east-1") + + +def record_handler(record): + return_value = do_something_with(record["body"]) + return return_value + + +def lambda_handler(event, context): + records = event["Records"] + + processor = PartialSQSProcessor(config=config) + + with processor(records, record_handler): + result = processor.process() + + return result diff --git a/docs/examples/utilities/batch/migration_decorator_after.py b/docs/examples/utilities/batch/migration_decorator_after.py new file mode 100644 index 00000000000..8b833eb1eb0 --- /dev/null +++ b/docs/examples/utilities/batch/migration_decorator_after.py @@ -0,0 +1,14 @@ +import json + +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + +processor = BatchProcessor(event_type=EventType.SQS) + + +def record_handler(record): + return do_something_with(record["body"]) + + +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context): + return processor.response() diff --git a/docs/examples/utilities/batch/migration_decorator_before.py b/docs/examples/utilities/batch/migration_decorator_before.py new file mode 100644 index 00000000000..8684d0405b7 --- /dev/null +++ b/docs/examples/utilities/batch/migration_decorator_before.py @@ -0,0 +1,10 @@ +from aws_lambda_powertools.utilities.batch import sqs_batch_processor + + +def record_handler(record): + return do_something_with(record["body"]) + + +@sqs_batch_processor(record_handler=record_handler) +def lambda_handler(event, context): + return {"statusCode": 200} diff --git a/docs/examples/utilities/batch/sentry_integration.py b/docs/examples/utilities/batch/sentry_integration.py new file mode 100644 index 00000000000..10adf570e6b --- /dev/null +++ b/docs/examples/utilities/batch/sentry_integration.py @@ -0,0 +1,11 @@ +from typing import Tuple + +from sentry_sdk import capture_exception + +from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse + + +class MyProcessor(BatchProcessor): + def failure_handler(self, record, exception) -> FailureResponse: + capture_exception() # send exception to Sentry + return super().failure_handler(record, exception) diff --git a/docs/examples/utilities/batch/sqs_batch_processor_extension.py b/docs/examples/utilities/batch/sqs_batch_processor_extension.py new file mode 100644 index 00000000000..ac86014cbbe --- /dev/null +++ b/docs/examples/utilities/batch/sqs_batch_processor_extension.py @@ -0,0 +1,39 @@ +import json + +from aws_lambda_powertools import Metrics, Tracer +from 
aws_lambda_powertools.metrics import MetricUnit +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + ExceptionInfo, + FailureResponse, + batch_processor, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +tracer = Tracer() + + +class MyProcessor(BatchProcessor): + def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse: + metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1) + return super().failure_handler(record, exception) + + +processor = MyProcessor(event_type=EventType.SQS) +metrics = Metrics(namespace="test") + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... + + +@metrics.log_metrics(capture_cold_start_metric=True) +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/sqs_context_manager.py b/docs/examples/utilities/batch/sqs_context_manager.py new file mode 100644 index 00000000000..a0193a61cfa --- /dev/null +++ b/docs/examples/utilities/batch/sqs_context_manager.py @@ -0,0 +1,28 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + + return processor.response() diff --git a/docs/examples/utilities/batch/sqs_decorator.py b/docs/examples/utilities/batch/sqs_decorator.py new file mode 100644 index 00000000000..0977bfd09b1 --- /dev/null +++ b/docs/examples/utilities/batch/sqs_decorator.py @@ -0,0 +1,25 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... 
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/sqs_processed_messages_context_manager.py b/docs/examples/utilities/batch/sqs_processed_messages_context_manager.py new file mode 100644 index 00000000000..449a14b1389 --- /dev/null +++ b/docs/examples/utilities/batch/sqs_processed_messages_context_manager.py @@ -0,0 +1,34 @@ +import json +from typing import Any, List, Literal, Union + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, FailureResponse, SuccessResponse +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process() + + for message in processed_messages: + status: Union[Literal["success"], Literal["fail"]] = message[0] + result: Any = message[1] + record: SQSRecord = message[2] + + return processor.response() diff --git a/docs/examples/utilities/batch/sqs_pydantic_inheritance.py b/docs/examples/utilities/batch/sqs_pydantic_inheritance.py new file mode 100644 index 00000000000..6e823fab27e --- /dev/null +++ b/docs/examples/utilities/batch/sqs_pydantic_inheritance.py @@ -0,0 +1,38 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.parser import BaseModel, validator +from aws_lambda_powertools.utilities.parser.models import SqsRecordModel +from aws_lambda_powertools.utilities.typing import LambdaContext + + +class Order(BaseModel): + item: dict + + +class OrderSqsRecord(SqsRecordModel): + body: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("body", pre=True) + def transform_body_to_dict(cls, value: str): + return json.loads(value) + + +processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: OrderSqsRecord): + return record.body.item + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/sqs_template.yml b/docs/examples/utilities/batch/sqs_template.yml new file mode 100644 index 00000000000..1c1075e3be8 --- /dev/null +++ b/docs/examples/utilities/batch/sqs_template.yml @@ -0,0 +1,42 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: 
hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt SampleQueue.QueueName + Events: + Batch: + Type: SQS + Properties: + Queue: !GetAtt SampleQueue.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleQueue: + Type: AWS::SQS::Queue + Properties: + VisibilityTimeout: 30 # Fn timeout * 6 + RedrivePolicy: + maxReceiveCount: 2 + deadLetterTargetArn: !GetAtt SampleDLQ.Arn diff --git a/docs/examples/utilities/batch/suppress_exception_decorator_sqs_batch_processor.py b/docs/examples/utilities/batch/suppress_exception_decorator_sqs_batch_processor.py new file mode 100644 index 00000000000..f8735d396cf --- /dev/null +++ b/docs/examples/utilities/batch/suppress_exception_decorator_sqs_batch_processor.py @@ -0,0 +1,8 @@ +from aws_lambda_powertools.utilities.batch import sqs_batch_processor + +... + + +@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) +def lambda_handler(event, context): + return {"statusCode": 200} diff --git a/docs/examples/utilities/batch/suppress_exception_partial_sqs_processor.py b/docs/examples/utilities/batch/suppress_exception_partial_sqs_processor.py new file mode 100644 index 00000000000..6dbd4e797f3 --- /dev/null +++ b/docs/examples/utilities/batch/suppress_exception_partial_sqs_processor.py @@ -0,0 +1,8 @@ +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + +... + +processor = PartialSQSProcessor(config=config, suppress_exception=True) + +with processor(records, record_handler): + result = processor.process() diff --git a/docs/examples/utilities/batch/testing_src_app.py b/docs/examples/utilities/batch/testing_src_app.py new file mode 100644 index 00000000000..0977bfd09b1 --- /dev/null +++ b/docs/examples/utilities/batch/testing_src_app.py @@ -0,0 +1,25 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... 
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/docs/examples/utilities/batch/testing_test_app.py b/docs/examples/utilities/batch/testing_test_app.py new file mode 100644 index 00000000000..cda6fb58c09 --- /dev/null +++ b/docs/examples/utilities/batch/testing_test_app.py @@ -0,0 +1,51 @@ +import json +from dataclasses import dataclass +from pathlib import Path + +import pytest +from src import app + + +def load_event(path: Path): + with path.open() as f: + return json.load(f) + + +@pytest.fixture +def lambda_context(): + @dataclass + class LambdaContext: + function_name: str = "test" + memory_limit_in_mb: int = 128 + invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test" + aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + return LambdaContext() + + +@pytest.fixture() +def sqs_event(): + """Generates SQS event""" + return load_event(path=Path("events/sqs_event.json")) + + +def test_app_batch_partial_response(sqs_event, lambda_context): + # GIVEN + processor = app.processor # access processor for additional assertions + successful_record = sqs_event["Records"][0] + failed_record = sqs_event["Records"][1] + expected_response = { + "batchItemFailures": [ + { + "itemIdentifier": failed_record["messageId"], + }, + ], + } + + # WHEN + ret = app.lambda_handler(sqs_event, lambda_context) + + # THEN + assert ret == expected_response + assert len(processor.fail_messages) == 1 + assert processor.success_messages[0] == successful_record diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 14dc80bdb11..69ce45bb2a2 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -20,8 +20,8 @@ If your function fails to process any message from the batch, the entire batch r With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place: -1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties -2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses +1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties +2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses ???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it" We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible. @@ -38,257 +38,46 @@ You do not need any additional IAM permissions to use this utility, except for w The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and a Dead Letter Queue where batch records will be sent after 2 retries have been attempted.
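To make the second mechanism above concrete, here is a minimal sketch of the response contract Lambda expects (the same shape the utility's **`.response()`** method builds for you; the identifier value is illustrative):

```python
# Sketch of the ReportBatchItemFailures response contract.
# Lambda deletes (or checkpoints past) every record in the batch *except*
# the ones listed under batchItemFailures; an empty list tells Lambda the
# whole batch succeeded.
partial_failure_response = {
    "batchItemFailures": [
        # use messageId for SQS, sequenceNumber for Kinesis/DynamoDB Streams
        {"itemIdentifier": "<identifier-of-the-failed-record>"},
    ],
}
```

You rarely need to assemble this by hand: the processor's **`.response()`** method builds it from the records whose `record_handler` raised an exception.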
- === "SQS" - ```yaml title="template.yaml" hl_lines="31-32" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - - SQSPollerPolicy: - QueueName: !GetAtt SampleQueue.QueueName - Events: - Batch: - Type: SQS - Properties: - Queue: !GetAtt SampleQueue.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleQueue: - Type: AWS::SQS::Queue - Properties: - VisibilityTimeout: 30 # Fn timeout * 6 - RedrivePolicy: - maxReceiveCount: 2 - deadLetterTargetArn: !GetAtt SampleDLQ.Arn + ```yaml title="template.yaml" hl_lines="30-31" + --8<-- "docs/examples/utilities/batch/sqs_template.yml" ``` === "Kinesis Data Streams" ```yaml title="template.yaml" hl_lines="44-45" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - # Lambda Destinations require additional permissions - # to send failure records to DLQ from Kinesis/DynamoDB - - Version: "2012-10-17" - Statement: - Effect: "Allow" - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - Events: - KinesisStream: - Type: Kinesis - Properties: - Stream: !GetAtt SampleStream.Arn - BatchSize: 100 - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleStream: - Type: AWS::Kinesis::Stream - Properties: - ShardCount: 1 + --8<-- "docs/examples/utilities/batch/kinesis_data_streams_template.yml" ``` === "DynamoDB Streams" ```yaml title="template.yaml" hl_lines="43-44" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - # Lambda Destinations require additional permissions - # to send failure records from Kinesis/DynamoDB - - Version: "2012-10-17" - Statement: - Effect: "Allow" - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - Events: - DynamoDBStream: - Type: DynamoDB - Properties: - Stream: !GetAtt SampleTable.StreamArn - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleTable: - Type: AWS::DynamoDB::Table - Properties: - BillingMode: PAY_PER_REQUEST - AttributeDefinitions: - - 
AttributeName: pk - AttributeType: S - - AttributeName: sk - AttributeType: S - KeySchema: - - AttributeName: pk - KeyType: HASH - - AttributeName: sk - KeyType: RANGE - SSESpecification: - SSEEnabled: yes - StreamSpecification: - StreamViewType: NEW_AND_OLD_IMAGES - + --8<-- "docs/examples/utilities/batch/dynamodb_streams_template.yml" ``` ### Processing messages from SQS Processing batches from SQS works in four stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type -2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type +2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank"} type annotation for autocompletion +3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing +4. Return the appropriate response contract to Lambda via **`.response()`** processor method ???+ info This code example optionally uses Tracer and Logger for completion. === "As a decorator" - ```python hl_lines="4-5 9 15 23 25" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="4-5 8 14 23 25" + --8<-- "docs/examples/utilities/batch/sqs_decorator.py" ``` === "As a context manager" - ```python hl_lines="4-5 9 15 24-26 28" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... 
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - - return processor.response() + ```python hl_lines="4-5 8 14 24-26 28" + --8<-- "docs/examples/utilities/batch/sqs_context_manager.py" ``` === "Sample response" @@ -350,73 +139,24 @@ Processing batches from SQS works in four stages: Processing batches from Kinesis works in four stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type -2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type +2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank"} type annotation for autocompletion +3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing +4. Return the appropriate response contract to Lambda via **`.response()`** processor method ???+ info This code example optionally uses Tracer and Logger for completion. === "As a decorator" - ```python hl_lines="4-5 9 15 22 24" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: KinesisStreamRecord): - logger.info(record.kinesis.data_as_text) - payload: dict = record.kinesis.data_as_json() - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="4-5 8 14 22 24" + --8<-- "docs/examples/utilities/batch/kinesis_data_streams_decorator.py" ``` === "As a context manager" - ```python hl_lines="4-5 9 15 23-25 27" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: KinesisStreamRecord): - logger.info(record.kinesis.data_as_text) - payload: dict = record.kinesis.data_as_json() - ... 
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - - return processor.response() + ```python hl_lines="4-5 8 14 23-25 27" + --8<-- "docs/examples/utilities/batch/kinesis_data_streams_context_manager.py" ``` === "Sample response" @@ -433,7 +173,6 @@ Processing batches from Kinesis works in four stages: } ``` - === "Sample event" ```json @@ -475,84 +214,28 @@ Processing batches from Kinesis works in four stages: } ``` - ### Processing messages from DynamoDB Processing batches from DynamoDB works in four stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type -2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank"} type annotation for autocompletion -3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing -4. Return the appropriate response contract to Lambda via **`.response()`** processor method +1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type +2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank"} type annotation for autocompletion +3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing +4. Return the appropriate response contract to Lambda via **`.response()`** processor method ???+ info This code example optionally uses Tracer and Logger for completion. === "As a decorator" - ```python hl_lines="4-5 9 15 25 27" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.DynamoDBStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: DynamoDBRecord): - logger.info(record.dynamodb.new_image) - payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value) - # alternatively: - # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image - # payload = change.get("Message").raw_event -> {"S": ""} - ...
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="4-5 8 14 25 27" + --8<-- "docs/examples/utilities/batch/dynamodb_streams_decorator.py" ``` === "As a context manager" - ```python hl_lines="4-5 9 15 26-28 30" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.DynamoDBStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: DynamoDBRecord): - logger.info(record.dynamodb.new_image) - payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value) - # alternatively: - # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image - # payload = change.get("Message").raw_event -> {"S": ""} - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - - return processor.response() + ```python hl_lines="4-5 8 14 26-28 30" + --8<-- "docs/examples/utilities/batch/dynamodb_streams_context_manager.py" ``` === "Sample response" @@ -569,7 +252,6 @@ Processing batches from Kinesis works in four stages: } ``` - === "Sample event" ```json @@ -638,7 +320,6 @@ All records in the batch will be passed to this handler for processing, even if All processing logic will and should be performed by the `record_handler` function. - ## Advanced ### Pydantic integration You can bring your own Pydantic models via **`model`** parameter when inheriting from **`SqsRecordModel`**, **`KinesisDataStreamRecord`**, or **`DynamoDBStreamRecordModel`**. Inheritance is important because we need to access message IDs and sequence numbers from these records in the event of failure. Mypy is fully integrated with this utility, so it should identify whether you're passing the incorrect Model.
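Before the tabs that follow, here is a minimal sketch of why inheritance matters, reusing the hypothetical `Order` model from the examples:

```python
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel


class Order(BaseModel):
    item: dict


# A bare model like Order carries no messageId or receiptHandle, so the
# processor could not tell Lambda *which* record failed. Inheriting
# SqsRecordModel keeps that metadata while narrowing `body` to your own
# type; the SQS tab below adds the validator that decodes the JSON string
# body before Pydantic builds Order.
class OrderSqsRecord(SqsRecordModel):
    body: Order
```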
- === "SQS" - ```python hl_lines="5 9-10 12-19 21 27" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import SqsRecordModel - from aws_lambda_powertools.utilities.typing import LambdaContext - - - class Order(BaseModel): - item: dict - - class OrderSqsRecord(SqsRecordModel): - body: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("body", pre=True) - def transform_body_to_dict(cls, value: str): - return json.loads(value) - - processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: OrderSqsRecord): - return record.body.item - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="6 10-11 14-21 24 30" + --8<-- "docs/examples/utilities/batch/sqs_pydantic_inheritance.py" ``` === "Kinesis Data Streams" - ```python hl_lines="5 9-10 12-20 22-23 26 32" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - class Order(BaseModel): - item: dict - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("data", pre=True) - def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel already decodes b64 to str here - return json.loads(value) - - class OrderKinesisRecord(KinesisDataStreamRecordModel): - kinesis: OrderKinesisPayloadRecord - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: OrderKinesisRecord): - return record.kinesis.data.item - - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="8 12-13 16-24 27-28 31 37" + --8<-- "docs/examples/utilities/batch/kinesis_data_streams_pydantic_inheritance.py" ``` === "DynamoDB Streams" - ```python hl_lines="7 11-12 14-21 23-25 27-28 31 37" - import json - - from typing import Dict, Literal - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel - from aws_lambda_powertools.utilities.typing import LambdaContext - - - class Order(BaseModel): - item: dict - - class OrderDynamoDB(BaseModel): - Message: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("Message", pre=True) - def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): - return json.loads(value["S"]) - - class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): - NewImage: 
Optional[OrderDynamoDB] - OldImage: Optional[OrderDynamoDB] - - class OrderDynamoDBRecord(DynamoDBStreamRecordModel): - dynamodb: OrderDynamoDBChangeRecord - - - processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderKinesisRecord) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: OrderDynamoDBRecord): - return record.dynamodb.NewImage.Message.item - - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="7 11-12 15-22 25-27 30-31 34 40" + --8<-- "docs/examples/utilities/batch/dynamodb_streams_pydantic_inheritance.py" ``` ### Accessing processed messages @@ -789,51 +353,10 @@ Use the context manager to access a list of all returned values from your `recor * **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record - -```python hl_lines="31-38" title="Accessing processed messages via context manager" -import json - -from typing import Any, List, Literal, Union - -from aws_lambda_powertools import Logger, Tracer -from aws_lambda_powertools.utilities.batch import (BatchProcessor, - EventType, - FailureResponse, - SuccessResponse, - batch_processor) -from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord -from aws_lambda_powertools.utilities.typing import LambdaContext - - -processor = BatchProcessor(event_type=EventType.SQS) -tracer = Tracer() -logger = Logger() - - -@tracer.capture_method -def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... - -@logger.inject_lambda_context -@tracer.capture_lambda_handler -def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process() - - for message in processed_messages: - status: Union[Literal["success"], Literal["fail"]] = message[0] - result: Any = message[1] - record: SQSRecord = message[2] - - - return processor.response() +```python hl_lines="5 26-27 29-32" title="Accessing processed messages via context manager" +--8<-- "docs/examples/utilities/batch/sqs_processed_messages_context_manager.py" ``` - ### Extending BatchProcessor You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. 
 ### Extending BatchProcessor

 You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.

@@ -847,35 +370,7 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc

 Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing

 ```python title="Extending failure handling mechanism in BatchProcessor"
-
-from typing import Tuple
-
-from aws_lambda_powertools import Metrics
-from aws_lambda_powertools.metrics import MetricUnit
-from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse
-from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
-
-
-class MyProcessor(BatchProcessor):
-    def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
-        metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
-        return super().failure_handler(record, exception)
-
-processor = MyProcessor(event_type=EventType.SQS)
-metrics = Metrics(namespace="test")
-
-
-@tracer.capture_method
-def record_handler(record: SQSRecord):
-    payload: str = record.body
-    if payload:
-        item: dict = json.loads(payload)
-    ...
-
-@metrics.log_metrics(capture_cold_start_metric=True)
-@batch_processor(record_handler=record_handler, processor=processor)
-def lambda_handler(event, context: LambdaContext):
-    return processor.response()
+--8<-- "docs/examples/utilities/batch/sqs_batch_processor_extension.py"
 ```
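The `sqs_batch_processor_extension.py` snippet is also not visible in this hunk. The inline example being removed used `tracer`, `json`, and `LambdaContext` without importing them, so presumably the new file is the same example made self-contained. A sketch under that assumption:

```python
import json

from aws_lambda_powertools import Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    ExceptionInfo,
    FailureResponse,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
metrics = Metrics(namespace="test")  # namespace taken from the removed example


class MyProcessor(BatchProcessor):
    def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
        # Count every failed record, then fall back to the default failure handling
        metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
        return super().failure_handler(record, exception)


processor = MyProcessor(event_type=EventType.SQS)


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        ...


@metrics.log_metrics(capture_cold_start_metric=True)
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```

Defining `metrics` before the class keeps the module self-contained at import time, and `log_metrics` flushes the counted failures when the handler returns.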
 
 ### Create your own partial processor

@@ -888,66 +383,8 @@ You can create your own partial batch processor from scratch by inheriting the `

 You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.

-```python hl_lines="3 9 24 30 37 57" title="Creating a custom batch processor"
-from random import randint
-
-from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
-import boto3
-import os
-
-table_name = os.getenv("TABLE_NAME", "table_not_found")
-
-class MyPartialProcessor(BasePartialProcessor):
-    """
-    Process a record and stores successful results at a Amazon DynamoDB Table
-
-    Parameters
-    ----------
-    table_name: str
-        DynamoDB table name to write results to
-    """
-
-    def __init__(self, table_name: str):
-        self.table_name = table_name
-
-        super().__init__()
-
-    def _prepare(self):
-        # It's called once, *before* processing
-        # Creates table resource and clean previous results
-        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
-        self.success_messages.clear()
-
-    def _clean(self):
-        # It's called once, *after* closing processing all records (closing the context manager)
-        # Here we're sending, at once, all successful messages to a ddb table
-        with self.ddb_table.batch_writer() as batch:
-            for result in self.success_messages:
-                batch.put_item(Item=result)
-
-    def _process_record(self, record):
-        # It handles how your record is processed
-        # Here we're keeping the status of each run
-        # where self.handler is the record_handler function passed as an argument
-        try:
-            result = self.handler(record)  # record_handler passed to decorator/context manager
-            return self.success_handler(record, result)
-        except Exception as exc:
-            return self.failure_handler(record, exc)
-
-    def success_handler(self, record):
-        entry = ("success", result, record)
-        message = {"age": result}
-        self.success_messages.append(message)
-        return entry
-
-
-def record_handler(record):
-    return randint(0, 100)
-
-@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
-def lambda_handler(event, context):
-    return {"statusCode": 200}
+```python hl_lines="6 11 26 32 39 60" title="Creating a custom batch processor"
+--8<-- "docs/examples/utilities/batch/custom_batch_processor.py"
 ```

 ### Caveats

@@ -958,34 +395,8 @@ When using Tracer to capture responses for each batch record processing, you mig

 If that's the case, you can configure [Tracer to disable response auto-capturing](../core/tracer.md#disabling-response-auto-capture){target="_blank"}.

-
-```python hl_lines="14" title="Disabling Tracer response auto-capturing"
-import json
-
-from aws_lambda_powertools import Logger, Tracer
-from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
-from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-processor = BatchProcessor(event_type=EventType.SQS)
-tracer = Tracer()
-logger = Logger()
-
-
-@tracer.capture_method(capture_response=False)
-def record_handler(record: SQSRecord):
-    payload: str = record.body
-    if payload:
-        item: dict = json.loads(payload)
-    ...
-@logger.inject_lambda_context
-@tracer.capture_lambda_handler
-@batch_processor(record_handler=record_handler, processor=processor)
-def lambda_handler(event, context: LambdaContext):
-    return processor.response()
-
+```python hl_lines="13" title="Disabling Tracer response auto-capturing"
+--8<-- "docs/examples/utilities/batch/caveats_tracer_response_auto_capture.py"
 ```

 ## Testing your code

@@ -999,87 +410,13 @@ Given a SQS batch where the first batch record succeeds and the second fails pro

 === "test_app.py"

     ```python
-    import json
-
-    from pathlib import Path
-    from dataclasses import dataclass
-
-    import pytest
-    from src.app import lambda_handler, processor
-
-
-    def load_event(path: Path):
-        with path.open() as f:
-            return json.load(f)
-
-
-    @pytest.fixture
-    def lambda_context():
-        @dataclass
-        class LambdaContext:
-            function_name: str = "test"
-            memory_limit_in_mb: int = 128
-            invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
-            aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"
-
-        return LambdaContext()
-
-    @pytest.fixture()
-    def sqs_event():
-        """Generates API GW Event"""
-        return load_event(path=Path("events/sqs_event.json"))
-
-
-    def test_app_batch_partial_response(sqs_event, lambda_context):
-        # GIVEN
-        processor = app.processor  # access processor for additional assertions
-        successful_record = sqs_event["Records"][0]
-        failed_record = sqs_event["Records"][1]
-        expected_response = {
-            "batchItemFailures: [
-                {
-                    "itemIdentifier": failed_record["messageId"]
-                }
-            ]
-        }
-
-        # WHEN
-        ret = app.lambda_handler(sqs_event, lambda_context)
-
-        # THEN
-        assert ret == expected_response
-        assert len(processor.fail_messages) == 1
-        assert processor.success_messages[0] == successful_record
+    --8<-- "docs/examples/utilities/batch/testing_test_app.py"
     ```

 === "src/app.py"

     ```python
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    processor = BatchProcessor(event_type=EventType.SQS)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: SQSRecord):
-        payload: str = record.body
-        if payload:
-            item: dict = json.loads(payload)
-        ...
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    @batch_processor(record_handler=record_handler, processor=processor)
-    def lambda_handler(event, context: LambdaContext):
-        return processor.response()
+    --8<-- "docs/examples/utilities/batch/testing_src_app.py"
     ```

 === "Sample SQS event"

@@ -1123,8 +460,6 @@ Given a SQS batch where the first batch record succeeds and the second fails pro
     }
     ```

-
-
 ## FAQ

 ### Choosing between decorator and context manager

@@ -1137,26 +472,15 @@ When using Sentry.io for error monitoring, you can override `failure_handler` to

 > Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732)

-```python hl_lines="4 7-8" title="Integrating error tracking with Sentry.io"
-from typing import Tuple
-
-from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse
-from sentry_sdk import capture_exception
-
-
-class MyProcessor(BatchProcessor):
-    def failure_handler(self, record, exception) -> FailureResponse:
-        capture_exception()  # send exception to Sentry
-        return super().failure_handler(record, exception)
+```python hl_lines="3 8-9" title="Integrating error tracking with Sentry.io"
+--8<-- "docs/examples/utilities/batch/sentry_integration.py"
 ```
-
 ## Legacy

 ???+ tip
     This is kept for historical purposes. Use the new [BatchProcessor](#processing-messages-from-sqs) instead.
-
 ### Migration guide

 ???+ info
@@ -1175,82 +499,28 @@ Keep reading if you are using `sqs_batch_processor` or `PartialSQSProcessor` [...]

 You can migrate in three steps:

 1. If you use **`sqs_batch_decorator`** you can now use **`batch_processor`** decorator
 2. If you were using **`PartialSQSProcessor`** you can now use **`BatchProcessor`**
 3. Change your Lambda Handler to return the new response format
-
 === "Decorator: Before"

-    ```python hl_lines="1 6"
-    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
-
-    def record_handler(record):
-        return do_something_with(record["body"])
-
-    @sqs_batch_processor(record_handler=record_handler)
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
+    ```python hl_lines="1 8"
+    --8<-- "docs/examples/utilities/batch/migration_decorator_before.py"
     ```

 === "Decorator: After"

-    ```python hl_lines="3 5 11"
-    import json
-
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-
-    processor = BatchProcessor(event_type=EventType.SQS)
-
-
-    def record_handler(record):
-        return do_something_with(record["body"])
-
-    @batch_processor(record_handler=record_handler, processor=processor)
-    def lambda_handler(event, context):
-        return processor.response()
+    ```python hl_lines="3 5 12"
+    --8<-- "docs/examples/utilities/batch/migration_decorator_after.py"
     ```
-
 === "Context manager: Before"

-    ```python hl_lines="1-2 4 14 19"
-    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
-    from botocore.config import Config
-
-    config = Config(region_name="us-east-1")
-
-    def record_handler(record):
-        return_value = do_something_with(record["body"])
-        return return_value
-
-
-    def lambda_handler(event, context):
-        records = event["Records"]
-
-        processor = PartialSQSProcessor(config=config)
-
-        with processor(records, record_handler):
-            result = processor.process()
-
-        return result
+    ```python hl_lines="1 3 5 16 21"
+    --8<-- "docs/examples/utilities/batch/migration_context_manager_before.py"
     ```

 === "Context manager: After"

-    ```python hl_lines="1 11"
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-
-
-    def record_handler(record):
-        return_value = do_something_with(record["body"])
-        return return_value
-
-    def lambda_handler(event, context):
-        records = event["Records"]
-
-        processor = BatchProcessor(event_type=EventType.SQS)
-
-        with processor(records, record_handler):
-            result = processor.process()
-
-        return processor.response()
+    ```python hl_lines="1 12"
+    --8<-- "docs/examples/utilities/batch/migration_context_manager_after.py"
     ```

 ### Customizing boto configuration

@@ -1263,94 +533,28 @@ decorator or `PartialSQSProcessor` class.

 === "Decorator"

-    ```python hl_lines="4 12"
-    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
-    from botocore.config import Config
-
-    config = Config(region_name="us-east-1")
-
-    def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
-
-    @sqs_batch_processor(record_handler=record_handler, config=config)
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
+    ```python hl_lines="5 15"
+    --8<-- "docs/examples/utilities/batch/custom_config_decorator.py"
     ```

 === "Context manager"

-    ```python hl_lines="4 16"
-    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
-    from botocore.config import Config
-
-    config = Config(region_name="us-east-1")
-
-    def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
-
-
-    def lambda_handler(event, context):
-        records = event["Records"]
-
-        processor = PartialSQSProcessor(config=config)
-
-        with processor(records, record_handler):
-            result = processor.process()
-
-        return result
+    ```python hl_lines="5 18"
+    --8<-- "docs/examples/utilities/batch/custom_config_context_manager.py"
     ```

 > Custom boto3 session example

 === "Decorator"

-    ```python hl_lines="4 12"
-    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
-    from botocore.config import Config
-
-    session = boto3.session.Session()
-
-    def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
-
-    @sqs_batch_processor(record_handler=record_handler, boto3_session=session)
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
+    ```python hl_lines="5 15"
+    --8<-- "docs/examples/utilities/batch/custom_boto3_session_decorator.py"
     ```

 === "Context manager"

-    ```python hl_lines="4 16"
-    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
-    import boto3
-
-    session = boto3.session.Session()
-
-    def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
-
-
-    def lambda_handler(event, context):
-        records = event["Records"]
-
-        processor = PartialSQSProcessor(boto3_session=session)
-
-        with processor(records, record_handler):
-            result = processor.process()
-
-        return result
+    ```python hl_lines="5 18"
+    --8<-- "docs/examples/utilities/batch/custom_boto3_session_context_manager.py"
     ```

 ### Suppressing exceptions

@@ -1359,21 +563,12 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r

 === "Decorator"

-    ```python hl_lines="3"
-    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
-
-    @sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True)
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
+    ```python hl_lines="6"
+    --8<-- "docs/examples/utilities/batch/suppress_exception_decorator_sqs_batch_processor.py"
     ```

 === "Context manager"

-    ```python hl_lines="3"
-    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
-
-    processor = PartialSQSProcessor(config=config, suppress_exception=True)
-
-    with processor(records, record_handler):
-        result = processor.process()
+    ```python hl_lines="5"
+    --8<-- "docs/examples/utilities/batch/suppress_exception_partial_sqs_processor.py"
     ```
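Neither `suppress_exception` snippet is visible in this diff. They presumably mirror the customization examples above with `suppress_exception=True` added; a self-contained sketch of the context-manager variant, assuming that layout (`do_something_with` remains a placeholder, as in the other examples):

```python
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

config = Config(region_name="us-east-1")


def record_handler(record):
    # Placeholder, as elsewhere in these docs: raise to mark the record as failed
    return do_something_with(record["body"])


def lambda_handler(event, context):
    records = event["Records"]

    # suppress_exception=True stops SQSBatchProcessingError from being raised
    # when one or more records fail; successfully processed messages are still
    # deleted from the queue
    processor = PartialSQSProcessor(config=config, suppress_exception=True)

    with processor(records, record_handler):
        result = processor.process()

    return result
```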