From 76ce9dbdf5f94e44db563e173713d00dcbe5e92e Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 31 Jul 2023 19:17:31 +0200 Subject: [PATCH] docs(batch): added flow charts --- .../snippets/batch/accessProcessedMessages.ts | 7 +- docs/snippets/batch/customPartialProcessor.ts | 8 +- .../batch/gettingStartedDynamoDBStreams.ts | 2 +- .../batch/gettingStartedErrorHandling.ts | 43 ++++ docs/snippets/batch/gettingStartedKinesis.ts | 2 +- docs/snippets/batch/gettingStartedSQS.ts | 8 +- docs/snippets/batch/gettingStartedSQSFifo.ts | 2 +- docs/utilities/batch.md | 236 +++++++++++++++++- 8 files changed, 281 insertions(+), 27 deletions(-) create mode 100644 docs/snippets/batch/gettingStartedErrorHandling.ts diff --git a/docs/snippets/batch/accessProcessedMessages.ts b/docs/snippets/batch/accessProcessedMessages.ts index 6dbd338c7b..fe5d727ea0 100644 --- a/docs/snippets/batch/accessProcessedMessages.ts +++ b/docs/snippets/batch/accessProcessedMessages.ts @@ -22,16 +22,17 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - const batch = event.Records; + const batch = event.Records; // (1)! - processor.register(batch, recordHandler, { context }); + processor.register(batch, recordHandler, { context }); // (2)! const processedMessages = processor.process(); for (const message of processedMessages) { const status: 'success' | 'fail' = message[0]; + const error = message[1]; const record = message[2]; - logger.info('Processed record', { status, record }); + logger.info('Processed record', { status, record, error }); } return processor.response(); diff --git a/docs/snippets/batch/customPartialProcessor.ts b/docs/snippets/batch/customPartialProcessor.ts index d44b99a602..9a45c92f01 100644 --- a/docs/snippets/batch/customPartialProcessor.ts +++ b/docs/snippets/batch/customPartialProcessor.ts @@ -11,7 +11,7 @@ import { import type { SuccessResponse, FailureResponse, - EventSourceType, + BaseRecord, } from '@aws-lambda-powertools/batch'; import type { SQSEvent, Context, SQSBatchResponse } from 'aws-lambda'; @@ -27,7 +27,7 @@ class MyPartialProcessor extends BasePartialProcessor { } public async asyncProcessRecord( - _record: EventSourceType + _record: BaseRecord ): Promise { throw new Error('Not implemented'); } @@ -69,9 +69,7 @@ class MyPartialProcessor extends BasePartialProcessor { * Here we are keeping the status of each run, `this.handler` is * the function that is passed when calling `processor.register()`. */ - public processRecord( - record: EventSourceType - ): SuccessResponse | FailureResponse { + public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { try { const result = this.handler(record); diff --git a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts index 4d1842bcec..3304d31b95 100644 --- a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts +++ b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts @@ -11,7 +11,7 @@ import type { DynamoDBBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.DynamoDBStreams); +const processor = new BatchProcessor(EventType.DynamoDBStreams); // (1)! const logger = new Logger(); const recordHandler = (record: DynamoDBRecord): void => { diff --git a/docs/snippets/batch/gettingStartedErrorHandling.ts b/docs/snippets/batch/gettingStartedErrorHandling.ts new file mode 100644 index 0000000000..08efa51732 --- /dev/null +++ b/docs/snippets/batch/gettingStartedErrorHandling.ts @@ -0,0 +1,43 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); +const logger = new Logger(); + +class InvalidPayload extends Error { + public constructor(message: string) { + super(message); + this.name = 'InvalidPayload'; + } +} + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } else { + // prettier-ignore + throw new InvalidPayload('Payload does not contain minumum required fields'); // (1)! + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + // prettier-ignore + return processPartialResponse(event, recordHandler, processor, { // (2)! + context, + }); +}; diff --git a/docs/snippets/batch/gettingStartedKinesis.ts b/docs/snippets/batch/gettingStartedKinesis.ts index eb1c8a8810..2def6b2e55 100644 --- a/docs/snippets/batch/gettingStartedKinesis.ts +++ b/docs/snippets/batch/gettingStartedKinesis.ts @@ -11,7 +11,7 @@ import type { KinesisStreamBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.KinesisDataStreams); +const processor = new BatchProcessor(EventType.KinesisDataStreams); // (1)! const logger = new Logger(); const recordHandler = (record: KinesisStreamRecord): void => { diff --git a/docs/snippets/batch/gettingStartedSQS.ts b/docs/snippets/batch/gettingStartedSQS.ts index 3ee6a3fa56..9878b865e9 100644 --- a/docs/snippets/batch/gettingStartedSQS.ts +++ b/docs/snippets/batch/gettingStartedSQS.ts @@ -11,10 +11,11 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.SQS); +const processor = new BatchProcessor(EventType.SQS); // (1)! const logger = new Logger(); -const recordHandler = (record: SQSRecord): void => { +// prettier-ignore +const recordHandler = (record: SQSRecord): void => { // (2)! const payload = record.body; if (payload) { const item = JSON.parse(payload); @@ -26,7 +27,8 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + // prettier-ignore + return processPartialResponse(event, recordHandler, processor, { // (3)! context, }); }; diff --git a/docs/snippets/batch/gettingStartedSQSFifo.ts b/docs/snippets/batch/gettingStartedSQSFifo.ts index 34ff76e705..5c63c1e0d7 100644 --- a/docs/snippets/batch/gettingStartedSQSFifo.ts +++ b/docs/snippets/batch/gettingStartedSQSFifo.ts @@ -10,7 +10,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new SqsFifoPartialProcessor(); +const processor = new SqsFifoPartialProcessor(); // (1)! const logger = new Logger(); const recordHandler = (record: SQSRecord): void => { diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 6c8c96d004..6b02a2f538 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -3,14 +3,35 @@ title: Batch Processing description: Utility --- - - ???+ warning - **This page refers to an unreleased utility that has yet to be published on the npm registry. Any version of the package built from source, as well as all future versions tagged with the `-alpha` suffix should be treated as experimental. Follow the [Beta release](https://github.com/aws-powertools/powertools-lambda-typescript/milestone/13) milestone for updates on the progress of this utility.** + **This utility is currently released as beta developer preview** and is intended strictly for feedback and testing purposes **and not for production workloads**. The version and all future versions tagged with the `-beta` suffix should be treated as not stable. Up until before the [General Availability release](https://github.com/aws-powertools/powertools-lambda-typescript/milestone/14) we might introduce significant breaking changes and improvements in response to customers feedback. The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. +```mermaid +stateDiagram-v2 + direction LR + BatchSource: Amazon SQS

