forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: sqs_fifo_partial_processor.py
101 lines (75 loc) · 3.28 KB
/
sqs_fifo_partial_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import sys
from typing import List, Optional, Tuple, Type
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
#
# type specifics
#
# Detect whether pydantic has already been imported by the application.
# We probe sys.modules rather than importing pydantic directly, so this
# module stays importable when pydantic is absent.
has_pydantic = "pydantic" in sys.modules
# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
if has_pydantic:
    # Type alias for the optional parser model accepted by the processor.
    # Only defined when pydantic is available, which is why call sites
    # reference it as a string (forward reference) annotation.
    BatchTypeModels = Optional[Type[SqsRecordModel]]
class SQSFifoCircuitBreakerError(Exception):
    """Raised for a record that was never processed because an earlier record
    in the same SQS FIFO batch failed, short-circuiting the remainder.
    """
class SQSFifoPartialProcessor(BatchProcessor):
    """BatchProcessor variant specialized for FIFO SQS event sources.

    FIFO queues guarantee ordering, so once a record fails we must not
    process any record that follows it. On the first failure, every
    remaining record is immediately reported as failed (without invoking
    the record handler) and returned in the native partial response.

    Example
    _______

    ## Process batch triggered by a FIFO SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext

    processor = SQSFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()

    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    # Shared sentinel exception attached to every short-circuited record.
    circuitBreakerError = SQSFifoCircuitBreakerError("A previous record failed processing.")

    def __init__(self, model: Optional["BatchTypeModels"] = None):
        """Create a FIFO processor; `model` is an optional pydantic record model."""
        super().__init__(EventType.SQS, model)

    def process(self) -> List[Tuple]:
        """Process records in order, failing all remaining records after the first failure.

        Returns the per-record results (success or failure tuples).
        """
        processed: List[Tuple] = []
        for index, entry in enumerate(self.records):
            # Any earlier failure means FIFO ordering forbids handling the rest:
            # mark this record and everything after it as failed.
            if self.fail_messages:
                return self._short_circuit_processing(index, processed)
            processed.append(self._process_record(entry))
        return processed

    def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
        """Mark every record from `first_failure_index` onward as failed without processing it."""
        for pending in self.records[first_failure_index:]:
            parsed = self._to_batch_type(record=pending, event_type=self.event_type, model=self.model)
            exc_info = (type(self.circuitBreakerError), self.circuitBreakerError, None)
            result.append(self.failure_handler(record=parsed, exception=exc_info))
        return result

    async def _async_process_record(self, record: dict):
        # Async processing is deliberately unsupported: it cannot preserve FIFO ordering.
        raise NotImplementedError()