From 513edb78111ddd8b43f4f57c69ed24b97d2c2636 Mon Sep 17 00:00:00 2001
From: Leandro Damascena
Date: Tue, 26 Mar 2024 22:24:01 +0000
Subject: [PATCH] Clarify the example of how to create a custom processor

---
 docs/utilities/batch.md                        | 24 +++++++++++++++----
 ...stom_partial_processor_dynamodb_table.yaml  | 15 ++++++++++++
 .../src/custom_partial_processor.py            |  9 ++++++-
 .../src/custom_partial_processor_payload.json  | 14 +++++++++++
 4 files changed, 57 insertions(+), 5 deletions(-)
 create mode 100644 examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml
 create mode 100644 examples/batch_processing/src/custom_partial_processor_payload.json

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index e5241d516e8..6b8e0fd3000 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -604,12 +604,28 @@ classDiagram
 * **`_prepare()`** – called once as part of the processor initialization
 * **`_clean()`** – teardown logic called once after `_process_record` completes
 * **`_async_process_record()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
+* **`response()`** – called once at the end of processing; returns the final `batchItemFailures` response
 
-You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
+You can then instantiate this class and pass it to the `process_partial_response` function.
+
+=== "Creating a custom batch processor"
+
+    ```python hl_lines="10-13 21 37 43 46 53 64 69 73"
+    --8<-- "examples/batch_processing/src/custom_partial_processor.py"
+    ```
+
+=== "DynamoDB table used for storing processed records"
+
+    ```yaml
+    --8<-- "examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml"
+    ```
+
+=== "Sample event"
+
+    ```json
+    --8<-- "examples/batch_processing/src/custom_partial_processor_payload.json"
+    ```
 
-```python hl_lines="9-11 19 33 39 46 57 62 66 74" title="Creating a custom batch processor"
---8<-- "examples/batch_processing/src/custom_partial_processor.py"
-```
 
 ### Caveats
 
diff --git a/examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml b/examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml
new file mode 100644
index 00000000000..ac8d9253ed2
--- /dev/null
+++ b/examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml
@@ -0,0 +1,15 @@
+Transform: AWS::Serverless-2016-10-31
+Resources:
+  IdempotencyTable:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      AttributeDefinitions:
+        - AttributeName: messageId
+          AttributeType: S
+      KeySchema:
+        - AttributeName: messageId
+          KeyType: HASH
+      TimeToLiveSpecification:
+        AttributeName: expiration
+        Enabled: true
+      BillingMode: PAY_PER_REQUEST
diff --git a/examples/batch_processing/src/custom_partial_processor.py b/examples/batch_processing/src/custom_partial_processor.py
index f4aaa5733b5..aa8e319b21d 100644
--- a/examples/batch_processing/src/custom_partial_processor.py
+++ b/examples/batch_processing/src/custom_partial_processor.py
@@ -1,3 +1,4 @@
+import copy
 import os
 import sys
 from random import randint
@@ -10,13 +11,15 @@
     BasePartialProcessor,
     process_partial_response,
 )
+from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
 
-table_name = os.getenv("TABLE_NAME", "table_not_found")
+table_name = os.getenv("TABLE_NAME", "table_store_batch")
 
 logger = Logger()
 
 
 class MyPartialProcessor(BasePartialProcessor):
+    DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []}
{"batchItemFailures": []} """ Process a record and stores successful results at a Amazon DynamoDB Table @@ -28,6 +31,7 @@ class MyPartialProcessor(BasePartialProcessor): def __init__(self, table_name: str): self.table_name = table_name + self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE) super().__init__() def _prepare(self): @@ -36,6 +40,9 @@ def _prepare(self): self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) self.success_messages.clear() + def response(self) -> PartialItemFailureResponse: + return self.batch_response + def _clean(self): # It's called once, *after* closing processing all records (closing the context manager) # Here we're sending, at once, all successful messages to a ddb table diff --git a/examples/batch_processing/src/custom_partial_processor_payload.json b/examples/batch_processing/src/custom_partial_processor_payload.json new file mode 100644 index 00000000000..421305a8c3d --- /dev/null +++ b/examples/batch_processing/src/custom_partial_processor_payload.json @@ -0,0 +1,14 @@ +{ + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"Message\": \"success\"}" + }, + { + "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==" + } + ] + }