Amazon Kinesis Data Streams

Amazon DynamoDB Streams

+ LambdaInit: Lambda invocation + BatchProcessor: Batch Processor + RecordHandler: Record Handler function + YourLogic: Your logic to process each batch item + LambdaResponse: Lambda response + + BatchSource --> LambdaInit + + LambdaInit --> BatchProcessor + BatchProcessor --> RecordHandler + + state BatchProcessor { + [*] --> RecordHandler: Your function + RecordHandler --> YourLogic + } + + RecordHandler --> BatchProcessor: Collect results + BatchProcessor --> LambdaResponse: Report items that failed processing +``` + ## Key features * Reports batch item failures to reduce number of retries for a record upon errors @@ -23,10 +44,19 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire. -With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place: +```mermaid +journey + section Conditions + Successful response: 5: Success + Maximum retries: 3: Failure + Records expired: 1: Failure +``` + +This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration: -1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties -2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses + +* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. +* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint. @@ -37,14 +67,16 @@ With this utility, batch records are processed individually – only messages th ## Getting started -Regardless whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`. +For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed. -You do not need any additional IAM permissions to use this utility, except for what each event source requires. +You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned. ### Required resources The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted. +!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." + === "SQS" ```yaml title="template.yaml" hl_lines="30-31" @@ -77,9 +109,17 @@ Processing batches from SQS works in three stages: === "index.ts" ```typescript hl_lines="1-5 14 17 29-31" - --8<-- "docs/snippets/batch/gettingStartedSQS.ts::32" + --8<-- + docs/snippets/batch/gettingStartedSQS.ts::16 + docs/snippets/batch/gettingStartedSQS.ts:18:29 + docs/snippets/batch/gettingStartedSQS.ts:31:34 + --8<-- ``` + 1. **Step 1**. Creates a partial failure batch processor for SQS queues. See [partial failure mechanics for details](#partial-failure-mechanics) + 2. **Step 2**. Defines a function to receive one record at a time from the batch + 3. **Step 3**. Kicks off processing + === "Sample response" The second record failed to be processed, therefore the processor added its message ID in the response. @@ -103,6 +143,8 @@ This helps preserve the ordering of messages in your queue. --8<-- "docs/snippets/batch/gettingStartedSQSFifo.ts" ``` +1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) + ### Processing messages from Kinesis Processing batches from Kinesis works in three stages: @@ -120,6 +162,8 @@ Processing batches from Kinesis works in three stages: --8<-- "docs/snippets/batch/gettingStartedKinesis.ts" ``` + 1. **Step 1**. Creates a partial failure batch processor for Kinesis Data Streams. See [partial failure mechanics for details](#partial-failure-mechanics) + === "Sample response" The second record failed to be processed, therefore the processor added its sequence number in the response. @@ -151,6 +195,8 @@ Processing batches from DynamoDB Streams works in three stages: --8<-- "docs/snippets/batch/gettingStartedDynamoDBStreams.ts" ``` + 1. **Step 1**. Creates a partial failure batch processor for DynamoDB Streams. See [partial failure mechanics for details](#partial-failure-mechanics) + === "Sample response" The second record failed to be processed, therefore the processor added its sequence number in the response. @@ -165,6 +211,30 @@ Processing batches from DynamoDB Streams works in three stages: --8<-- "docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json" ``` +### Error handling + +By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution. + +=== "Sample error handling with custom exception" + + ```typescript hl_lines="30" + --8<-- + docs/snippets/batch/gettingStartedErrorHandling.ts::29 + docs/snippets/batch/gettingStartedErrorHandling.ts:31:38 + docs/snippets/batch/gettingStartedErrorHandling.ts:40:43 + --8<-- + ``` + + 1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor) + + 2. Exceptions raised in `record_handler` will propagate to `process_partial_response`.

