-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathbase.py
146 lines (119 loc) · 3.6 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# -*- coding: utf-8 -*-
"""
Batch processing utilities
"""
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Tuple
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
logger = logging.getLogger(__name__)
class BasePartialProcessor(ABC):
"""
Abstract class for batch processors.
"""
def __init__(self):
self.success_messages: List = []
self.fail_messages: List = []
self.exceptions: List = []
@abstractmethod
def _prepare(self):
"""
Prepare context manager.
"""
raise NotImplementedError()
@abstractmethod
def _clean(self):
"""
Clear context manager.
"""
raise NotImplementedError()
@abstractmethod
def _process_record(self, record: Any):
"""
Process record with handler.
"""
raise NotImplementedError()
def process(self) -> List[Tuple]:
"""
Call instance's handler for each record.
"""
return [self._process_record(record) for record in self.records]
def __enter__(self):
self._prepare()
return self
def __exit__(self, exception_type, exception_value, traceback):
self._clean()
def __call__(self, records: Iterable[Any], handler: Callable):
"""
Set instance attributes before execution
Parameters
----------
records: Iterable[Any]
Iterable with objects to be processed.
handler: Callable
Callable to process "records" entries.
"""
self.records = records
self.handler = handler
return self
def success_handler(self, record: Any, result: Any):
"""
Success callback
Returns
-------
tuple
"success", result, original record
"""
entry = ("success", result, record)
self.success_messages.append(record)
return entry
def failure_handler(self, record: Any, exception: Exception):
"""
Failure callback
Returns
-------
tuple
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
logger.debug("Record processing exception: ", exception)
self.exceptions.append(exception)
self.fail_messages.append(record)
return entry
@lambda_handler_decorator
def batch_processor(
handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
):
"""
Middleware to handle batch event processing
Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: Dict
Lambda's Context
record_handler: Callable
Callable to process each record from the batch
processor: PartialSQSProcessor
Batch Processor to handle partial failure cases
Examples
--------
**Processes Lambda's event with PartialSQSProcessor**
>>> from aws_lambda_powertools.utilities.batch import batch_processor
>>>
>>> def record_handler(record):
>>> return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
>>> def handler(event, context):
>>> return {"StatusCode": 200}
Limitations
-----------
* Async batch processors
"""
records = event["Records"]
with processor(records, record_handler):
processor.process()
return handler(event, context)