From e1f877d244ef5f163d47414ec1c951a96e469d92 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 14:39:50 +0200 Subject: [PATCH 01/11] fix(parameters): make cache aware of single vs multiple calls Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index e4be9d33cdc..78bf865faf0 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import TransformOptions +from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 6a15873c496..2dbf1593d72 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,3 +1,4 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] +RecursiveOptions = Literal[True, False] From 5abdcc6243f2f1ed67d70376ee211c59097e8fb2 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 25 Jul 2023 15:16:51 +0200 Subject: [PATCH 02/11] chore: cleanup, add test for single and nested Signed-off-by: heitorlessa --- aws_lambda_powertools/utilities/parameters/base.py | 2 +- aws_lambda_powertools/utilities/parameters/types.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/parameters/base.py b/aws_lambda_powertools/utilities/parameters/base.py index 78bf865faf0..e4be9d33cdc 100644 --- a/aws_lambda_powertools/utilities/parameters/base.py +++ b/aws_lambda_powertools/utilities/parameters/base.py @@ -27,7 +27,7 @@ from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.shared.functions import resolve_max_age -from aws_lambda_powertools.utilities.parameters.types import RecursiveOptions, TransformOptions +from aws_lambda_powertools.utilities.parameters.types import TransformOptions from .exceptions import GetParameterError, TransformParameterError diff --git a/aws_lambda_powertools/utilities/parameters/types.py b/aws_lambda_powertools/utilities/parameters/types.py index 2dbf1593d72..6a15873c496 100644 --- a/aws_lambda_powertools/utilities/parameters/types.py +++ b/aws_lambda_powertools/utilities/parameters/types.py @@ -1,4 +1,3 @@ from typing_extensions import Literal TransformOptions = Literal["json", "binary", "auto", None] -RecursiveOptions = Literal[True, False] From 5931949b4077fbb4a09e345be9b4b559596892fb Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 26 Jul 2023 17:23:19 +0200 Subject: [PATCH 03/11] docs(batch): simplify background with diagram and wording Signed-off-by: heitorlessa --- docs/utilities/batch.md | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 488be06e60b..b94921dea48 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -16,12 +16,21 @@ The batch processing utility handles partial failures when 
processing batches fr

When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.

-If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire.
+If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of these conditions happens first: **a)** your Lambda function returns a successful response, **b)** the record reaches maximum retry attempts, or **c)** records expire.
+
+```mermaid
+journey
+  section Conditions
+    Successful response: 5: Success
+    Maximum retries: 3: Failure
+    Records expired: 1: Failure
+```

-With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place:
+This behavior changes when you enable the Report Batch Item Failures feature in your Lambda function event source configuration:

-1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties
-2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} is returned so Lambda knows which records should not be deleted during partial responses
+
+* [**SQS queues**](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. Only messages reported as failures will return to the queue for a retry, while successful ones will be deleted.
+* [**Kinesis data streams**](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"} and [**DynamoDB streams**](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting){target="_blank"}. A single reported failure will use its sequence number as the stream checkpoint; multiple reported failures will use the lowest sequence number as the checkpoint.

@@ -32,14 +41,16 @@

## Getting started

-Regardless whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`.
+For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.

-You do not need any additional IAM permissions to use this utility, except for what each event source requires.
+You use your preferred deployment framework to set the correct configuration, while this utility handles returning the correct response.

### Required resources

The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted.

+!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
+
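For reference, this is a sketch of the partial batch response Lambda expects when `ReportBatchItemFailures` is enabled; `process_partial_response` builds it for you, and the identifier value below is illustrative:

```python
# A sketch of the partial batch response expected by Lambda.
# For SQS, itemIdentifier is the message ID; for Kinesis Data Streams and
# DynamoDB Streams, it is the record's sequence number.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "059f36b4-87a3-44ab-83d2-661975830a7d"},  # illustrative ID
    ]
}
```

An empty `batchItemFailures` list tells Lambda that every record in the batch was processed successfully.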
+ === "SQS" ```yaml title="template.yaml" hl_lines="30-31" From 97123111faf4acce77a5d57b2e4a641b585ed731 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 26 Jul 2023 17:35:30 +0200 Subject: [PATCH 04/11] docs: recommend sqs new record.json_body Signed-off-by: heitorlessa --- docs/utilities/batch.md | 10 +++++----- examples/batch_processing/src/getting_started_sqs.py | 8 ++------ .../batch_processing/src/getting_started_sqs_fifo.py | 8 ++------ 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index b94921dea48..6941c53e397 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -78,11 +78,11 @@ Processing batches from SQS works in three stages: 3. Use **`process_partial_response`** to kick off processing ???+ info - This code example optionally uses Tracer and Logger for completion. + This code example uses Tracer and Logger for completion. === "Recommended" - ```python hl_lines="4-9 12 18 28" + ```python hl_lines="2-7 10 16 24" --8<-- "examples/batch_processing/src/getting_started_sqs.py" ``` @@ -119,7 +119,7 @@ This helps preserve the ordering of messages in your queue. === "Recommended" - ```python hl_lines="5-6 11 27" + ```python hl_lines="2-6 9 23" --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py" ``` @@ -144,7 +144,7 @@ Processing batches from Kinesis works in three stages: 3. Use **`process_partial_response`** to kick off processing ???+ info - This code example optionally uses Tracer and Logger for completion. + This code example uses Tracer and Logger for completion. === "Recommended" @@ -187,7 +187,7 @@ Processing batches from DynamoDB Streams works in three stages: 3. Use **`process_partial_response`** to kick off processing ???+ info - This code example optionally uses Tracer and Logger for completion. + This code example uses Tracer and Logger for completion. 
=== "Recommended" diff --git a/examples/batch_processing/src/getting_started_sqs.py b/examples/batch_processing/src/getting_started_sqs.py index 8b6fe4c4266..64ac0b11922 100644 --- a/examples/batch_processing/src/getting_started_sqs.py +++ b/examples/batch_processing/src/getting_started_sqs.py @@ -1,5 +1,3 @@ -import json - from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( BatchProcessor, @@ -16,10 +14,8 @@ @tracer.capture_method def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - logger.info(item) + payload: str = record.json_body # if json string data, otherwise record.body for str + logger.info(payload) @logger.inject_lambda_context diff --git a/examples/batch_processing/src/getting_started_sqs_fifo.py b/examples/batch_processing/src/getting_started_sqs_fifo.py index d30fb319c85..a5b0ee32087 100644 --- a/examples/batch_processing/src/getting_started_sqs_fifo.py +++ b/examples/batch_processing/src/getting_started_sqs_fifo.py @@ -1,5 +1,3 @@ -import json - from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( SqsFifoPartialProcessor, @@ -15,10 +13,8 @@ @tracer.capture_method def record_handler(record: SQSRecord): - payload: str = record.body - if payload: - item: dict = json.loads(payload) - logger.info(item) + payload: str = record.json_body # if json string data, otherwise record.body for str + logger.info(payload) @logger.inject_lambda_context From cd1ccd18e945103fd5c00c404749b3dfe4c4663a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 26 Jul 2023 17:49:48 +0200 Subject: [PATCH 05/11] docs: add code annotation for more details Signed-off-by: heitorlessa --- docs/utilities/batch.md | 10 ++++++++++ .../batch_processing/src/getting_started_dynamodb.py | 2 +- .../batch_processing/src/getting_started_kinesis.py | 2 +- examples/batch_processing/src/getting_started_sqs.py | 11 ++++++++--- .../batch_processing/src/getting_started_sqs_fifo.py | 2 +- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 6941c53e397..367c49426f2 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -86,6 +86,10 @@ Processing batches from SQS works in three stages: --8<-- "examples/batch_processing/src/getting_started_sqs.py" ``` + 1. **Step 1**. Creates a partial failure batch processor for SQS queues. See [partial failure mechanics for details](#partial-failure-mechanics) + 2. **Step 2**. Defines a function to receive one record at a time from the batch + 3. **Step 3**. Kicks off processing + === "As a context manager" ```python hl_lines="4-5 8 14 25-26 29" @@ -123,6 +127,8 @@ This helps preserve the ordering of messages in your queue. --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py" ``` + 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) + === "As a context manager" ```python hl_lines="4 8" @@ -152,6 +158,8 @@ Processing batches from Kinesis works in three stages: --8<-- "examples/batch_processing/src/getting_started_kinesis.py" ``` + 1. **Step 1**. Creates a partial failure batch processor for Kinesis Data Streams. 
See [partial failure mechanics for details](#partial-failure-mechanics) + === "As a context manager" ```python hl_lines="3-5 8 14 23-25 28" @@ -195,6 +203,8 @@ Processing batches from DynamoDB Streams works in three stages: --8<-- "examples/batch_processing/src/getting_started_dynamodb.py" ``` + 1. **Step 1**. Creates a partial failure batch processor for DynamoDB Streams. See [partial failure mechanics for details](#partial-failure-mechanics) + === "As a context manager" ```python hl_lines="5-7 10 16 28-30 33" diff --git a/examples/batch_processing/src/getting_started_dynamodb.py b/examples/batch_processing/src/getting_started_dynamodb.py index 61990e2bd26..f56f0324bad 100644 --- a/examples/batch_processing/src/getting_started_dynamodb.py +++ b/examples/batch_processing/src/getting_started_dynamodb.py @@ -11,7 +11,7 @@ ) from aws_lambda_powertools.utilities.typing import LambdaContext -processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) # (1)! tracer = Tracer() logger = Logger() diff --git a/examples/batch_processing/src/getting_started_kinesis.py b/examples/batch_processing/src/getting_started_kinesis.py index 179154e3b1f..a3410fa57a2 100644 --- a/examples/batch_processing/src/getting_started_kinesis.py +++ b/examples/batch_processing/src/getting_started_kinesis.py @@ -9,7 +9,7 @@ ) from aws_lambda_powertools.utilities.typing import LambdaContext -processor = BatchProcessor(event_type=EventType.KinesisDataStreams) +processor = BatchProcessor(event_type=EventType.KinesisDataStreams) # (1)! tracer = Tracer() logger = Logger() diff --git a/examples/batch_processing/src/getting_started_sqs.py b/examples/batch_processing/src/getting_started_sqs.py index 64ac0b11922..c8c2facb09a 100644 --- a/examples/batch_processing/src/getting_started_sqs.py +++ b/examples/batch_processing/src/getting_started_sqs.py @@ -7,13 +7,13 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext -processor = BatchProcessor(event_type=EventType.SQS) +processor = BatchProcessor(event_type=EventType.SQS) # (1)! tracer = Tracer() logger = Logger() @tracer.capture_method -def record_handler(record: SQSRecord): +def record_handler(record: SQSRecord): # (2)! payload: str = record.json_body # if json string data, otherwise record.body for str logger.info(payload) @@ -21,4 +21,9 @@ def record_handler(record: SQSRecord): @logger.inject_lambda_context @tracer.capture_lambda_handler def lambda_handler(event, context: LambdaContext): - return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) + return process_partial_response( # (3)! + event=event, + record_handler=record_handler, + processor=processor, + context=context, + ) diff --git a/examples/batch_processing/src/getting_started_sqs_fifo.py b/examples/batch_processing/src/getting_started_sqs_fifo.py index a5b0ee32087..95d7463eb18 100644 --- a/examples/batch_processing/src/getting_started_sqs_fifo.py +++ b/examples/batch_processing/src/getting_started_sqs_fifo.py @@ -6,7 +6,7 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext -processor = SqsFifoPartialProcessor() +processor = SqsFifoPartialProcessor() # (1)! 
tracer = Tracer()
 logger = Logger()
 

From adf15fd641ecde54312690fde42213d2bdccff1e Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Wed, 26 Jul 2023 18:16:53 +0200
Subject: [PATCH 06/11] docs: add initial error handling section

Signed-off-by: heitorlessa

---
 docs/utilities/batch.md                       | 20 +++++++++++
 .../src/getting_started_error_handling.py     | 35 +++++++++++++++++++
 2 files changed, 55 insertions(+)
 create mode 100644 examples/batch_processing/src/getting_started_error_handling.py

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 367c49426f2..dccae1825b9 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -231,6 +231,26 @@ Processing batches from DynamoDB Streams works in three stages:
 --8<-- "examples/batch_processing/src/getting_started_dynamodb_event.json"
 ```

+### Error handling
+
+By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response without failing your Lambda function execution.
+
+=== "Sample error handling with custom exception"

    ```python title="" hl_lines="24"
    --8<-- "examples/batch_processing/src/getting_started_error_handling.py"
    ```

    1. Any exception works here. See the [extending BatchProcessor](#extending-batchprocessor) section if you want to override this behavior.

    2. Exceptions raised in `record_handler` will propagate to `process_partial_response`.

We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab). + +=== "Sample response" + + ```json + --8<-- "examples/batch_processing/src/getting_started_sqs_response.json" + ``` + ### Partial failure mechanics All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: diff --git a/examples/batch_processing/src/getting_started_error_handling.py b/examples/batch_processing/src/getting_started_error_handling.py new file mode 100644 index 00000000000..7307f0d0d09 --- /dev/null +++ b/examples/batch_processing/src/getting_started_error_handling.py @@ -0,0 +1,35 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +class InvalidPayload(Exception): + ... + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + logger.info(payload) + if not payload: + raise InvalidPayload("Payload does not contain minimum information to be processed.") # (1)! + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response( # (2)! + event=event, + record_handler=record_handler, + processor=processor, + context=context, + ) From 2bfdc9f15f785951fa3d782741b2230b9d29a016 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 26 Jul 2023 18:53:05 +0200 Subject: [PATCH 07/11] docs: add visual for partial failure mechanics Signed-off-by: heitorlessa --- docs/utilities/batch.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index dccae1825b9..fb091b27968 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -253,12 +253,29 @@ By default, we catch any exception raised by your record handler function. This ### Partial failure mechanics -All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: +All batch items will be passed to the record handler for processing, even if exceptions are thrown - Here's the behavior after completing the batch: * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing * **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing + +
+```mermaid +graph LR + Batch[Batch processed] --> Success{Batch succeeded?} + Batch[Batch processed] --> Partial{Partial failure?} + Batch[Batch processed] --> Failure{Batch failed?} + + Success --> |Business as usual| SimpleResponse["Return an empty list of item failures"] + Partial --> |Collect message ID or sequence numbers| PartialResponse["Return a list of batch items that failed processing"] + Failure --> |Aggregate all exceptions raised| FailureResponse["Raise BatchProcessingError exception"] + +``` +Visual representation for partial failure mechanics +
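As a minimal sketch of how these three outcomes surface in code, assuming `BatchProcessingError` is importable from the `exceptions` module as shown below, you could handle them like this:

```python
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record):
    ...  # your processing logic


def lambda_handler(event, context: LambdaContext):
    try:
        # All records succeeded -> {"batchItemFailures": []}
        # Some records failed   -> {"batchItemFailures": [{"itemIdentifier": "..."}, ...]}
        return process_partial_response(
            event=event,
            record_handler=record_handler,
            processor=processor,
            context=context,
        )
    except BatchProcessingError:
        # Raised when every record failed; re-raising fails the invocation
        # so the entire batch is retried. Add logging or metrics here if needed.
        raise
```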
+ + ### Processing messages asynchronously !!! tip "New to AsyncIO? Read this [comprehensive guide first](https://realpython.com/async-io-python/){target="_blank" rel="nofollow"}." From 41560233137b723e7dd11266f481c8aa85559b76 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Wed, 26 Jul 2023 20:08:55 +0200 Subject: [PATCH 08/11] docs: add fifo and stream sequence numbers Signed-off-by: heitorlessa --- docs/utilities/batch.md | 108 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 99 insertions(+), 9 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index fb091b27968..287f68186bb 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -259,22 +259,112 @@ All batch items will be passed to the record handler for processing, even if exc * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing * **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing - +The following sequence diagrams explain how each Batch processor behaves under different scenarios. + +#### SQS Standard + +Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues. +
```mermaid -graph LR - Batch[Batch processed] --> Success{Batch succeeded?} - Batch[Batch processed] --> Partial{Partial failure?} - Batch[Batch processed] --> Failure{Batch failed?} +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + Lambda function->>Lambda service: Report some failed messages + activate SQS queue + Lambda service->>SQS queue: Delete successful messages + SQS queue-->>SQS queue: Failed messages return + Note over SQS queue,Lambda service: Process repeat + deactivate SQS queue +``` +SQS mechanism with Batch Item Failures +
+ +#### SQS FIFO + +Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues. + +
+```mermaid +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure + deactivate Lambda function + activate SQS queue + Lambda service->>SQS queue: Delete successful messages (1-2) + SQS queue-->>SQS queue: Failed messages return (3-10) + deactivate SQS queue +``` +SQS FIFO mechanism with Batch Item Failures +
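To make the short-circuit behavior above concrete, here is a sketch of the response for that scenario; the message IDs are hypothetical:

```python
# Hypothetical response after the 3rd of 10 records in a FIFO batch fails.
# SqsFifoPartialProcessor stops at the first failure and also reports every
# remaining record as failed, so message ordering is preserved on retry.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-3"},   # the record that raised an exception
        {"itemIdentifier": "msg-4"},   # not attempted; returned to the queue
        # ... msg-5 through msg-9 follow the same pattern ...
        {"itemIdentifier": "msg-10"},  # not attempted; returned to the queue
    ]
}
```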
+ +#### Kinesis and DynamoDB Streams + +Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). - Success --> |Business as usual| SimpleResponse["Return an empty list of item failures"] - Partial --> |Collect message ID or sequence numbers| PartialResponse["Return a list of batch items that failed processing"] - Failure --> |Aggregate all exceptions raised| FailureResponse["Raise BatchProcessingError exception"] +!!! note "For brevity, we will use "Streams" to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}" +
+```mermaid +sequenceDiagram + autonumber + participant Streams + participant Lambda service + participant Lambda function + Lambda service->>Streams: Poll latest records + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function-->Lambda function: Continue processing batch items (4-10) + Lambda function->>Lambda service: Report batch item as failure (3) + deactivate Lambda function + activate Streams + Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item + Lambda service->>Streams: Poll records starting from updated checkpoint + deactivate Streams ``` -Visual representation for partial failure mechanics +Kinesis and DynamoDB streams mechanism with single batch item failure
+The behavior changes slightly when there are multiple item failures. Stream checkpoint is updated to the lowest sequence number reported. + +!!! important "Note that the batch item sequence number could be different from batch item number in the illustration." + +
+```mermaid +sequenceDiagram + autonumber + participant Streams + participant Lambda service + participant Lambda function + Lambda service->>Streams: Poll latest records + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3-5 batch items + Lambda function-->Lambda function: Continue processing batch items (6-10) + Lambda function->>Lambda service: Report batch items as failure (3-5) + deactivate Lambda function + activate Streams + Lambda service->>Streams: Checkpoints to lowest sequence number + Lambda service->>Streams: Poll records starting from updated checkpoint + deactivate Streams +``` +Kinesis and DynamoDB streams mechanism with multiple batch item failures +
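The checkpoint selection itself boils down to picking the lowest reported sequence number; a sketch with hypothetical values:

```python
# Hypothetical sequence numbers reported as failures (batch items 3-5).
failed_sequence_numbers = [
    "49590338271490256608559692538361571095921575989136588898",
    "49590338271490256608559692540925702759324208523137515586",
    "49590338271490256608559692541134408945797505240378982466",
]

# Lambda checkpoints to the lowest reported sequence number, so the stream
# is polled again from that record onwards on the next invocation.
checkpoint = min(failed_sequence_numbers, key=int)
```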
### Processing messages asynchronously From f1dfad6b7e9dce1c7fdbb21809d4b6c5ae875a75 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 27 Jul 2023 10:22:49 +0200 Subject: [PATCH 09/11] docs: revamp ctx manager processing Signed-off-by: heitorlessa --- .gitleaksignore | 3 ++ docs/utilities/batch.md | 53 +++++++++++++------ .../src/context_manager_access.py | 5 +- .../src/context_manager_access_output.txt | 12 +++++ ...context_manager_access_output_pydantic.txt | 17 ++++++ 5 files changed, 71 insertions(+), 19 deletions(-) create mode 100644 .gitleaksignore create mode 100644 examples/batch_processing/src/context_manager_access_output.txt create mode 100644 examples/batch_processing/src/context_manager_access_output_pydantic.txt diff --git a/.gitleaksignore b/.gitleaksignore new file mode 100644 index 00000000000..d501e5cc212 --- /dev/null +++ b/.gitleaksignore @@ -0,0 +1,3 @@ +examples/batch_processing/src/context_manager_access_output_pydantic.txt:aws-access-token:10 +examples/batch_processing/src/context_manager_access_output_pydantic.txt:aws-access-token:15 +examples/batch_processing/src/context_manager_access_output.txt:aws-access-token:10 diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 287f68186bb..28f078f79b9 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -77,8 +77,7 @@ Processing batches from SQS works in three stages: 2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank" rel="nofollow"} type annotation for autocompletion 3. Use **`process_partial_response`** to kick off processing -???+ info - This code example uses Tracer and Logger for completion. +!!! info "This code example uses Tracer and Logger for completion." === "Recommended" @@ -149,8 +148,7 @@ Processing batches from Kinesis works in three stages: 2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank" rel="nofollow"} type annotation for autocompletion 3. Use **`process_partial_response`** to kick off processing -???+ info - This code example uses Tracer and Logger for completion. +!!! info "This code example uses Tracer and Logger for completion." === "Recommended" @@ -194,8 +192,7 @@ Processing batches from DynamoDB Streams works in three stages: 2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank" rel="nofollow"} type annotation for autocompletion 3. Use **`process_partial_response`** to kick off processing -???+ info - This code example uses Tracer and Logger for completion. +!!! info "This code example uses Tracer and Logger for completion." === "Recommended" @@ -314,7 +311,7 @@ sequenceDiagram Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). -!!! note "For brevity, we will use "Streams" to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}" +For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}
```mermaid @@ -368,14 +365,14 @@ sequenceDiagram ### Processing messages asynchronously -!!! tip "New to AsyncIO? Read this [comprehensive guide first](https://realpython.com/async-io-python/){target="_blank" rel="nofollow"}." +> New to AsyncIO? Read this [comprehensive guide first](https://realpython.com/async-io-python/){target="_blank" rel="nofollow"}. You can use `AsyncBatchProcessor` class and `async_process_partial_response` function to process messages concurrently. ???+ question "When is this useful?" Your use case might be able to process multiple records at the same time without conflicting with one another. - For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently. + For example, imagine you need to process multiple loyalty points and incrementally save them in the database. While you await the database to confirm your records are saved, you could start processing another request concurrently. The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). @@ -384,9 +381,7 @@ You can use `AsyncBatchProcessor` class and `async_process_partial_response` fun ``` ???+ warning "Using tracer?" - `AsyncBatchProcessor` uses `asyncio.gather` which can cause side effects and reach trace limits at high concurrency. - - See [Tracing concurrent asynchronous functions](../core/tracer.md#concurrent-asynchronous-functions){target="_blank" rel="nofollow"}. + `AsyncBatchProcessor` uses `asyncio.gather`. This might cause [side effects and reach trace limits at high concurrency](../core/tracer.md#concurrent-asynchronous-functions){target="_blank"}. ## Advanced @@ -436,12 +431,36 @@ Inheritance is importance because we need to access message IDs and sequence num Use the context manager to access a list of all returned values from your `record_handler` function. -* **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record -* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record +* **When successful**. We include a tuple with **1/** `success`, **2/** the result of `record_handler`, and **3/** the batch item +* **When failed**. We include a tuple with **1/** `fail`, **2/** exception as a string, and **3/** the batch item serialized as Event Source Data Class or Pydantic model. -```python hl_lines="28-33" title="Accessing processed messages via context manager" ---8<-- "examples/batch_processing/src/context_manager_access.py" -``` +!!! note "If a Pydantic model fails validation early, we serialize its failure record as Event Source Data Class to be able to collect message ID/sequence numbers etc." + +=== "Accessing raw processed messages" + + ```python hl_lines="29-36" + --8<-- "examples/batch_processing/src/context_manager_access.py" + ``` + + 1. Context manager requires the records list. This is typically handled by `process_partial_response`. + 2. Cause contains `exception` str if failed, or `success` otherwise. + +=== "Sample processed messages" + + ```python + --8<-- "examples/batch_processing/src/context_manager_access_output.txt" + ``` + + 1. Sample exception could have raised from within `record_handler` function. 
+ +=== "Sample processed messages (Pydantic)" + + ```python + --8<-- "examples/batch_processing/src/context_manager_access_output_pydantic.txt" + ``` + + 1. Sample when a model fails validation early.

Batch item (3rd item) is serialized to the respective Event Source Data Class event type. + 2. Sample when model validated successfully but another exception was raised during processing. ### Accessing Lambda Context diff --git a/examples/batch_processing/src/context_manager_access.py b/examples/batch_processing/src/context_manager_access.py index 9882092bd83..05cf612a6ae 100644 --- a/examples/batch_processing/src/context_manager_access.py +++ b/examples/batch_processing/src/context_manager_access.py @@ -26,14 +26,15 @@ def record_handler(record: SQSRecord): @logger.inject_lambda_context @tracer.capture_lambda_handler def lambda_handler(event, context: LambdaContext): - batch = event["Records"] + batch = event["Records"] # (1)! with processor(records=batch, handler=record_handler): processed_messages: List[Tuple] = processor.process() for message in processed_messages: status: Literal["success"] | Literal["fail"] = message[0] + cause: str = message[1] # (2)! record: SQSRecord = message[2] - logger.info(status, record=record) + logger.info(status, record=record, cause=cause) return processor.response() diff --git a/examples/batch_processing/src/context_manager_access_output.txt b/examples/batch_processing/src/context_manager_access_output.txt new file mode 100644 index 00000000000..cf3d6267f4d --- /dev/null +++ b/examples/batch_processing/src/context_manager_access_output.txt @@ -0,0 +1,12 @@ +[ + ( + "fail", + " + ), + ( + "success", + "success", + {'messageId': '88891c36-32eb-4a25-9905-654a32916893', 'receiptHandle': 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', 'body': 'success', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1545082649183', 'SenderId': 'AIDAIENQZJOLO23YVJ4VO', 'ApproximateFirstReceiveTimestamp': '1545082649185'}, 'messageAttributes': {}, 'md5OfBody': 'e4e68fb7bd0e697a0ae8f1bb342846b3', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-east-2:123456789012:my-queue', 'awsRegion': 'us-east-1'} + ) +] diff --git a/examples/batch_processing/src/context_manager_access_output_pydantic.txt b/examples/batch_processing/src/context_manager_access_output_pydantic.txt new file mode 100644 index 00000000000..748a6e61aa0 --- /dev/null +++ b/examples/batch_processing/src/context_manager_access_output_pydantic.txt @@ -0,0 +1,17 @@ +[ + ( + "fail", # (1)! + ":1 validation error for OrderSqs\nbody\n JSON object must be str, bytes or bytearray (type=type_error.json)", + + ), + ( + "success", + "success", + {'messageId': '88891c36-32eb-4a25-9905-654a32916893', 'receiptHandle': 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', 'body': 'success', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1545082649183', 'SenderId': 'AIDAIENQZJOLO23YVJ4VO', 'ApproximateFirstReceiveTimestamp': '1545082649185'}, 'messageAttributes': {}, 'md5OfBody': 'e4e68fb7bd0e697a0ae8f1bb342846b3', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-east-2:123456789012:my-queue', 'awsRegion': 'us-east-1'} + ), + ( + "fail", # (2)! 
+ ":Failed to process record.", + OrderSqs(messageId='9d0bfba5-d213-4b64-89bd-f4fbd7e58358', receiptHandle='AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', body=Order(item={'type': 'fail'}), attributes=SqsAttributesModel(ApproximateReceiveCount='1', ApproximateFirstReceiveTimestamp=datetime.datetime(2018, 12, 17, 21, 37, 29, 185000, tzinfo=datetime.timezone.utc), MessageDeduplicationId=None, MessageGroupId=None, SenderId='AIDAIENQZJOLO23YVJ4VO', SentTimestamp=datetime.datetime(2018, 12, 17, 21, 37, 29, 183000, tzinfo=datetime.timezone.utc), SequenceNumber=None, AWSTraceHeader=None), messageAttributes={}, md5OfBody='e4e68fb7bd0e697a0ae8f1bb342846b3', md5OfMessageAttributes=None, eventSource='aws:sqs', eventSourceARN='arn:aws:sqs:us-east-2:123456789012:my-queue', awsRegion='us-east-1') + ) +] From 26898bbdb9ed90c38fb1baa8049923ca3493fcd3 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 27 Jul 2023 10:26:58 +0200 Subject: [PATCH 10/11] docs: highlight use of model in BatchProcessor Signed-off-by: heitorlessa --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 28f078f79b9..fd9d0cccbd3 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -393,7 +393,7 @@ Inheritance is importance because we need to access message IDs and sequence num === "SQS" - ```python hl_lines="8 17 27 35" + ```python hl_lines="8 17 21 27 35" --8<-- "examples/batch_processing/src/pydantic_sqs.py" ``` From 2ba7d2a6289c618cf542044f53217b4115e90366 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 27 Jul 2023 11:30:14 +0200 Subject: [PATCH 11/11] docs: diagram for BYOP --- docs/utilities/batch.md | 59 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index fd9d0cccbd3..57e0a6020b1 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -5,6 +5,30 @@ description: Utility The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. +```mermaid +stateDiagram-v2 + direction LR + BatchSource: Amazon SQS

Amazon Kinesis Data Streams

Amazon DynamoDB Streams

+ LambdaInit: Lambda invocation + BatchProcessor: Batch Processor + RecordHandler: Record Handler function + YourLogic: Your logic to process each batch item + LambdaResponse: Lambda response + + BatchSource --> LambdaInit + + LambdaInit --> BatchProcessor + BatchProcessor --> RecordHandler + + state BatchProcessor { + [*] --> RecordHandler: Your function + RecordHandler --> YourLogic + } + + RecordHandler --> BatchProcessor: Collect results + BatchProcessor --> LambdaResponse: Report items that failed processing +``` + ## Key features * Reports batch item failures to reduce number of retries for a record upon errors @@ -29,8 +53,8 @@ journey This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration: -* [**SQS queues**](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. -* [**Kinesis data streams**](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"} and [**DynamoDB streams**](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting){target="_blank"}. Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint. +* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. +* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint. @@ -260,6 +284,8 @@ The following sequence diagrams explain how each Batch processor behaves under d #### SQS Standard +> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. + Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
@@ -283,6 +309,8 @@ sequenceDiagram #### SQS FIFO +> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. + Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
@@ -309,6 +337,8 @@ sequenceDiagram #### Kinesis and DynamoDB Streams +> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. + Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"} @@ -506,6 +536,31 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()`, `_process_record()` and `_async_process_record()`. + +
+```mermaid +classDiagram + direction LR + class BasePartialProcessor { + <> + +_prepare() + +_clean() + +_process_record_(record: Dict) + +_async_process_record_() + } + + class YourCustomProcessor { + +_prepare() + +_clean() + +_process_record_(record: Dict) + +_async_process_record_() + } + + BasePartialProcessor <|-- YourCustomProcessor : implement +``` +Visual representation to bring your own processor +
+ * **`_process_record()`** – handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler) * **`_prepare()`** – called once as part of the processor initialization * **`_clean()`** – teardown logic called once after `_process_record` completes
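A minimal sketch of such a processor is shown below. It assumes `BasePartialProcessor` exposes the `success_handler` and `failure_handler` helpers used by the built-in processors, and that `self.handler` holds your record handler as noted in the list above; treat it as a starting point rather than a drop-in implementation:

```python
import sys

from aws_lambda_powertools.utilities.batch import BasePartialProcessor


class MyPartialProcessor(BasePartialProcessor):
    """Bare-bones custom processor implementing the four hooks described above."""

    def _prepare(self):
        # One-time setup as part of processor initialization, e.g., open a connection
        self.success_messages.clear()

    def _clean(self):
        # Teardown after every record went through _process_record()
        ...

    def _process_record(self, record: dict):
        # Call the record handler and collect the outcome for the final response
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception:
            return self.failure_handler(record, sys.exc_info())

    async def _async_process_record(self, record: dict):
        raise NotImplementedError()
```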