From b381b0c8eb88b6247c66adc703823928afb809d3 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Wed, 3 May 2023 15:48:27 +0200 Subject: [PATCH 01/17] chore: converted first examples --- docs/utilities/batch.md | 275 +----------------- .../sam/dynamodb_batch_processing.yaml | 66 +++++ .../sam/kinesis_batch_processing.yaml | 53 ++++ .../sam/sqs_batch_processing.yaml | 42 +++ .../src/getting_started_event.json | 36 +++ .../src/getting_started_response.json | 7 + .../src/getting_started_sqs.py | 1 - .../getting_started_sqs_context_manager.py | 29 ++ .../src/getting_started_sqs_decorator.py | 29 ++ 9 files changed, 274 insertions(+), 264 deletions(-) create mode 100644 examples/batch_processing/sam/dynamodb_batch_processing.yaml create mode 100644 examples/batch_processing/sam/kinesis_batch_processing.yaml create mode 100644 examples/batch_processing/sam/sqs_batch_processing.yaml create mode 100644 examples/batch_processing/src/getting_started_event.json create mode 100644 examples/batch_processing/src/getting_started_response.json create mode 100644 examples/batch_processing/src/getting_started_sqs_context_manager.py create mode 100644 examples/batch_processing/src/getting_started_sqs_decorator.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index c4d7dc26e6c..a0ead644355 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -42,179 +42,20 @@ The remaining sections of the documentation will rely on these samples. For comp === "SQS" - ```yaml title="template.yaml" hl_lines="31-32" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - - SQSPollerPolicy: - QueueName: !GetAtt SampleQueue.QueueName - Events: - Batch: - Type: SQS - Properties: - Queue: !GetAtt SampleQueue.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleQueue: - Type: AWS::SQS::Queue - Properties: - VisibilityTimeout: 30 # Fn timeout * 6 - RedrivePolicy: - maxReceiveCount: 2 - deadLetterTargetArn: !GetAtt SampleDLQ.Arn + ```yaml title="template.yaml" hl_lines="30-31" + --8<-- "examples/batch_processing/sam/sqs_batch_processing.yaml" ``` === "Kinesis Data Streams" ```yaml title="template.yaml" hl_lines="44-45" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - # Lambda Destinations require additional permissions - # to send failure records to DLQ from Kinesis/DynamoDB - - Version: "2012-10-17" - Statement: - Effect: "Allow" - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - Events: - KinesisStream: - Type: Kinesis - Properties: - Stream: !GetAtt SampleStream.Arn - BatchSize: 100 - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt 
SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleStream: - Type: AWS::Kinesis::Stream - Properties: - ShardCount: 1 + --8<-- "examples/batch_processing/sam/kinesis_batch_processing.yaml" ``` === "DynamoDB Streams" ```yaml title="template.yaml" hl_lines="43-44" - AWSTemplateFormatVersion: '2010-09-09' - Transform: AWS::Serverless-2016-10-31 - Description: partial batch response sample - - Globals: - Function: - Timeout: 5 - MemorySize: 256 - Runtime: python3.9 - Tracing: Active - Environment: - Variables: - LOG_LEVEL: INFO - POWERTOOLS_SERVICE_NAME: hello - - Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Handler: app.lambda_handler - CodeUri: hello_world - Policies: - # Lambda Destinations require additional permissions - # to send failure records from Kinesis/DynamoDB - - Version: "2012-10-17" - Statement: - Effect: "Allow" - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - Events: - DynamoDBStream: - Type: DynamoDB - Properties: - Stream: !GetAtt SampleTable.StreamArn - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - - SampleTable: - Type: AWS::DynamoDB::Table - Properties: - BillingMode: PAY_PER_REQUEST - AttributeDefinitions: - - AttributeName: pk - AttributeType: S - - AttributeName: sk - AttributeType: S - KeySchema: - - AttributeName: pk - KeyType: HASH - - AttributeName: sk - KeyType: RANGE - SSESpecification: - SSEEnabled: yes - StreamSpecification: - StreamViewType: NEW_AND_OLD_IMAGES - + --8<-- "examples/batch_processing/sam/dynamodb_batch_processing.yaml" ``` ### Processing messages from SQS @@ -230,126 +71,34 @@ Processing batches from SQS works in three stages: === "Recommended" - ```python hl_lines="4 9 12 18 29" + ```python hl_lines="4-9 12 18 28" --8<-- "examples/batch_processing/src/getting_started_sqs.py" ``` === "As a context manager" - ```python hl_lines="4-5 9 15 24-26 28" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... 
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - - return processor.response() + ```python hl_lines="4-5 8 14 25-26 29" + --8<-- "examples/batch_processing/src/getting_started_sqs_context_manager.py" ``` === "As a decorator (legacy)" - ```python hl_lines="4-5 9 15 23 25" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="4-9 12 18 27 29" + --8<-- "examples/batch_processing/src/getting_started_sqs_decorator.py" ``` === "Sample response" The second record failed to be processed, therefore the processor added its message ID in the response. - ```python - { - 'batchItemFailures': [ - { - 'itemIdentifier': '244fc6b4-87a3-44ab-83d2-361172410c3a' - } - ] - } + ```json + --8<-- "examples/batch_processing/src/getting_started_response.json" ``` === "Sample event" ```json - { - "Records": [ - { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "{\"Message\": \"success\"}", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", - "awsRegion": "us-east-1" - }, - { - "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", - "awsRegion": "us-east-1" - } - ] - } + --8<-- "examples/batch_processing/src/getting_started_event.json" ``` #### FIFO queues diff --git a/examples/batch_processing/sam/dynamodb_batch_processing.yaml b/examples/batch_processing/sam/dynamodb_batch_processing.yaml new file mode 100644 index 00000000000..4229a767ec8 --- /dev/null +++ b/examples/batch_processing/sam/dynamodb_batch_processing.yaml @@ -0,0 +1,66 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + 
POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + DynamoDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt SampleTable.StreamArn + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleTable: + Type: AWS::DynamoDB::Table + Properties: + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + - AttributeName: sk + AttributeType: S + KeySchema: + - AttributeName: pk + KeyType: HASH + - AttributeName: sk + KeyType: RANGE + SSESpecification: + SSEEnabled: true + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES diff --git a/examples/batch_processing/sam/kinesis_batch_processing.yaml b/examples/batch_processing/sam/kinesis_batch_processing.yaml new file mode 100644 index 00000000000..e28cd1b5622 --- /dev/null +++ b/examples/batch_processing/sam/kinesis_batch_processing.yaml @@ -0,0 +1,53 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records to DLQ from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + KinesisStream: + Type: Kinesis + Properties: + Stream: !GetAtt SampleStream.Arn + BatchSize: 100 + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 diff --git a/examples/batch_processing/sam/sqs_batch_processing.yaml b/examples/batch_processing/sam/sqs_batch_processing.yaml new file mode 100644 index 00000000000..83a57676437 --- /dev/null +++ b/examples/batch_processing/sam/sqs_batch_processing.yaml @@ -0,0 +1,42 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.9 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt SampleQueue.QueueName + Events: + Batch: + Type: SQS + Properties: + Queue: !GetAtt SampleQueue.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleQueue: + Type: 
AWS::SQS::Queue
+    Properties:
+      VisibilityTimeout: 30 # Fn timeout * 6
+      RedrivePolicy:
+        maxReceiveCount: 2
+        deadLetterTargetArn: !GetAtt SampleDLQ.Arn
diff --git a/examples/batch_processing/src/getting_started_event.json b/examples/batch_processing/src/getting_started_event.json
new file mode 100644
index 00000000000..50a411be861
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_event.json
@@ -0,0 +1,36 @@
+{
+  "Records": [
+    {
+      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+      "body": "{\"Message\": \"success\"}",
+      "attributes": {
+        "ApproximateReceiveCount": "1",
+        "SentTimestamp": "1545082649183",
+        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+        "ApproximateFirstReceiveTimestamp": "1545082649185"
+      },
+      "messageAttributes": {},
+      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+      "eventSource": "aws:sqs",
+      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+      "awsRegion": "us-east-1"
+    },
+    {
+      "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+      "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
+      "attributes": {
+        "ApproximateReceiveCount": "1",
+        "SentTimestamp": "1545082649183",
+        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+        "ApproximateFirstReceiveTimestamp": "1545082649185"
+      },
+      "messageAttributes": {},
+      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+      "eventSource": "aws:sqs",
+      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+      "awsRegion": "us-east-1"
+    }
+  ]
+}
diff --git a/examples/batch_processing/src/getting_started_response.json b/examples/batch_processing/src/getting_started_response.json
new file mode 100644
index 00000000000..9802316a689
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_response.json
@@ -0,0 +1,7 @@
+{
+  "batchItemFailures": [
+    {
+      "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
+    }
+  ]
+}
diff --git a/examples/batch_processing/src/getting_started_sqs.py b/examples/batch_processing/src/getting_started_sqs.py
index 15f8701f297..8b6fe4c4266 100644
--- a/examples/batch_processing/src/getting_started_sqs.py
+++ b/examples/batch_processing/src/getting_started_sqs.py
@@ -20,7 +20,6 @@ def record_handler(record: SQSRecord):
     if payload:
         item: dict = json.loads(payload)
         logger.info(item)
-    ...


 @logger.inject_lambda_context
diff --git a/examples/batch_processing/src/getting_started_sqs_context_manager.py b/examples/batch_processing/src/getting_started_sqs_context_manager.py
new file mode 100644
index 00000000000..19c14dfdd76
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_sqs_context_manager.py
@@ -0,0 +1,29 @@
+import json
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.SQS)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: SQSRecord):
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    batch = event["Records"]
+    with processor(records=batch, handler=record_handler):
+        processed_messages = processor.process()  # kick off processing, return list[tuple]
+        logger.info(f"Processed {len(processed_messages)} messages")
+
+    return processor.response()
diff --git a/examples/batch_processing/src/getting_started_sqs_decorator.py b/examples/batch_processing/src/getting_started_sqs_decorator.py
new file mode 100644
index 00000000000..4f058beb862
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_sqs_decorator.py
@@ -0,0 +1,29 @@
+import json
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    batch_processor,
+)
+from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.SQS)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: SQSRecord):
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@batch_processor(record_handler=record_handler, processor=processor)
+def lambda_handler(event, context: LambdaContext):
+    return processor.response()

From b2077112c6d5c6e35d3fd09c2818d3ead9612f77 Mon Sep 17 00:00:00 2001
From: Ruben Fonseca
Date: Wed, 3 May 2023 16:00:05 +0200
Subject: [PATCH 02/17] chore: refactored kinesis examples

---
 docs/utilities/batch.md                       | 110 ++----------------
 .../src/getting_started_kinesis.py            |   1 -
 ...getting_started_kinesis_context_manager.py |  28 +++++
 .../src/getting_started_kinesis_decorator.py  |  28 +++++
 .../src/getting_started_kinesis_event.json    |  36 ++++++
 .../src/getting_started_kinesis_response.json |   7 ++
 .../src/getting_started_sqs_fifo.py           |   7 +-
 ...etting_started_sqs_fifo_context_manager.py |   7 +-
 .../src/getting_started_sqs_fifo_decorator.py |   7 +-
 9 files changed, 128 insertions(+), 103 deletions(-)
 create mode 100644 examples/batch_processing/src/getting_started_kinesis_context_manager.py
 create mode 100644 examples/batch_processing/src/getting_started_kinesis_decorator.py
 create mode 100644 examples/batch_processing/src/getting_started_kinesis_event.json
 create mode 100644 examples/batch_processing/src/getting_started_kinesis_response.json

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index a0ead644355..a0a8139992d 100644
--- a/docs/utilities/batch.md
+++ 
b/docs/utilities/batch.md @@ -108,19 +108,19 @@ This helps preserve the ordering of messages in your queue. === "Recommended" - ```python hl_lines="3 9" + ```python hl_lines="5-6 11 27" --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py" ``` === "As a context manager" - ```python hl_lines="2 6" + ```python hl_lines="4 8" --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py" ``` === "As a decorator (legacy)" - ```python hl_lines="3 9" + ```python hl_lines="5-6 11 26" --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py" ``` @@ -137,122 +137,34 @@ Processing batches from Kinesis works in three stages: === "Recommended" - ```python hl_lines="2 7 12 18 28" + ```python hl_lines="2-9 12 18 27" --8<-- "examples/batch_processing/src/getting_started_kinesis.py" ``` === "As a context manager" - ```python hl_lines="4-5 9 15 23-25 27" - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType - from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: KinesisStreamRecord): - logger.info(record.kinesis.data_as_text) - payload: dict = record.kinesis.data_as_json() - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - - return processor.response() + ```python hl_lines="3-5 8 14 23-25 28" + --8<-- "examples/batch_processing/src/getting_started_kinesis_context_manager.py" ``` === "As a decorator (legacy)" - ```python hl_lines="2-3 7 20 22" - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: KinesisStreamRecord): - logger.info(record.kinesis.data_as_text) - payload: dict = record.kinesis.data_as_json() - ... - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="2-9 12 18 26" + --8<-- "examples/batch_processing/src/getting_started_kinesis_decorator.py" ``` === "Sample response" The second record failed to be processed, therefore the processor added its sequence number in the response. 
- ```python - { - 'batchItemFailures': [ - { - 'itemIdentifier': '6006958808509702859251049540584488075644979031228738' - } - ] - } + ```json + --8<-- "examples/batch_processing/src/getting_started_kinesis_response.json" ``` === "Sample event" ```json - { - "Records": [ - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "1", - "sequenceNumber": "4107859083838847772757075850904226111829882106684065", - "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==", - "approximateArrivalTimestamp": 1545084650.987 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - "awsRegion": "us-east-2", - "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" - }, - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "1", - "sequenceNumber": "6006958808509702859251049540584488075644979031228738", - "data": "c3VjY2Vzcw==", - "approximateArrivalTimestamp": 1545084650.987 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - "awsRegion": "us-east-2", - "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" - } - ] - } + --8<-- "examples/batch_processing/src/getting_started_kinesis_event.json" ``` ### Processing messages from DynamoDB diff --git a/examples/batch_processing/src/getting_started_kinesis.py b/examples/batch_processing/src/getting_started_kinesis.py index e58222733e1..179154e3b1f 100644 --- a/examples/batch_processing/src/getting_started_kinesis.py +++ b/examples/batch_processing/src/getting_started_kinesis.py @@ -19,7 +19,6 @@ def record_handler(record: KinesisStreamRecord): logger.info(record.kinesis.data_as_text) payload: dict = record.kinesis.data_as_json() logger.info(payload) - ... 
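+    # data_as_text base64-decodes the Kinesis payload into a string;
+    # data_as_json additionally parses that string, raising if it is not valid JSON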


 @logger.inject_lambda_context
diff --git a/examples/batch_processing/src/getting_started_kinesis_context_manager.py b/examples/batch_processing/src/getting_started_kinesis_context_manager.py
new file mode 100644
index 00000000000..8af0a9e52cf
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kinesis_context_manager.py
@@ -0,0 +1,28 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+    KinesisStreamRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: KinesisStreamRecord):
+    logger.info(record.kinesis.data_as_text)
+    payload: dict = record.kinesis.data_as_json()
+    logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    batch = event["Records"]
+    with processor(records=batch, handler=record_handler):
+        processed_messages = processor.process()  # kick off processing, return list[tuple]
+        logger.info(f"Processed {len(processed_messages)} messages")
+
+    return processor.response()
diff --git a/examples/batch_processing/src/getting_started_kinesis_decorator.py b/examples/batch_processing/src/getting_started_kinesis_decorator.py
new file mode 100644
index 00000000000..107c94ffbad
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kinesis_decorator.py
@@ -0,0 +1,28 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    batch_processor,
+)
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+    KinesisStreamRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: KinesisStreamRecord):
+    logger.info(record.kinesis.data_as_text)
+    payload: dict = record.kinesis.data_as_json()
+    logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@batch_processor(record_handler=record_handler, processor=processor)
+def lambda_handler(event, context: LambdaContext):
+    return processor.response()
diff --git a/examples/batch_processing/src/getting_started_kinesis_event.json b/examples/batch_processing/src/getting_started_kinesis_event.json
new file mode 100644
index 00000000000..2721ad7d9a7
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_kinesis_event.json
@@ -0,0 +1,36 @@
+{
+  "Records": [
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "1",
+        "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
+        "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
+        "approximateArrivalTimestamp": 1545084650.987
+      },
+      "eventSource": "aws:kinesis",
+      "eventVersion": "1.0",
+      "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
+      "eventName": "aws:kinesis:record",
+      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+      "awsRegion": "us-east-2",
+      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+    },
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "1",
+        "sequenceNumber": 
"6006958808509702859251049540584488075644979031228738", + "data": "c3VjY2Vzcw==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} diff --git a/examples/batch_processing/src/getting_started_kinesis_response.json b/examples/batch_processing/src/getting_started_kinesis_response.json new file mode 100644 index 00000000000..7ebd013d7f7 --- /dev/null +++ b/examples/batch_processing/src/getting_started_kinesis_response.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "6006958808509702859251049540584488075644979031228738" + } + ] +} diff --git a/examples/batch_processing/src/getting_started_sqs_fifo.py b/examples/batch_processing/src/getting_started_sqs_fifo.py index d39f8ba63f1..d30fb319c85 100644 --- a/examples/batch_processing/src/getting_started_sqs_fifo.py +++ b/examples/batch_processing/src/getting_started_sqs_fifo.py @@ -1,3 +1,5 @@ +import json + from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( SqsFifoPartialProcessor, @@ -13,7 +15,10 @@ @tracer.capture_method def record_handler(record: SQSRecord): - ... + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) @logger.inject_lambda_context diff --git a/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py b/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py index 45759b2a585..310cc3b9839 100644 --- a/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py +++ b/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py @@ -1,3 +1,5 @@ +import json + from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord @@ -10,7 +12,10 @@ @tracer.capture_method def record_handler(record: SQSRecord): - ... + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) @logger.inject_lambda_context diff --git a/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py b/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py index a5fe9f23235..22448d2ce8a 100644 --- a/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py +++ b/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py @@ -1,3 +1,5 @@ +import json + from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( SqsFifoPartialProcessor, @@ -13,7 +15,10 @@ @tracer.capture_method def record_handler(record: SQSRecord): - ... 
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)


 @logger.inject_lambda_context

From bc60371d352041a2d8f69b8e9c42732ce6c8275b Mon Sep 17 00:00:00 2001
From: Ruben Fonseca
Date: Tue, 9 May 2023 14:27:03 +0200
Subject: [PATCH 03/17] chore: refactored dynamodb samples

---
 docs/utilities/batch.md                       | 120 ++----------------
 .../src/getting_started_dynamodb.py           |  10 +-
 ...etting_started_dynamodb_context_manager.py |  33 +++++
 .../src/getting_started_dynamodb_decorator.py |  33 +++++
 .../src/getting_started_dynamodb_event.json   |  50 ++++++++
 .../getting_started_dynamodb_response.json    |   7 +
 6 files changed, 137 insertions(+), 116 deletions(-)
 create mode 100644 examples/batch_processing/src/getting_started_dynamodb_context_manager.py
 create mode 100644 examples/batch_processing/src/getting_started_dynamodb_decorator.py
 create mode 100644 examples/batch_processing/src/getting_started_dynamodb_event.json
 create mode 100644 examples/batch_processing/src/getting_started_dynamodb_response.json

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index a0a8139992d..8fedcce4887 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -180,138 +180,34 @@ Processing batches from DynamoDB Streams works in three stages:

 === "Recommended"

-    ```python hl_lines="4 9 14 20 30"
+    ```python hl_lines="4-11 14 20 32"
     --8<-- "examples/batch_processing/src/getting_started_dynamodb.py"
     ```

 === "As a context manager"

-    ```python hl_lines="4-5 9 15 23-27"
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
-    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: DynamoDBRecord):
-        logger.info(record.dynamodb.new_image)
-        payload: dict = json.loads(record.dynamodb.new_image.get("Message"))
-        ...
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    def lambda_handler(event, context: LambdaContext):
-        batch = event["Records"]
-        with processor(records=batch, handler=record_handler):
-            processed_messages = processor.process()  # kick off processing, return list[tuple]
-
-        return processor.response()
+    ```python hl_lines="5-7 10 16 28-30 33"
+    --8<-- "examples/batch_processing/src/getting_started_dynamodb_context_manager.py"
     ```

 === "As a decorator (legacy)"

-    ```python hl_lines="4-5 9 15 22 24"
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: DynamoDBRecord):
-        logger.info(record.dynamodb.new_image)
-        payload: dict = json.loads(record.dynamodb.new_image.get("Message"))
-        ...
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    @batch_processor(record_handler=record_handler, processor=processor)
-    def lambda_handler(event, context: LambdaContext):
-        return processor.response()
+    ```python hl_lines="4-11 14 20 31"
+    --8<-- "examples/batch_processing/src/getting_started_dynamodb_decorator.py"
     ```

 === "Sample response"

     The second record failed to be processed, therefore the processor added its sequence number in the response.

-    ```python
-    {
-        'batchItemFailures': [
-            {
-                'itemIdentifier': '8640712661'
-            }
-        ]
-    }
+    ```json
+    --8<-- "examples/batch_processing/src/getting_started_dynamodb_response.json"
     ```

 === "Sample event"

     ```json
-    {
-        "Records": [
-            {
-                "eventID": "1",
-                "eventVersion": "1.0",
-                "dynamodb": {
-                    "Keys": {
-                        "Id": {
-                            "N": "101"
-                        }
-                    },
-                    "NewImage": {
-                        "Message": {
-                            "S": "failure"
-                        }
-                    },
-                    "StreamViewType": "NEW_AND_OLD_IMAGES",
-                    "SequenceNumber": "3275880929",
-                    "SizeBytes": 26
-                },
-                "awsRegion": "us-west-2",
-                "eventName": "INSERT",
-                "eventSourceARN": "eventsource_arn",
-                "eventSource": "aws:dynamodb"
-            },
-            {
-                "eventID": "1",
-                "eventVersion": "1.0",
-                "dynamodb": {
-                    "Keys": {
-                        "Id": {
-                            "N": "101"
-                        }
-                    },
-                    "NewImage": {
-                        "SomethingElse": {
-                            "S": "success"
-                        }
-                    },
-                    "StreamViewType": "NEW_AND_OLD_IMAGES",
-                    "SequenceNumber": "8640712661",
-                    "SizeBytes": 26
-                },
-                "awsRegion": "us-west-2",
-                "eventName": "INSERT",
-                "eventSourceARN": "eventsource_arn",
-                "eventSource": "aws:dynamodb"
-            }
-        ]
-    }
+    --8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
     ```

 ### Partial failure mechanics
diff --git a/examples/batch_processing/src/getting_started_dynamodb.py b/examples/batch_processing/src/getting_started_dynamodb.py
index 60d8ed89f0e..61990e2bd26 100644
--- a/examples/batch_processing/src/getting_started_dynamodb.py
+++ b/examples/batch_processing/src/getting_started_dynamodb.py
@@ -18,10 +18,12 @@

 @tracer.capture_method
 def record_handler(record: DynamoDBRecord):
-    logger.info(record.dynamodb.new_image)  # type: ignore[union-attr]
-    payload: dict = json.loads(record.dynamodb.new_image.get("Message"))  # type: ignore[union-attr,arg-type]
-    logger.info(payload)
-    ...
+    if record.dynamodb and record.dynamodb.new_image:
+        logger.info(record.dynamodb.new_image)
+        message = record.dynamodb.new_image.get("Message")
+        if message:
+            payload: dict = json.loads(message)
+            logger.info(payload)


 @logger.inject_lambda_context
diff --git a/examples/batch_processing/src/getting_started_dynamodb_context_manager.py b/examples/batch_processing/src/getting_started_dynamodb_context_manager.py
new file mode 100644
index 00000000000..155e1354551
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_dynamodb_context_manager.py
@@ -0,0 +1,33 @@
+import json
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
+    DynamoDBRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: DynamoDBRecord):
+    if record.dynamodb and record.dynamodb.new_image:
+        logger.info(record.dynamodb.new_image)
+        message = record.dynamodb.new_image.get("Message")
+        if message:
+            payload: dict = json.loads(message)
+            logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    batch = event["Records"]
+    with processor(records=batch, handler=record_handler):
+        processed_messages = processor.process()  # kick off processing, return list[tuple]
+        logger.info(f"Processed {len(processed_messages)} messages")
+
+    return processor.response()
diff --git a/examples/batch_processing/src/getting_started_dynamodb_decorator.py b/examples/batch_processing/src/getting_started_dynamodb_decorator.py
new file mode 100644
index 00000000000..a2df6a11f8c
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_dynamodb_decorator.py
@@ -0,0 +1,33 @@
+import json
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    batch_processor,
+)
+from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
+    DynamoDBRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: DynamoDBRecord):
+    if record.dynamodb and record.dynamodb.new_image:
+        logger.info(record.dynamodb.new_image)
+        message = record.dynamodb.new_image.get("Message")
+        if message:
+            payload: dict = json.loads(message)
+            logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@batch_processor(record_handler=record_handler, processor=processor)
+def lambda_handler(event, context: LambdaContext):
+    return processor.response()
diff --git a/examples/batch_processing/src/getting_started_dynamodb_event.json b/examples/batch_processing/src/getting_started_dynamodb_event.json
new file mode 100644
index 00000000000..f74c2429a5a
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_dynamodb_event.json
@@ -0,0 +1,50 @@
+{
+  "Records": [
+    {
+      "eventID": "1",
+      "eventVersion": "1.0",
+      "dynamodb": {
+        "Keys": {
+          "Id": {
+            "N": "101"
+          }
+        },
+        "NewImage": {
+          "Message": {
+            "S": "failure"
+          }
+        },
+        "StreamViewType": "NEW_AND_OLD_IMAGES",
+        "SequenceNumber": "3275880929",
+        "SizeBytes": 26
+      },
+      "awsRegion": "us-west-2",
+      "eventName": "INSERT",
+      "eventSourceARN": "eventsource_arn",
+      "eventSource": "aws:dynamodb"
+    },
+    {
+      "eventID": "1",
+      "eventVersion": "1.0",
+      "dynamodb": {
+        "Keys": {
+          "Id": {
+            "N": "101"
+          }
+        },
+        "NewImage": {
+          "SomethingElse": {
+            "S": "success"
+          }
+        },
+        "StreamViewType": "NEW_AND_OLD_IMAGES",
+        "SequenceNumber": "8640712661",
+        "SizeBytes": 26
+      },
+      "awsRegion": "us-west-2",
+      "eventName": "INSERT",
+      "eventSourceARN": "eventsource_arn",
+      "eventSource": "aws:dynamodb"
+    }
+  ]
+}
diff --git a/examples/batch_processing/src/getting_started_dynamodb_response.json b/examples/batch_processing/src/getting_started_dynamodb_response.json
new file mode 100644
index 00000000000..9ccbde9ba9f
--- /dev/null
+++ b/examples/batch_processing/src/getting_started_dynamodb_response.json
@@ -0,0 +1,7 @@
+{
+  "batchItemFailures": [
+    {
+      "itemIdentifier": "8640712661"
+    }
+  ]
+}

From 9951d39c57db1d4224e3dc6a3fb54b8632c76f71 Mon Sep 17 00:00:00 2001
From: Ruben Fonseca
Date: Tue, 9 May 2023 14:40:22 +0200
Subject: [PATCH 04/17] chore: pydantic refactor

---
 docs/utilities/batch.md                       | 162 +-----------------
 .../src/context_manager_access.py             |  36 ++++
 .../batch_processing/src/pydantic_dynamodb.py |  55 ++++++
 .../batch_processing/src/pydantic_kinesis.py  |  41 +++++
 examples/batch_processing/src/pydantic_sqs.py |  34 ++++
 5 files changed, 173 insertions(+), 155 deletions(-)
 create mode 100644 examples/batch_processing/src/context_manager_access.py
 create mode 100644 examples/batch_processing/src/pydantic_dynamodb.py
 create mode 100644 examples/batch_processing/src/pydantic_kinesis.py
 create mode 100644 examples/batch_processing/src/pydantic_sqs.py

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 8fedcce4887..afa111be140 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -250,130 +250,20 @@ Inheritance is important because we need to access message IDs and sequence num

 === "SQS"

-    ```python hl_lines="5 14 23 29"
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
-    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-    from aws_lambda_powertools.utilities.parser import BaseModel
-    from aws_lambda_powertools.utilities.parser.types import Json
-
-
-    class Order(BaseModel):
-        item: dict
-
-    class OrderSqsRecord(SqsRecordModel):
-        body: Json[Order]  # deserialize order data from JSON string
-
-    processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: OrderSqsRecord):
-        return record.body.item
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    def lambda_handler(event, context: LambdaContext):
-        return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
+    ```python hl_lines="8 17 27 34"
+    --8<-- "examples/batch_processing/src/pydantic_sqs.py"
     ```

 === "Kinesis Data Streams"

-    ```python hl_lines="5 15 19 23 29 36"
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
-    from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, KinesisDataStreamRecord
-    from aws_lambda_powertools.utilities.parser import BaseModel, validator
-
from aws_lambda_powertools.utilities.parser.types import Json - from aws_lambda_powertools.utilities.typing import LambdaContext - - - class Order(BaseModel): - item: dict - - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Json[Order] - - - class OrderKinesisRecord(KinesisDataStreamRecord): - kinesis: OrderKinesisPayloadRecord - - - processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: OrderKinesisRecord): - return record.kinesis.data.item - - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) + ```python hl_lines="9 10 20 28 34 41" + --8<-- "examples/batch_processing/src/pydantic_kinesis.py" ``` === "DynamoDB Streams" - ```python hl_lines="7 16 26 31 35 41" - import json - - from typing import Dict, Literal, Optional - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response - from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel - from aws_lambda_powertools.utilities.typing import LambdaContext - from aws_lambda_powertools.utilities.parser import BaseModel, validator - - - class Order(BaseModel): - item: dict - - - class OrderDynamoDB(BaseModel): - Message: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("Message", pre=True) - def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): - return json.loads(value["S"]) - - - class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): - NewImage: Optional[OrderDynamoDB] - OldImage: Optional[OrderDynamoDB] - - - class OrderDynamoDBRecord(DynamoDBStreamRecordModel): - dynamodb: OrderDynamoDBChangeRecord - - - processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: OrderDynamoDBRecord): - return record.dynamodb.NewImage.Message.item - - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) + ```python hl_lines="12 13 22 32 37 41 47 55" + --8<-- "examples/batch_processing/src/pydantic_dynamodb.py" ``` ### Accessing processed messages @@ -384,45 +274,7 @@ Use the context manager to access a list of all returned values from your `recor * **When failed**. 
We will include a tuple with `fail`, exception as a string, and the batch record

 ```python hl_lines="30-36" title="Accessing processed messages via context manager"
-import json
-
-from typing import Any, List, Literal, Union
-
-from aws_lambda_powertools import Logger, Tracer
-from aws_lambda_powertools.utilities.batch import (BatchProcessor,
-                                                   EventType,
-                                                   FailureResponse,
-                                                   SuccessResponse)
-from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
-from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-processor = BatchProcessor(event_type=EventType.SQS)
-tracer = Tracer()
-logger = Logger()
-
-
-@tracer.capture_method
-def record_handler(record: SQSRecord):
-    payload: str = record.body
-    if payload:
-        item: dict = json.loads(payload)
-        ...
-
-@logger.inject_lambda_context
-@tracer.capture_lambda_handler
-def lambda_handler(event, context: LambdaContext):
-    batch = event["Records"]
-    with processor(records=batch, handler=record_handler):
-        processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
-
-        for message in processed_messages:
-            status: Union[Literal["success"], Literal["fail"]] = message[0]
-            result: Any = message[1]
-            record: SQSRecord = message[2]
-
-
-    return processor.response()
+--8<-- "examples/batch_processing/src/context_manager_access.py"
 ```

 ### Accessing Lambda Context
diff --git a/examples/batch_processing/src/context_manager_access.py b/examples/batch_processing/src/context_manager_access.py
new file mode 100644
index 00000000000..73a84d45ddf
--- /dev/null
+++ b/examples/batch_processing/src/context_manager_access.py
@@ -0,0 +1,36 @@
+import json
+from typing import Any, List, Literal, Tuple, Union
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.SQS)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: SQSRecord):
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    batch = event["Records"]
+    with processor(records=batch, handler=record_handler):
+        processed_messages: List[Tuple] = processor.process()
+
+        for message in processed_messages:
+            status: Union[Literal["success"], Literal["fail"]] = message[0]
+            result: Any = message[1]
+            record: SQSRecord = message[2]
+
+            logger.info(f"{status}, {result}, {record}")
+
+    return processor.response()
diff --git a/examples/batch_processing/src/pydantic_dynamodb.py b/examples/batch_processing/src/pydantic_dynamodb.py
new file mode 100644
index 00000000000..ae0c22144f0
--- /dev/null
+++ b/examples/batch_processing/src/pydantic_dynamodb.py
@@ -0,0 +1,55 @@
+import json
+from typing import Dict, Literal, Optional
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    process_partial_response,
+)
+from aws_lambda_powertools.utilities.parser import BaseModel, validator
+from aws_lambda_powertools.utilities.parser.models import (
+    DynamoDBStreamChangedRecordModel,
+    DynamoDBStreamRecordModel,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+class Order(BaseModel):
+    item: dict
+
+
+class OrderDynamoDB(BaseModel):
+    Message: Order
+
+    # auto transform json string
+    # so Pydantic can auto-initialize nested Order model
+    @validator("Message", pre=True)
+    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
+        return json.loads(value["S"])
+
+
+class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
+    NewImage: Optional[OrderDynamoDB]
+    OldImage: Optional[OrderDynamoDB]
+
+
+class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
+    dynamodb: OrderDynamoDBChangeRecord
+
+
+processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: OrderDynamoDBRecord):
+    if record.dynamodb.NewImage and record.dynamodb.NewImage.Message:
+        return record.dynamodb.NewImage.Message.item
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
diff --git a/examples/batch_processing/src/pydantic_kinesis.py b/examples/batch_processing/src/pydantic_kinesis.py
new file mode 100644
index 00000000000..2907a978e57
--- /dev/null
+++ b/examples/batch_processing/src/pydantic_kinesis.py
@@ -0,0 +1,41 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    process_partial_response,
+)
+from aws_lambda_powertools.utilities.parser import BaseModel
+from aws_lambda_powertools.utilities.parser.models import (
+    KinesisDataStreamRecord,
+    KinesisDataStreamRecordPayload,
+)
+from aws_lambda_powertools.utilities.parser.types import Json
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+class Order(BaseModel):
+    item: dict
+
+
+class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
+    data: Json[Order]
+
+
+class OrderKinesisRecord(KinesisDataStreamRecord):
+    kinesis: OrderKinesisPayloadRecord
+
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: OrderKinesisRecord):
+    return record.kinesis.data.item
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
diff --git a/examples/batch_processing/src/pydantic_sqs.py b/examples/batch_processing/src/pydantic_sqs.py
new file mode 100644
index 00000000000..31d33a1e38d
--- /dev/null
+++ b/examples/batch_processing/src/pydantic_sqs.py
@@ -0,0 +1,34 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    process_partial_response,
+)
+from aws_lambda_powertools.utilities.parser import BaseModel
+from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
+from aws_lambda_powertools.utilities.parser.types import Json
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+class Order(BaseModel):
+    item: dict
+
+
+class OrderSqsRecord(SqsRecordModel):
+    body: Json[Order]  # deserialize order data from JSON string
+
+
+processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: OrderSqsRecord):
+    return record.body.item
+
+
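+# Illustrative assumption: a record body of '{"item": {"id": 1}}' is parsed by
+# Json[Order] into Order(item={"id": 1}); a record whose body fails validation
+# is reported as a batch item failure rather than failing the whole batch.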
+@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) From d637f59680f3dc9b0942bf86dd39f2a6a7735d70 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 9 May 2023 14:49:09 +0200 Subject: [PATCH 05/17] chore: advanced accessing lambda context refactor --- docs/utilities/batch.md | 59 ++----------------- .../src/advanced_accessing_lambda_context.py | 1 - ...nced_accessing_lambda_context_decorator.py | 28 +++++++++ ...vanced_accessing_lambda_context_manager.py | 27 +++++++++ 4 files changed, 60 insertions(+), 55 deletions(-) create mode 100644 examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py create mode 100644 examples/batch_processing/src/advanced_accessing_lambda_context_manager.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index afa111be140..79de48ecce5 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -273,7 +273,7 @@ Use the context manager to access a list of all returned values from your `recor * **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record -```python hl_lines="30-36" title="Accessing processed messages via context manager" +```python hl_lines="26-34" title="Accessing processed messages via context manager" --8<-- "examples/batch_processing/src/context_manager_access.py" ``` @@ -291,63 +291,14 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam === "As a decorator (legacy)" - ```python hl_lines="15" - from typing import Optional - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, - batch_processor) - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): - if lambda_context is not None: - remaining_time = lambda_context.get_remaining_time_in_millis() - ... - - - @logger.inject_lambda_context - @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context: LambdaContext): - return processor.response() + ```python hl_lines="18" + --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py" ``` === "As a context manager" - ```python hl_lines="14 23" - from typing import Optional - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): - if lambda_context is not None: - remaining_time = lambda_context.get_remaining_time_in_millis() - ... 
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler, lambda_context=context): - result = processor.process() - - return result + ```python hl_lines="14 24" + --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context_manager.py" ``` ### Extending BatchProcessor diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context.py b/examples/batch_processing/src/advanced_accessing_lambda_context.py index 96d95ca5445..9de02c6c1e5 100644 --- a/examples/batch_processing/src/advanced_accessing_lambda_context.py +++ b/examples/batch_processing/src/advanced_accessing_lambda_context.py @@ -21,7 +21,6 @@ def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = if payload: item: dict = json.loads(payload) logger.info(item) - ... @logger.inject_lambda_context diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py b/examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py new file mode 100644 index 00000000000..267e9ddbd62 --- /dev/null +++ b/examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py @@ -0,0 +1,28 @@ +from typing import Optional + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + batch_processor, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): + if lambda_context is not None: + remaining_time = lambda_context.get_remaining_time_in_millis() + logger.info(remaining_time) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context_manager.py b/examples/batch_processing/src/advanced_accessing_lambda_context_manager.py new file mode 100644 index 00000000000..17b719a84d4 --- /dev/null +++ b/examples/batch_processing/src/advanced_accessing_lambda_context_manager.py @@ -0,0 +1,27 @@ +from typing import Optional + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): + if lambda_context is not None: + remaining_time = lambda_context.get_remaining_time_in_millis() + logger.info(remaining_time) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler, lambda_context=context): + result = processor.process() + + return result From bda4cf968d10df74ebc63552cc5fce3b420cb197 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 9 May 2023 14:55:12 +0200 
Subject: [PATCH 06/17] chore: extending failure --- docs/utilities/batch.md | 29 +------------- .../batch_processing/src/extending_failure.py | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+), 28 deletions(-) create mode 100644 examples/batch_processing/src/extending_failure.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 79de48ecce5..8270126e245 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -314,34 +314,7 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing ```python title="Extending failure handling mechanism in BatchProcessor" - -from typing import Tuple - -from aws_lambda_powertools import Metrics -from aws_lambda_powertools.metrics import MetricUnit -from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo, EventType, FailureResponse, process_partial_response -from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - - -class MyProcessor(BatchProcessor): - def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse: - metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1) - return super().failure_handler(record, exception) - -processor = MyProcessor(event_type=EventType.SQS) -metrics = Metrics(namespace="test") - - -@tracer.capture_method -def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... - -@metrics.log_metrics(capture_cold_start_metric=True) -def lambda_handler(event, context: LambdaContext): - return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) +--8<-- "examples/batch_processing/src/extending_failure.py" ``` ### Create your own partial processor diff --git a/examples/batch_processing/src/extending_failure.py b/examples/batch_processing/src/extending_failure.py new file mode 100644 index 00000000000..424c9a5189b --- /dev/null +++ b/examples/batch_processing/src/extending_failure.py @@ -0,0 +1,38 @@ +import json + +from aws_lambda_powertools import Logger, Metrics, Tracer +from aws_lambda_powertools.metrics import MetricUnit +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + ExceptionInfo, + FailureResponse, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + + +class MyProcessor(BatchProcessor): + def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse: + metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1) + return super().failure_handler(record, exception) + + +processor = MyProcessor(event_type=EventType.SQS) +metrics = Metrics(namespace="test") +logger = Logger() +tracer = Tracer() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + + +@metrics.log_metrics(capture_cold_start_metric=True) +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) From b271fb6ec864b18f14ecdc17f7ec8e2e4a67c852 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 9 May 2023 15:06:43 +0200 Subject: [PATCH 07/17] chore: custom partial 
processor --- docs/utilities/batch.md | 62 +--------------- .../src/custom_partial_processor.py | 70 +++++++++++++++++++ 2 files changed, 72 insertions(+), 60 deletions(-) create mode 100644 examples/batch_processing/src/custom_partial_processor.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 8270126e245..ffa667f7cd6 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -327,66 +327,8 @@ You can create your own partial batch processor from scratch by inheriting the ` You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function. -```python hl_lines="3 9 24 30 37 57" title="Creating a custom batch processor" -from random import randint - -from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor -import boto3 -import os - -table_name = os.getenv("TABLE_NAME", "table_not_found") - -class MyPartialProcessor(BasePartialProcessor): - """ - Process a record and stores successful results at a Amazon DynamoDB Table - - Parameters - ---------- - table_name: str - DynamoDB table name to write results to - """ - - def __init__(self, table_name: str): - self.table_name = table_name - - super().__init__() - - def _prepare(self): - # It's called once, *before* processing - # Creates table resource and clean previous results - self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) - self.success_messages.clear() - - def _clean(self): - # It's called once, *after* closing processing all records (closing the context manager) - # Here we're sending, at once, all successful messages to a ddb table - with self.ddb_table.batch_writer() as batch: - for result in self.success_messages: - batch.put_item(Item=result) - - def _process_record(self, record): - # It handles how your record is processed - # Here we're keeping the status of each run - # where self.handler is the record_handler function passed as an argument - try: - result = self.handler(record) # record_handler passed to decorator/context manager - return self.success_handler(record, result) - except Exception as exc: - return self.failure_handler(record, exc) - - def success_handler(self, record): - entry = ("success", result, record) - message = {"age": result} - self.success_messages.append(message) - return entry - - -def record_handler(record): - return randint(0, 100) - -@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name)) -def lambda_handler(event, context): - return {"statusCode": 200} +```python hl_lines="9 16 31 37 44 55 68" title="Creating a custom batch processor" +--8<-- "examples/batch_processing/src/custom_partial_processor.py" ``` ### Caveats diff --git a/examples/batch_processing/src/custom_partial_processor.py b/examples/batch_processing/src/custom_partial_processor.py new file mode 100644 index 00000000000..80d3b63fbe0 --- /dev/null +++ b/examples/batch_processing/src/custom_partial_processor.py @@ -0,0 +1,70 @@ +import os +import sys +from random import randint +from typing import Any + +import boto3 + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor + +table_name = os.getenv("TABLE_NAME", "table_not_found") + +logger = Logger() + + +class MyPartialProcessor(BasePartialProcessor): + """ + Process a record and stores successful results at a Amazon DynamoDB Table + + Parameters + ---------- + table_name: str + DynamoDB table name to write results to + """ + + def 
__init__(self, table_name: str): + self.table_name = table_name + + super().__init__() + + def _prepare(self): + # It's called once, *before* processing + # Creates table resource and clean previous results + self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) + self.success_messages.clear() + + def _clean(self): + # It's called once, *after* closing processing all records (closing the context manager) + # Here we're sending, at once, all successful messages to a ddb table + with self.ddb_table.batch_writer() as batch: + for result in self.success_messages: + batch.put_item(Item=result) + + def _process_record(self, record): + # It handles how your record is processed + # Here we're keeping the status of each run + # where self.handler is the record_handler function passed as an argument + try: + result = self.handler(record) # record_handler passed to decorator/context manager + return self.success_handler(record, result) + except Exception as exc: + logger.error(exc) + return self.failure_handler(record, sys.exc_info()) + + def success_handler(self, record, result: Any): + entry = ("success", result, record) + self.success_messages.append(record) + return entry + + async def _async_process_record(self, record: dict): + raise NotImplementedError() + + +def record_handler(record): + return randint(0, 100) + + +@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name)) +def lambda_handler(event, context): + return {"statusCode": 200} From 1d5ede4a7a609555b5cc4bb67aca3ef88706c345 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 9 May 2023 15:14:52 +0200 Subject: [PATCH 08/17] chore: refactored testing --- docs/utilities/batch.md | 145 +----------------- .../batch_processing/src/disable_tracing.py | 29 ++++ .../testing/events/sqs_event.json | 36 +++++ .../batch_processing/testing/src/__init__.py | 0 examples/batch_processing/testing/src/app.py | 28 ++++ examples/batch_processing/testing/test_app.py | 46 ++++++ 6 files changed, 145 insertions(+), 139 deletions(-) create mode 100644 examples/batch_processing/src/disable_tracing.py create mode 100644 examples/batch_processing/testing/events/sqs_event.json create mode 100644 examples/batch_processing/testing/src/__init__.py create mode 100644 examples/batch_processing/testing/src/app.py create mode 100644 examples/batch_processing/testing/test_app.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ffa667f7cd6..28189024743 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -339,33 +339,8 @@ When using Tracer to capture responses for each batch record processing, you mig If that's the case, you can configure [Tracer to disable response auto-capturing](../core/tracer.md#disabling-response-auto-capture){target="_blank"}. -```python hl_lines="14" title="Disabling Tracer response auto-capturing" -import json - -from aws_lambda_powertools import Logger, Tracer -from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor -from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord -from aws_lambda_powertools.utilities.typing import LambdaContext - - -processor = BatchProcessor(event_type=EventType.SQS) -tracer = Tracer() -logger = Logger() - - -@tracer.capture_method(capture_response=False) -def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... 
- -@logger.inject_lambda_context -@tracer.capture_lambda_handler -@batch_processor(record_handler=record_handler, processor=processor) -def lambda_handler(event, context: LambdaContext): - return processor.response() - +```python hl_lines="17" title="Disabling Tracer response auto-capturing" +--8<-- "examples/batch_processing/src/disable_tracing.py" ``` ## Testing your code @@ -379,127 +354,19 @@ Given a SQS batch where the first batch record succeeds and the second fails pro === "test_app.py" ```python - import json - - from pathlib import Path - from dataclasses import dataclass - - import pytest - from src.app import lambda_handler, processor - - - def load_event(path: Path): - with path.open() as f: - return json.load(f) - - - @pytest.fixture - def lambda_context(): - @dataclass - class LambdaContext: - function_name: str = "test" - memory_limit_in_mb: int = 128 - invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test" - aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72" - - return LambdaContext() - - @pytest.fixture() - def sqs_event(): - """Generates API GW Event""" - return load_event(path=Path("events/sqs_event.json")) - - - def test_app_batch_partial_response(sqs_event, lambda_context): - # GIVEN - processor = app.processor # access processor for additional assertions - successful_record = sqs_event["Records"][0] - failed_record = sqs_event["Records"][1] - expected_response = { - "batchItemFailures: [ - { - "itemIdentifier": failed_record["messageId"] - } - ] - } - - # WHEN - ret = app.lambda_handler(sqs_event, lambda_context) - - # THEN - assert ret == expected_response - assert len(processor.fail_messages) == 1 - assert processor.success_messages[0] == successful_record + --8<-- "examples/batch_processing/testing/test_app.py" ``` === "src/app.py" ```python - import json - - from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.typing import LambdaContext - - - processor = BatchProcessor(event_type=EventType.SQS) - tracer = Tracer() - logger = Logger() - - - @tracer.capture_method - def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - ... 
- - @logger.inject_lambda_context - @tracer.capture_lambda_handler - def lambda_handler(event, context: LambdaContext): - return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) + --8<-- "examples/batch_processing/testing/src/app.py" ``` === "Sample SQS event" - ```json title="events/sqs_sample.json" - { - "Records": [ - { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "{\"Message\": \"success\"}", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", - "awsRegion": "us-east-1" - }, - { - "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", - "awsRegion": "us-east-1" - } - ] - } + ```json title="events/sqs_event.json" + --8<-- "examples/batch_processing/testing/events/sqs_event.json" ``` ## FAQ diff --git a/examples/batch_processing/src/disable_tracing.py b/examples/batch_processing/src/disable_tracing.py new file mode 100644 index 00000000000..3b3e4b30d35 --- /dev/null +++ b/examples/batch_processing/src/disable_tracing.py @@ -0,0 +1,29 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + batch_processor, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method(capture_response=False) +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/examples/batch_processing/testing/events/sqs_event.json b/examples/batch_processing/testing/events/sqs_event.json new file mode 100644 index 00000000000..cdc63cf1fbe --- /dev/null +++ b/examples/batch_processing/testing/events/sqs_event.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"Message\": \"success\"}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1" + }, + { + "messageId": 
"244fc6b4-87a3-44ab-83d2-361172410c3a", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1" + } + ] +} diff --git a/examples/batch_processing/testing/src/__init__.py b/examples/batch_processing/testing/src/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/examples/batch_processing/testing/src/app.py b/examples/batch_processing/testing/src/app.py new file mode 100644 index 00000000000..8b6fe4c4266 --- /dev/null +++ b/examples/batch_processing/testing/src/app.py @@ -0,0 +1,28 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/testing/test_app.py b/examples/batch_processing/testing/test_app.py new file mode 100644 index 00000000000..2caf460b04e --- /dev/null +++ b/examples/batch_processing/testing/test_app.py @@ -0,0 +1,46 @@ +import json +from dataclasses import dataclass +from pathlib import Path + +import pytest + +from examples.batch_processing.testing.src import app + + +def load_event(path: Path): + with path.open() as f: + return json.load(f) + + +@pytest.fixture +def lambda_context(): + @dataclass + class LambdaContext: + function_name: str = "test" + memory_limit_in_mb: int = 128 + invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test" + aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + return LambdaContext() + + +@pytest.fixture() +def sqs_event(): + """Generates API GW Event""" + return load_event(path=Path("events/sqs_event.json")) + + +def test_app_batch_partial_response(sqs_event, lambda_context): + # GIVEN + processor = app.processor # access processor for additional assertions + successful_record = sqs_event["Records"][0] + failed_record = sqs_event["Records"][1] + expected_response = {"batchItemFailures": [{"itemIdentifier": failed_record["messageId"]}]} + + # WHEN + ret = app.lambda_handler(sqs_event, lambda_context) + + # THEN + assert ret == expected_response + assert len(processor.fail_messages) == 1 + assert processor.success_messages[0] == successful_record From 3bbb1f29e6b0834e5a1bf501a2268af168918031 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Tue, 9 May 2023 15:21:38 +0200 Subject: [PATCH 09/17] chore: sentry refactor --- docs/utilities/batch.md | 13 ++----------- .../batch_processing/src/sentry_error_tracking.py | 9 +++++++++ 2 files changed, 11 insertions(+), 11 
deletions(-) create mode 100644 examples/batch_processing/src/sentry_error_tracking.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 28189024743..c92e59e2fdc 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -387,15 +387,6 @@ When using Sentry.io for error monitoring, you can override `failure_handler` to > Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732) -```python hl_lines="4 7-8" title="Integrating error tracking with Sentry.io" -from typing import Tuple - -from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse -from sentry_sdk import capture_exception - - -class MyProcessor(BatchProcessor): - def failure_handler(self, record, exception) -> FailureResponse: - capture_exception() # send exception to Sentry - return super().failure_handler(record, exception) +```python hl_lines="1 7-8" title="Integrating error tracking with Sentry.io" +--8<-- "examples/batch_processing/src/sentry_error_tracking.py" ``` diff --git a/examples/batch_processing/src/sentry_error_tracking.py b/examples/batch_processing/src/sentry_error_tracking.py new file mode 100644 index 00000000000..59ec8262cdb --- /dev/null +++ b/examples/batch_processing/src/sentry_error_tracking.py @@ -0,0 +1,9 @@ +from sentry_sdk import capture_exception + +from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse + + +class MyProcessor(BatchProcessor): + def failure_handler(self, record, exception) -> FailureResponse: + capture_exception() # send exception to Sentry + return super().failure_handler(record, exception) From 946d9f8798571b41e8b677c6be5ff311bdd6c0d9 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 9 May 2023 22:00:41 +0100 Subject: [PATCH 10/17] docs: small fixes --- docs/utilities/batch.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index c92e59e2fdc..b34781c3f5f 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -21,7 +21,7 @@ If your function fails to process any message from the batch, the entire batch r With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place: 1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties -2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses +2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses @@ -32,7 +32,7 @@ With this utility, batch records are processed individually – only messages th ## Getting started -Regardless whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use ``ReportBatchItemFailures`. +Regardless whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`. You do not need any additional IAM permissions to use this utility, except for what each event source requires. 
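A quick note on the mechanics documented above: the `ReportBatchItemFailures` contract is small enough to sketch by hand. Below is a minimal, illustrative SQS handler (not part of this patch series; the names and the JSON-parsing step are assumed purely for demonstration) that returns the partial-failure shape Lambda expects:

```python
import json
from typing import Any, Dict, List


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, str]]]:
    # Collect IDs of failed records; Lambda deletes every record except these
    batch_item_failures: List[Dict[str, str]] = []

    for record in event["Records"]:
        try:
            json.loads(record["body"])  # stand-in for real processing work
        except Exception:
            batch_item_failures.append({"itemIdentifier": record["messageId"]})

    # An empty list tells Lambda the whole batch succeeded
    return {"batchItemFailures": batch_item_failures}
```

`BatchProcessor` and `process_partial_response` automate exactly this bookkeeping, along with per-record error capture and the Kinesis/DynamoDB variants, which report sequence numbers instead of message IDs.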
@@ -231,14 +231,14 @@ You can use `AsyncBatchProcessor` class and `async_process_partial_response` fun
 
 The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
 
-```python hl_lines="3 11 14 24" title="High-concurrency with AsyncBatchProcessor"
+```python hl_lines="3 11 14 24-26" title="High-concurrency with AsyncBatchProcessor"
 --8<-- "examples/batch_processing/src/getting_started_async.py"
 ```
 
 ???+ warning "Using tracer?"
     `AsyncBatchProcessor` uses `asyncio.gather` which can cause side effects and reach trace limits at high concurrency.
 
-    See [Tracing concurrent asynchronous functions](../core/tracer.md#concurrent-asynchronous-functions).
+    See [Tracing concurrent asynchronous functions](../core/tracer.md#concurrent-asynchronous-functions){target="_blank"}.
 
 ## Advanced
 

From 6d668d968b62036e49eb91fae790671520022287 Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Tue, 9 May 2023 22:15:36 +0100
Subject: [PATCH 11/17] docs: small fixes

---
 docs/utilities/batch.md | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index b34781c3f5f..ae41efa71a7 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -319,15 +319,16 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc
 
 ### Create your own partial processor
 
-You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
+You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()`, `_process_record()` and `_async_process_record()`.
 
 * **`_process_record()`** – handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler)
 * **`_prepare()`** – called once as part of the processor initialization
-* **`clean()`** – teardown logic called once after `_process_record` completes
+* **`_clean()`** – teardown logic called once after `_process_record` completes
+* **`_async_process_record()`** – implements asynchronous processing logic for each individual message; if you don't need asynchronous support, define it in your class with an empty body
 
 You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
 
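The context-manager usage mentioned in the sentence above can be sketched briefly. This is illustrative only, reusing the `MyPartialProcessor` class and `table_name` from the `custom_partial_processor.py` example in this patch; the record handler body is a stand-in:

```python
from random import randint

# assumes MyPartialProcessor and table_name as defined in custom_partial_processor.py
processor = MyPartialProcessor(table_name)


def record_handler(record):
    return randint(0, 100)


def lambda_handler(event, context):
    batch = event["Records"]
    # entering the context runs _prepare(); exiting it runs _clean() after processing
    with processor(records=batch, handler=record_handler):
        processor.process()

    return {"statusCode": 200}
```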
-```python hl_lines="9 16 31 37 44 55 68" title="Creating a custom batch processor" +```python hl_lines="9 16 31 37 44 55 60 68" title="Creating a custom batch processor" --8<-- "examples/batch_processing/src/custom_partial_processor.py" ``` @@ -371,7 +372,7 @@ Given a SQS batch where the first batch record succeeds and the second fails pro ## FAQ -### Choosing between decorator and context manager +### Choosing between method and context manager Use context manager when you want access to the processed messages or handle `BatchProcessingError` exception when all records within the batch fail to be processed. From 174cc3a9b8720eb808e6129ab8a9c6c3e09345cf Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Wed, 10 May 2023 13:24:42 +0200 Subject: [PATCH 12/17] Apply suggestions from code review Co-authored-by: Leandro Damascena Signed-off-by: Ruben Fonseca --- examples/batch_processing/sam/dynamodb_batch_processing.yaml | 2 +- examples/batch_processing/sam/kinesis_batch_processing.yaml | 2 +- examples/batch_processing/sam/sqs_batch_processing.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/batch_processing/sam/dynamodb_batch_processing.yaml b/examples/batch_processing/sam/dynamodb_batch_processing.yaml index 4229a767ec8..2ed70d65a86 100644 --- a/examples/batch_processing/sam/dynamodb_batch_processing.yaml +++ b/examples/batch_processing/sam/dynamodb_batch_processing.yaml @@ -6,7 +6,7 @@ Globals: Function: Timeout: 5 MemorySize: 256 - Runtime: python3.9 + Runtime: python3.10 Tracing: Active Environment: Variables: diff --git a/examples/batch_processing/sam/kinesis_batch_processing.yaml b/examples/batch_processing/sam/kinesis_batch_processing.yaml index e28cd1b5622..28b2c58402b 100644 --- a/examples/batch_processing/sam/kinesis_batch_processing.yaml +++ b/examples/batch_processing/sam/kinesis_batch_processing.yaml @@ -6,7 +6,7 @@ Globals: Function: Timeout: 5 MemorySize: 256 - Runtime: python3.9 + Runtime: python3.10 Tracing: Active Environment: Variables: diff --git a/examples/batch_processing/sam/sqs_batch_processing.yaml b/examples/batch_processing/sam/sqs_batch_processing.yaml index 83a57676437..00bbd00e569 100644 --- a/examples/batch_processing/sam/sqs_batch_processing.yaml +++ b/examples/batch_processing/sam/sqs_batch_processing.yaml @@ -6,7 +6,7 @@ Globals: Function: Timeout: 5 MemorySize: 256 - Runtime: python3.9 + Runtime: python3.10 Tracing: Active Environment: Variables: From f099c94bbfa836ff2768e0762fe8c625ec4702e3 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Wed, 10 May 2023 13:43:50 +0200 Subject: [PATCH 13/17] fix: showcase lambda_context --- docs/utilities/batch.md | 2 +- .../src/advanced_accessing_lambda_context.py | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ae41efa71a7..0e062e50726 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -285,7 +285,7 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam === "Recommended" - ```python hl_lines="19" + ```python hl_lines="18" --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context.py" ``` diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context.py b/examples/batch_processing/src/advanced_accessing_lambda_context.py index 9de02c6c1e5..b0e7eeb98af 100644 --- a/examples/batch_processing/src/advanced_accessing_lambda_context.py +++ b/examples/batch_processing/src/advanced_accessing_lambda_context.py @@ -1,4 +1,3 @@ -import json 
from typing import Optional from aws_lambda_powertools import Logger, Tracer @@ -17,10 +16,9 @@ @tracer.capture_method def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - logger.info(item) + if lambda_context is not None: + remaining_time = lambda_context.get_remaining_time_in_millis() + logger.info(remaining_time) @logger.inject_lambda_context From bd0cde8c14d1ec8685ddb339efbf471acac185f2 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Wed, 10 May 2023 14:04:34 +0200 Subject: [PATCH 14/17] fix: addressed the rest of the comments --- docs/utilities/batch.md | 4 ++-- .../src/custom_partial_processor.py | 16 +++++++++++----- examples/batch_processing/src/disable_tracing.py | 5 ++--- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 0e062e50726..5f95b70ba05 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -313,7 +313,7 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc ???+ example Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing -```python title="Extending failure handling mechanism in BatchProcessor" +```python hl_lines="8 9 16-19 22 38" title="Extending failure handling mechanism in BatchProcessor" --8<-- "examples/batch_processing/src/extending_failure.py" ``` @@ -328,7 +328,7 @@ You can create your own partial batch processor from scratch by inheriting the ` You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function. -```python hl_lines="9 16 31 37 44 55 60 68" title="Creating a custom batch processor" +```python hl_lines="9-12 20 35 41 48 59 64 68 76" title="Creating a custom batch processor" --8<-- "examples/batch_processing/src/custom_partial_processor.py" ``` diff --git a/examples/batch_processing/src/custom_partial_processor.py b/examples/batch_processing/src/custom_partial_processor.py index 80d3b63fbe0..353f612e7cc 100644 --- a/examples/batch_processing/src/custom_partial_processor.py +++ b/examples/batch_processing/src/custom_partial_processor.py @@ -6,14 +6,18 @@ import boto3 from aws_lambda_powertools import Logger -from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor +from aws_lambda_powertools.utilities.batch import ( + BasePartialBatchProcessor, + EventType, + process_partial_response, +) table_name = os.getenv("TABLE_NAME", "table_not_found") logger = Logger() -class MyPartialProcessor(BasePartialProcessor): +class MyPartialProcessor(BasePartialBatchProcessor): """ Process a record and stores successful results at a Amazon DynamoDB Table @@ -26,7 +30,7 @@ class MyPartialProcessor(BasePartialProcessor): def __init__(self, table_name: str): self.table_name = table_name - super().__init__() + super().__init__(event_type=EventType.SQS) def _prepare(self): # It's called once, *before* processing @@ -61,10 +65,12 @@ async def _async_process_record(self, record: dict): raise NotImplementedError() +processor = MyPartialProcessor(table_name) + + def record_handler(record): return randint(0, 100) -@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name)) def lambda_handler(event, context): - return {"statusCode": 200} + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff 
--git a/examples/batch_processing/src/disable_tracing.py b/examples/batch_processing/src/disable_tracing.py index 3b3e4b30d35..c8967044f74 100644 --- a/examples/batch_processing/src/disable_tracing.py +++ b/examples/batch_processing/src/disable_tracing.py @@ -4,7 +4,7 @@ from aws_lambda_powertools.utilities.batch import ( BatchProcessor, EventType, - batch_processor, + process_partial_response, ) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -24,6 +24,5 @@ def record_handler(record: SQSRecord): @logger.inject_lambda_context @tracer.capture_lambda_handler -@batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) From b92614abf0dd65748ec50c8691035a1f3c8a1eb6 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 11 May 2023 00:40:03 +0100 Subject: [PATCH 15/17] docs: small changes to help others + sentry dev dependency --- docs/utilities/batch.md | 48 ++++++++++++------ .../src/context_manager_access.py | 9 ++-- .../src/getting_started_dynamodb_event.json | 48 +++++++++++++++++- .../getting_started_dynamodb_response.json | 47 +---------------- ...nt.json => getting_started_sqs_event.json} | 0 ...json => getting_started_sqs_response.json} | 0 .../getting_started_with_test.py} | 11 ++-- .../getting_started_with_test_app.py} | 0 .../batch_processing/src/pydantic_dynamodb.py | 3 +- .../src/pydantic_dynamodb_event.json | 50 +++++++++++++++++++ .../batch_processing/src/pydantic_kinesis.py | 1 + .../src/pydantic_kinesis_event.json | 36 +++++++++++++ examples/batch_processing/src/pydantic_sqs.py | 1 + .../src/pydantic_sqs_event.json | 36 +++++++++++++ .../testing/events/sqs_event.json | 36 ------------- .../batch_processing/testing/src/__init__.py | 0 poetry.lock | 48 ++++++++++++++++-- pyproject.toml | 1 + 18 files changed, 263 insertions(+), 112 deletions(-) rename examples/batch_processing/src/{getting_started_event.json => getting_started_sqs_event.json} (100%) rename examples/batch_processing/src/{getting_started_response.json => getting_started_sqs_response.json} (100%) rename examples/batch_processing/{testing/test_app.py => src/getting_started_with_test.py} (75%) rename examples/batch_processing/{testing/src/app.py => src/getting_started_with_test_app.py} (100%) create mode 100644 examples/batch_processing/src/pydantic_dynamodb_event.json create mode 100644 examples/batch_processing/src/pydantic_kinesis_event.json create mode 100644 examples/batch_processing/src/pydantic_sqs_event.json delete mode 100644 examples/batch_processing/testing/events/sqs_event.json delete mode 100644 examples/batch_processing/testing/src/__init__.py diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 5f95b70ba05..79156d56f1b 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -92,13 +92,13 @@ Processing batches from SQS works in three stages: The second record failed to be processed, therefore the processor added its message ID in the response. 
```json - --8<-- "examples/batch_processing/src/getting_started_response.json" + --8<-- "examples/batch_processing/src/getting_started_sqs_response.json" ``` === "Sample event" ```json - --8<-- "examples/batch_processing/src/getting_started_event.json" + --8<-- "examples/batch_processing/src/getting_started_sqs_event.json" ``` #### FIFO queues @@ -201,13 +201,13 @@ Processing batches from DynamoDB Streams works in three stages: The second record failed to be processed, therefore the processor added its sequence number in the response. ```json - --8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json" + --8<-- "examples/batch_processing/src/getting_started_dynamodb_response.json" ``` === "Sample event" ```json - --8<-- "examples/batch_processing/src/getting_started_dynamodb_response.json" + --8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json" ``` ### Partial failure mechanics @@ -250,22 +250,40 @@ Inheritance is importance because we need to access message IDs and sequence num === "SQS" - ```python hl_lines="8 17 27 34" + ```python hl_lines="8 17 27 35" --8<-- "examples/batch_processing/src/pydantic_sqs.py" ``` +=== "SQS - Sample Event " + + ```json hl_lines="6 22" + --8<-- "examples/batch_processing/src/pydantic_sqs_event.json" + ``` + === "Kinesis Data Streams" - ```python hl_lines="9 10 20 28 34 41" + ```python hl_lines="9 10 20 28 34 42" --8<-- "examples/batch_processing/src/pydantic_kinesis.py" ``` +=== "Kinesis - Sample Event " + + ```json hl_lines="8 24" + --8<-- "examples/batch_processing/src/pydantic_kinesis_event.json" + ``` + === "DynamoDB Streams" - ```python hl_lines="12 13 22 32 37 41 47 55" + ```python hl_lines="12 13 22 32 37 41 47 56" --8<-- "examples/batch_processing/src/pydantic_dynamodb.py" ``` +=== "DynamoDB - Sample Event " + + ```json hl_lines="13-15 36-38" + --8<-- "examples/batch_processing/src/pydantic_dynamodb_event.json" + ``` + ### Accessing processed messages Use the context manager to access a list of all returned values from your `record_handler` function. @@ -273,7 +291,7 @@ Use the context manager to access a list of all returned values from your `recor * **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record -```python hl_lines="26-34" title="Accessing processed messages via context manager" +```python hl_lines="28-33" title="Accessing processed messages via context manager" --8<-- "examples/batch_processing/src/context_manager_access.py" ``` @@ -285,13 +303,13 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam === "Recommended" - ```python hl_lines="18" + ```python hl_lines="18 27" --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context.py" ``` === "As a decorator (legacy)" - ```python hl_lines="18" + ```python hl_lines="18 26" --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context_decorator.py" ``` @@ -352,22 +370,22 @@ As there is no external calls, you can unit test your code with `BatchProcessor` Given a SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response. 
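The essential assertion is compact enough to show before the full files in the tabs below. A sketch, assuming the `sqs_event` and `lambda_context` fixtures and the `lambda_handler` import defined in those tabs:

```python
def test_app_reports_only_failed_record(sqs_event, lambda_context):
    failed_record = sqs_event["Records"][1]  # second record carries a non-JSON body

    ret = lambda_handler(sqs_event, lambda_context)

    # only the failing message ID is returned, so SQS retries just that record
    assert ret == {"batchItemFailures": [{"itemIdentifier": failed_record["messageId"]}]}
```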
-=== "test_app.py" +=== "getting_started_with_test.py" ```python - --8<-- "examples/batch_processing/testing/test_app.py" + --8<-- "examples/batch_processing/src/getting_started_with_test.py" ``` -=== "src/app.py" +=== "getting_started_with_test_app.py" ```python - --8<-- "examples/batch_processing/testing/src/app.py" + --8<-- "examples/batch_processing/src/getting_started_with_test_app.py" ``` === "Sample SQS event" ```json title="events/sqs_event.json" - --8<-- "examples/batch_processing/testing/events/sqs_event.json" + --8<-- "examples/batch_processing/src/getting_started_sqs_event.json" ``` ## FAQ diff --git a/examples/batch_processing/src/context_manager_access.py b/examples/batch_processing/src/context_manager_access.py index 73a84d45ddf..aa3abf8e530 100644 --- a/examples/batch_processing/src/context_manager_access.py +++ b/examples/batch_processing/src/context_manager_access.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import json -from typing import Any, List, Literal, Tuple, Union +from typing import List, Literal, Tuple from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType @@ -27,10 +29,9 @@ def lambda_handler(event, context: LambdaContext): processed_messages: List[Tuple] = processor.process() for message in processed_messages: - status: Union[Literal["success"], Literal["fail"]] = message[0] - result: Any = message[1] + status: Literal["success"] | Literal["fail"] = message[0] record: SQSRecord = message[2] - logger.info(status, result, record) + logger.info(status, record=record) return processor.response() diff --git a/examples/batch_processing/src/getting_started_dynamodb_event.json b/examples/batch_processing/src/getting_started_dynamodb_event.json index 9ccbde9ba9f..2508a6f0b67 100644 --- a/examples/batch_processing/src/getting_started_dynamodb_event.json +++ b/examples/batch_processing/src/getting_started_dynamodb_event.json @@ -1,7 +1,51 @@ + { - "batchItemFailures": [ + "Records": [ { - "itemIdentifier": "8640712661" + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "failure" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "3275880929", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "SomethingElse": { + "S": "success" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "8640712661", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" } ] } diff --git a/examples/batch_processing/src/getting_started_dynamodb_response.json b/examples/batch_processing/src/getting_started_dynamodb_response.json index f74c2429a5a..9ccbde9ba9f 100644 --- a/examples/batch_processing/src/getting_started_dynamodb_response.json +++ b/examples/batch_processing/src/getting_started_dynamodb_response.json @@ -1,50 +1,7 @@ { - "Records": [ + "batchItemFailures": [ { - "eventID": "1", - "eventVersion": "1.0", - "dynamodb": { - "Keys": { - "Id": { - "N": "101" - } - }, - "NewImage": { - "Message": { - "S": "failure" - } - }, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "3275880929", - "SizeBytes": 26 - }, - "awsRegion": "us-west-2", - "eventName": "INSERT", 
- "eventSourceARN": "eventsource_arn", - "eventSource": "aws:dynamodb" - }, - { - "eventID": "1", - "eventVersion": "1.0", - "dynamodb": { - "Keys": { - "Id": { - "N": "101" - } - }, - "NewImage": { - "SomethingElse": { - "S": "success" - } - }, - "StreamViewType": "NEW_AND_OLD_IMAGES", - "SequenceNumber": "8640712661", - "SizeBytes": 26 - }, - "awsRegion": "us-west-2", - "eventName": "INSERT", - "eventSourceARN": "eventsource_arn", - "eventSource": "aws:dynamodb" + "itemIdentifier": "8640712661" } ] } diff --git a/examples/batch_processing/src/getting_started_event.json b/examples/batch_processing/src/getting_started_sqs_event.json similarity index 100% rename from examples/batch_processing/src/getting_started_event.json rename to examples/batch_processing/src/getting_started_sqs_event.json diff --git a/examples/batch_processing/src/getting_started_response.json b/examples/batch_processing/src/getting_started_sqs_response.json similarity index 100% rename from examples/batch_processing/src/getting_started_response.json rename to examples/batch_processing/src/getting_started_sqs_response.json diff --git a/examples/batch_processing/testing/test_app.py b/examples/batch_processing/src/getting_started_with_test.py similarity index 75% rename from examples/batch_processing/testing/test_app.py rename to examples/batch_processing/src/getting_started_with_test.py index 2caf460b04e..49e78269248 100644 --- a/examples/batch_processing/testing/test_app.py +++ b/examples/batch_processing/src/getting_started_with_test.py @@ -3,8 +3,7 @@ from pathlib import Path import pytest - -from examples.batch_processing.testing.src import app +from getting_started_with_test_app import lambda_handler, processor def load_event(path: Path): @@ -32,15 +31,15 @@ def sqs_event(): def test_app_batch_partial_response(sqs_event, lambda_context): # GIVEN - processor = app.processor # access processor for additional assertions + processor_result = processor # access processor for additional assertions successful_record = sqs_event["Records"][0] failed_record = sqs_event["Records"][1] expected_response = {"batchItemFailures": [{"itemIdentifier": failed_record["messageId"]}]} # WHEN - ret = app.lambda_handler(sqs_event, lambda_context) + ret = lambda_handler(sqs_event, lambda_context) # THEN assert ret == expected_response - assert len(processor.fail_messages) == 1 - assert processor.success_messages[0] == successful_record + assert len(processor_result.fail_messages) == 1 + assert processor_result.success_messages[0] == successful_record diff --git a/examples/batch_processing/testing/src/app.py b/examples/batch_processing/src/getting_started_with_test_app.py similarity index 100% rename from examples/batch_processing/testing/src/app.py rename to examples/batch_processing/src/getting_started_with_test_app.py diff --git a/examples/batch_processing/src/pydantic_dynamodb.py b/examples/batch_processing/src/pydantic_dynamodb.py index ae0c22144f0..9843d8b2f2e 100644 --- a/examples/batch_processing/src/pydantic_dynamodb.py +++ b/examples/batch_processing/src/pydantic_dynamodb.py @@ -25,7 +25,7 @@ class OrderDynamoDB(BaseModel): # auto transform json string # so Pydantic can auto-initialize nested Order model @validator("Message", pre=True) - def transform_message_to_dict(self, value: Dict[Literal["S"], str]): + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): return json.loads(value["S"]) @@ -46,6 +46,7 @@ class OrderDynamoDBRecord(DynamoDBStreamRecordModel): @tracer.capture_method def record_handler(record: 
OrderDynamoDBRecord): if record.dynamodb.NewImage and record.dynamodb.NewImage.Message: + logger.info(record.dynamodb.NewImage.Message.item) return record.dynamodb.NewImage.Message.item diff --git a/examples/batch_processing/src/pydantic_dynamodb_event.json b/examples/batch_processing/src/pydantic_dynamodb_event.json new file mode 100644 index 00000000000..40a8977e7bd --- /dev/null +++ b/examples/batch_processing/src/pydantic_dynamodb_event.json @@ -0,0 +1,50 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "{\"item\": {\"laptop\": \"amd\"}}" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "3275880929", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "SomethingElse": { + "S": "success" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "8640712661", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + } + ] + } diff --git a/examples/batch_processing/src/pydantic_kinesis.py b/examples/batch_processing/src/pydantic_kinesis.py index 2907a978e57..012f67a9b35 100644 --- a/examples/batch_processing/src/pydantic_kinesis.py +++ b/examples/batch_processing/src/pydantic_kinesis.py @@ -32,6 +32,7 @@ class OrderKinesisRecord(KinesisDataStreamRecord): @tracer.capture_method def record_handler(record: OrderKinesisRecord): + logger.info(record.kinesis.data.item) return record.kinesis.data.item diff --git a/examples/batch_processing/src/pydantic_kinesis_event.json b/examples/batch_processing/src/pydantic_kinesis_event.json new file mode 100644 index 00000000000..0679115dd65 --- /dev/null +++ b/examples/batch_processing/src/pydantic_kinesis_event.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "4107859083838847772757075850904226111829882106684065", + "data": "eyJpdGVtIjogeyJsYXB0b3AiOiAiYW1kIn19Cg==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "6006958808509702859251049540584488075644979031228738", + "data": "eyJpdGVtIjogeyJrZXlib2FyZCI6ICJjbGFzc2ljIn19Cg==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] + } diff --git a/examples/batch_processing/src/pydantic_sqs.py b/examples/batch_processing/src/pydantic_sqs.py index 31d33a1e38d..0e82a304e4e 100644 --- a/examples/batch_processing/src/pydantic_sqs.py +++ 
b/examples/batch_processing/src/pydantic_sqs.py @@ -25,6 +25,7 @@ class OrderSqsRecord(SqsRecordModel): @tracer.capture_method def record_handler(record: OrderSqsRecord): + logger.info(record.body.item) return record.body.item diff --git a/examples/batch_processing/src/pydantic_sqs_event.json b/examples/batch_processing/src/pydantic_sqs_event.json new file mode 100644 index 00000000000..c3f26d074b1 --- /dev/null +++ b/examples/batch_processing/src/pydantic_sqs_event.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"item\": {\"laptop\": \"amd\"}}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + }, + { + "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"item\": {\"keyboard\": \"classic\"}}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + } + ] + } diff --git a/examples/batch_processing/testing/events/sqs_event.json b/examples/batch_processing/testing/events/sqs_event.json deleted file mode 100644 index cdc63cf1fbe..00000000000 --- a/examples/batch_processing/testing/events/sqs_event.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "Records": [ - { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "{\"Message\": \"success\"}", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", - "awsRegion": "us-east-1" - }, - { - "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", - "awsRegion": "us-east-1" - } - ] -} diff --git a/examples/batch_processing/testing/src/__init__.py b/examples/batch_processing/testing/src/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/poetry.lock b/poetry.lock index 0be463c3c39..57a34975a3e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. 
[[package]] name = "anyio" @@ -2655,6 +2655,48 @@ files = [ attrs = "*" pbr = "*" +[[package]] +name = "sentry-sdk" +version = "1.22.2" +description = "Python client for Sentry (https://sentry.io)" +category = "dev" +optional = false +python-versions = "*" +files = [ + {file = "sentry-sdk-1.22.2.tar.gz", hash = "sha256:5932c092c6e6035584eb74d77064e4bce3b7935dfc4a331349719a40db265840"}, + {file = "sentry_sdk-1.22.2-py2.py3-none-any.whl", hash = "sha256:cf89a5063ef84278d186aceaed6fb595bfe67d099298e537634a323664265669"}, +] + +[package.dependencies] +certifi = "*" +urllib3 = {version = ">=1.26.11,<2.0.0", markers = "python_version >= \"3.6\""} + +[package.extras] +aiohttp = ["aiohttp (>=3.5)"] +arq = ["arq (>=0.23)"] +beam = ["apache-beam (>=2.12)"] +bottle = ["bottle (>=0.12.13)"] +celery = ["celery (>=3)"] +chalice = ["chalice (>=1.16.0)"] +django = ["django (>=1.8)"] +falcon = ["falcon (>=1.4)"] +fastapi = ["fastapi (>=0.79.0)"] +flask = ["blinker (>=1.1)", "flask (>=0.11)"] +grpcio = ["grpcio (>=1.21.1)"] +httpx = ["httpx (>=0.16.0)"] +huey = ["huey (>=2)"] +opentelemetry = ["opentelemetry-distro (>=0.35b0)"] +pure-eval = ["asttokens", "executing", "pure-eval"] +pymongo = ["pymongo (>=3.1)"] +pyspark = ["pyspark (>=2.4.4)"] +quart = ["blinker (>=1.1)", "quart (>=0.16.1)"] +rq = ["rq (>=0.6)"] +sanic = ["sanic (>=0.8)"] +sqlalchemy = ["sqlalchemy (>=1.2)"] +starlette = ["starlette (>=0.19.1)"] +starlite = ["starlite (>=1.48)"] +tornado = ["tornado (>=5)"] + [[package]] name = "six" version = "1.16.0" @@ -3026,7 +3068,7 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] [extras] -all = ["pydantic", "aws-xray-sdk", "fastjsonschema"] +all = ["aws-xray-sdk", "fastjsonschema", "pydantic"] aws-sdk = ["boto3"] parser = ["pydantic"] tracer = ["aws-xray-sdk"] @@ -3035,4 +3077,4 @@ validation = ["fastjsonschema"] [metadata] lock-version = "2.0" python-versions = "^3.7.4" -content-hash = "0c140ce333e0131b6cf5fee17b8cba631dfd3bbd3ee5f8ab66175bfeed493842" +content-hash = "fe686c11217e31bf5fd24895d0224fc3e44c9da4192c6b64d245af1c8033a9cd" diff --git a/pyproject.toml b/pyproject.toml index f2613d3c36d..006dd298516 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,6 +105,7 @@ cfn-lint = "0.77.5" mypy = "^1.1.1" types-python-dateutil = "^2.8.19.6" httpx = ">=0.23.3,<0.25.0" +sentry-sdk = "^1.22.2" [tool.coverage.run] source = ["aws_lambda_powertools"] From 6ec8f6479bbe856ac9cc0afd39e3326ea12c92a8 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 11 May 2023 00:49:53 +0100 Subject: [PATCH 16/17] docs: mypy --- examples/batch_processing/src/context_manager_access.py | 4 +++- examples/batch_processing/src/pydantic_dynamodb.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/batch_processing/src/context_manager_access.py b/examples/batch_processing/src/context_manager_access.py index aa3abf8e530..9882092bd83 100644 --- a/examples/batch_processing/src/context_manager_access.py +++ b/examples/batch_processing/src/context_manager_access.py @@ -1,7 +1,9 @@ from __future__ import annotations import json -from typing import List, Literal, Tuple +from typing import List, Tuple + +from typing_extensions import Literal from aws_lambda_powertools import Logger, Tracer from 
aws_lambda_powertools.utilities.batch import BatchProcessor, EventType diff --git a/examples/batch_processing/src/pydantic_dynamodb.py b/examples/batch_processing/src/pydantic_dynamodb.py index 9843d8b2f2e..dbd5cff24c4 100644 --- a/examples/batch_processing/src/pydantic_dynamodb.py +++ b/examples/batch_processing/src/pydantic_dynamodb.py @@ -1,5 +1,7 @@ import json -from typing import Dict, Literal, Optional +from typing import Dict, Optional + +from typing_extensions import Literal from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( From cbb6064e0dc3b3473f932a7bf8ab945010c2da3c Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Thu, 11 May 2023 12:41:55 +0200 Subject: [PATCH 17/17] fix: highlight --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 79156d56f1b..993f343becd 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -274,7 +274,7 @@ Inheritance is importance because we need to access message IDs and sequence num === "DynamoDB Streams" - ```python hl_lines="12 13 22 32 37 41 47 56" + ```python hl_lines="14-15 24 34 39 43 49 58" --8<-- "examples/batch_processing/src/pydantic_dynamodb.py" ```
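A closing note on the `self` to `cls` change in `transform_message_to_dict` above: Pydantic validators are implicitly class methods, so their first parameter is the class rather than an instance. A standalone sketch of the unwrapping this validator performs, assuming Pydantic v1 (the major version Powertools supported at the time):

```python
import json
from typing import Dict

from pydantic import BaseModel, validator
from typing_extensions import Literal


class Order(BaseModel):
    item: dict


class OrderDynamoDB(BaseModel):
    Message: Order

    # DynamoDB serializes string values as {"S": "<json string>"}; pre=True lets us
    # unwrap and parse the string before Pydantic initializes the nested Order model
    @validator("Message", pre=True)
    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
        return json.loads(value["S"])


order = OrderDynamoDB(Message={"S": '{"item": {"laptop": "amd"}}'})
print(order.Message.item)  # {'laptop': 'amd'}
```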