We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab). + +=== "Sample response" + + ```json + --8<-- "docs/snippets/batch/samples/sampleSQSResponse.json" + ``` + ### Partial failure mechanics All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: @@ -173,6 +243,119 @@ All records in the batch will be passed to this handler for processing, even if * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing * **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing +The following sequence diagrams explain how each Batch processor behaves under different scenarios. + +#### SQS Standard + +> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. + +Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues. + +
+```mermaid +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + Lambda function->>Lambda service: Report some failed messages + activate SQS queue + Lambda service->>SQS queue: Delete successful messages + SQS queue-->>SQS queue: Failed messages return + Note over SQS queue,Lambda service: Process repeat + deactivate SQS queue +``` +SQS mechanism with Batch Item Failures +
+ +#### SQS FIFO + +> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. + +Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues. + +
+```mermaid +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure + deactivate Lambda function + activate SQS queue + Lambda service->>SQS queue: Delete successful messages (1-2) + SQS queue-->>SQS queue: Failed messages return (3-10) + deactivate SQS queue +``` +SQS FIFO mechanism with Batch Item Failures +
+ +#### Kinesis and DynamoDB Streams + +> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. + +Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). + +For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"} + +
+```mermaid +sequenceDiagram + autonumber + participant Streams + participant Lambda service + participant Lambda function + Lambda service->>Streams: Poll latest records + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function-->Lambda function: Continue processing batch items (4-10) + Lambda function->>Lambda service: Report batch item as failure (3) + deactivate Lambda function + activate Streams + Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item + Lambda service->>Streams: Poll records starting from updated checkpoint + deactivate Streams +``` +Kinesis and DynamoDB streams mechanism with single batch item failure +
+ +The behavior changes slightly when there are multiple item failures. Stream checkpoint is updated to the lowest sequence number reported. + +!!! important "Note that the batch item sequence number could be different from batch item number in the illustration." + +
+```mermaid +sequenceDiagram + autonumber + participant Streams + participant Lambda service + participant Lambda function + Lambda service->>Streams: Poll latest records + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3-5 batch items + Lambda function-->Lambda function: Continue processing batch items (6-10) + Lambda function->>Lambda service: Report batch items as failure (3-5) + deactivate Lambda function + activate Streams + Lambda service->>Streams: Checkpoints to lowest sequence number + Lambda service->>Streams: Poll records starting from updated checkpoint + deactivate Streams +``` +Kinesis and DynamoDB streams mechanism with multiple batch item failures +
+ ### Processing messages asynchronously You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently. @@ -197,10 +380,13 @@ Use the `BatchProcessor` directly in your function to access a list of all retur * **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record -```typescript hl_lines="27-28 30-32 37" title="Accessing processed messages" +```typescript hl_lines="25 27-28 30-33 38" title="Accessing processed messages" --8<-- "docs/snippets/batch/accessProcessedMessages.ts" ``` +1. The processor requires the records array. This is typically handled by `processPartialResponse`. +2. You need to register the `batch`, the `recordHandler` function, and optionally the `context` to access the Lambda context. + ### Accessing Lambda Context Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out. @@ -223,7 +409,7 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc ???+ example Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing -```typescript hl_lines="5-6 17-33 35 50-52" title="Extending failure handling mechanism in BatchProcessor" +```typescript hl_lines="5-6 17 21 25 31 35 50-52" title="Extending failure handling mechanism in BatchProcessor" --8<-- "docs/snippets/batch/extendingFailure.ts" ``` @@ -231,6 +417,30 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `asyncProcessRecord()` abstract methods. +
+```mermaid +classDiagram + direction LR + class BasePartialProcessor { + <> + +prepare() + +clean() + +processRecord(record: BaseRecord) + +asyncProcessRecord(record: BaseRecord) + } + + class YourCustomProcessor { + +prepare() + +clean() + +processRecord(record: BaseRecord) + +asyncProcessRecord(record: BaseRecord) + } + + BasePartialProcessor <|-- YourCustomProcessor : extends +``` +Visual representation to bring your own processor +
+ * **`processRecord()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`) * **`prepare()`** – called once as part of the processor initialization * **`clean()`** – teardown logic called once after `processRecord` completes @@ -238,7 +448,7 @@ You can create your own partial batch processor from scratch by inheriting the ` You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function. -```typescript hl_lines="7 11-13 19 28 39 60 71 82 92-94" title="Creating a custom batch processor" +```typescript hl_lines="8 12-14 20 29 40 61 72 83 93-95" title="Creating a custom batch processor" --8<-- "docs/snippets/batch/customPartialProcessor.ts" ```