
Commit 9caf3d1

docs: refactor example; improve docs about creating your own processor
1 parent 25d67ca commit 9caf3d1

1 file changed: docs/content/utilities/batch.mdx (+37 −30 lines)
@@ -7,8 +7,7 @@ import Note from "../../src/components/Note"
 
 One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allow a feature called "Batch processing", in which a [predefined number][2] of events is sent to the Lambda function at once.
 
-The proposed batch utility aims to provide an abstraction to process batch events, providing base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor.
-It also provides a useful implementation to handle partial batch failures from the SQS provider.
+The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from an SQS event source, providing a base class (`BasePartialProcessor`) that allows you to create your **own** batch processor.
 
 **Key Features**
 
@@ -63,64 +62,72 @@ def lambda_handler(event, context):
     return {"statusCode": 200}
 ```
 
-## Create your own processor
+## Create your own partial processor
 
-You can create your own batch processor by inheriting the `BaseProcessor` class, and implementing `_prepare()`, `_clean` and `_process_record()`.
-It's also possible to inherit the `BasePartialProcessor` which contains additional logic to handle a partial failure and keep track of record status.
+You can create your own partial batch processor by inheriting the `BasePartialProcessor` class and implementing `_prepare()`, `_clean()` and `_process_record()`.
+
+All processing logic is handled by `_process_record()`, whilst `_prepare()` and `_clean()` take care of the processor's setup/teardown, being called at the start/end of the processor's execution, respectively.
 
 **Example:**
 
 ```python:title=custom_processor.py
-from uuid import uuid4
+from random import randint
 
-from aws_lambda_powertools.utilities.batch import BaseProcessor, batch_processor
+from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
 import boto3
 
 def record_handler(record):
-    return {"Id": str(uuid4()), "MessageBody": record["body"]}
+    return randint(0, 100)
 
 
-class DDBStreamProcessor(BaseProcessor):
+class MyPartialProcessor(BasePartialProcessor):
     """
-    1. Listens to streams from table A;
-    2. Process each record;
-    3. Send a batch message to a Queue with the result.
+    Processes a record and stores successful results in a DynamoDB table
 
     Parameters
     ----------
-    queue_name: str
-        QueueName to send the results
+    table_name: str
+        Table name to write results
     """
 
-    def __init__(self, queue_name: str):
-        self.queue_name = queue_name
-        self.queue_url = None
-        self.client = boto3.client("sqs")
-        self.results = []
+    def __init__(self, table_name: str):
+        self.table_name = table_name
+
+        super().__init__()
 
     def _prepare(self):
         # It's called once, *before* processing
-        # Formats queue_url given a name
+        # Creates the table resource and clears previous results
         # E.g.:
-        self.queue_url = f"https://queue.amazonaws.com/123456789012/{self.queue_name}"
+        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
+        self.success_messages.clear()
 
     def _clean(self):
         # It's called once, *after* processing all records (closing the context manager)
-        # Here we're sending at once all messages to the queue, and cleaning 'results' for future invocations
+        # Here we're sending, at once, all successful messages to a DynamoDB table
         # E.g.:
-        self.client.send_message_batch(QueueUrl=self.queue_url, Entries=[self.results])
-        self.results.clear()
+        with self.ddb_table.batch_writer() as batch:
+            for result in self.success_messages:
+                batch.put_item(Item=result)
 
     def _process_record(self, record):
-        # It handles how you process your record
-        # Here we're storing the result of each record in a list
+        # It handles how your record is processed
+        # Here we're keeping the status of each run
         # E.g.:
-        result = self.handler(record)
-        self.results.append(result)
-        return result
+        try:
+            result = self.handler(record)
+            return self.success_handler(record, result)
+        except Exception as exc:
+            return self.failure_handler(record, exc)
+
+    def success_handler(self, record, result):
+        entry = ("success", result, record)
+        message = {"age": result}
+        self.success_messages.append(message)
+        return entry
 
 
-@batch_processor(record_handler=record_handler, processor=DDBStreamProcessor("dummy-queue"))
+@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
 def lambda_handler(event, context):
     return {"statusCode": 200}
 ```
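
A quick way to sanity-check the refactored example is to invoke the decorated handler with a hand-built SQS-style event. The sketch below is illustrative only: it assumes the example above is saved as `custom_processor.py`, that AWS credentials are configured for `boto3`, and that a DynamoDB table named `dummy-table` exists with a key schema matching the `{"age": ...}` items the processor writes; the `messageId` and `body` values are made up.

```python
from custom_processor import lambda_handler  # the decorated handler from the example above

# A hand-rolled SQS-style event; messageId/body values are hypothetical
fake_sqs_event = {
    "Records": [
        {"messageId": "a1b2c3", "body": "first record"},
        {"messageId": "d4e5f6", "body": "second record"},
    ]
}

if __name__ == "__main__":
    # The batch_processor decorator extracts event["Records"], runs
    # record_handler on each record under MyPartialProcessor, then
    # calls the wrapped handler
    response = lambda_handler(fake_sqs_event, None)
    print(response)  # {"statusCode": 200}
```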
