Skip to content

Commit fd9f882

Browse files
docs(batch): improved the example demonstrating how to create a custom partial processor. (#4024)
Making clear the example of how to create a custom processor
1 parent cea4af6 commit fd9f882

File tree

4 files changed

+57
-5
lines changed

4 files changed

+57
-5
lines changed

docs/utilities/batch.md

+20-4
Original file line numberDiff line numberDiff line change
@@ -604,12 +604,28 @@ classDiagram
604604
* **`_prepare()`** – called once as part of the processor initialization
605605
* **`_clean()`** – teardown logic called once after `_process_record` completes
606606
* **`_async_process_record()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
607+
* **`response()`** - called upon completion of processing
607608

608-
You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
609+
You can utilize this class to instantiate a new processor and then pass it to the `process_partial_response` function.
610+
611+
=== "Creating a custom batch processor"
612+
613+
```python hl_lines="10-13 21 37 43 46 53 64 69 73"
614+
--8<-- "examples/batch_processing/src/custom_partial_processor.py"
615+
```
616+
617+
=== "DynamoDB table used for storing processed records."
618+
619+
```yaml
620+
--8<-- "examples/batch_processing/sam/custom_partial_processor_dynamodb_table.yaml"
621+
```
622+
623+
=== "Sample event"
624+
625+
```json
626+
--8<-- "examples/batch_processing/src/custom_partial_processor_payload.json"
627+
```
609628

610-
```python hl_lines="9-11 19 33 39 46 57 62 66 74" title="Creating a custom batch processor"
611-
--8<-- "examples/batch_processing/src/custom_partial_processor.py"
612-
```
613629

614630
### Caveats
615631

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
Transform: AWS::Serverless-2016-10-31
2+
Resources:
3+
IdempotencyTable:
4+
Type: AWS::DynamoDB::Table
5+
Properties:
6+
AttributeDefinitions:
7+
- AttributeName: messageId
8+
AttributeType: S
9+
KeySchema:
10+
- AttributeName: messageId
11+
KeyType: HASH
12+
TimeToLiveSpecification:
13+
AttributeName: expiration
14+
Enabled: true
15+
BillingMode: PAY_PER_REQUEST

examples/batch_processing/src/custom_partial_processor.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import os
23
import sys
34
from random import randint
@@ -10,13 +11,15 @@
1011
BasePartialProcessor,
1112
process_partial_response,
1213
)
14+
from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
1315

14-
table_name = os.getenv("TABLE_NAME", "table_not_found")
16+
table_name = os.getenv("TABLE_NAME", "table_store_batch")
1517

1618
logger = Logger()
1719

1820

1921
class MyPartialProcessor(BasePartialProcessor):
22+
DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []}
2023
"""
2124
Process a record and stores successful results at a Amazon DynamoDB Table
2225
@@ -28,6 +31,7 @@ class MyPartialProcessor(BasePartialProcessor):
2831

2932
def __init__(self, table_name: str):
3033
self.table_name = table_name
34+
self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
3135
super().__init__()
3236

3337
def _prepare(self):
@@ -36,6 +40,9 @@ def _prepare(self):
3640
self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
3741
self.success_messages.clear()
3842

43+
def response(self) -> PartialItemFailureResponse:
44+
return self.batch_response
45+
3946
def _clean(self):
4047
# It's called once, *after* closing processing all records (closing the context manager)
4148
# Here we're sending, at once, all successful messages to a ddb table
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
5+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
6+
"body": "{\"Message\": \"success\"}"
7+
},
8+
{
9+
"messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
10+
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
11+
"body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="
12+
}
13+
]
14+
}

0 commit comments

Comments
 (0)