-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathcustom_partial_processor.py
70 lines (52 loc) · 2.17 KB
/
custom_partial_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import sys
from random import randint
from typing import Any
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
# Name of the DynamoDB table, read from the TABLE_NAME environment variable.
# Falls back to the placeholder "table_not_found" when the variable is unset.
table_name = os.environ.get("TABLE_NAME", "table_not_found")

# Module-level Powertools logger shared by the definitions in this file.
logger = Logger()
class MyPartialProcessor(BasePartialProcessor):
    """
    Process each record and store the successfully processed records in an Amazon DynamoDB table.

    Note that it is the *record* (not the handler's return value) that is
    collected in ``success_messages`` and later written to the table.

    Parameters
    ----------
    table_name: str
        DynamoDB table name to write results to
    """

    def __init__(self, table_name: str):
        self.table_name = table_name

        super().__init__()

    def _prepare(self) -> None:
        """Create the DynamoDB table resource and reset previously collected successes."""
        # It's called once, *before* processing
        # Creates the table resource and clears results left over from a previous run
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self) -> None:
        """Write every collected successful record to the DynamoDB table in one batch."""
        # It's called once, *after* all records have been processed (closing the context manager)
        # Here we're sending, at once, all successful messages to a ddb table
        with self.ddb_table.batch_writer() as batch:
            for result in self.success_messages:
                batch.put_item(Item=result)

    def _process_record(self, record):
        """
        Run the record handler on a single record and report the outcome.

        Returns whatever ``success_handler`` returns when the handler succeeds,
        or whatever ``failure_handler`` returns when any exception is raised.
        """
        # It handles how your record is processed
        # Here we're keeping the status of each run
        # where self.handler is the record_handler function passed as an argument
        try:
            result = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, result)
        except Exception as exc:
            # logger.exception keeps the same message and ERROR level as before,
            # but also attaches the traceback so the failure can be diagnosed.
            logger.exception(exc)
            return self.failure_handler(record, sys.exc_info())

    def success_handler(self, record, result: Any):
        """
        Remember a successfully processed record and build its status entry.

        Returns
        -------
        tuple
            ``("success", result, record)``
        """
        entry = ("success", result, record)
        # The record itself (not ``entry``) is what _clean later writes to DynamoDB.
        self.success_messages.append(record)
        return entry

    async def _async_process_record(self, record: dict):
        """Async processing is not supported by this processor."""
        raise NotImplementedError()
def record_handler(record):
    """Return a random integer between 0 and 100 (inclusive); ``record`` is not inspected."""
    outcome = randint(0, 100)
    return outcome
@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
    """
    Lambda entry point.

    The ``batch_processor`` decorator is given ``record_handler`` and a
    ``MyPartialProcessor`` bound to ``table_name``; the function body itself
    only returns a fixed HTTP-style success response.
    """
    response = {"statusCode": 200}
    return response