forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqs_fifo_partial_processor.py
92 lines (68 loc) · 3.12 KB
/
sqs_fifo_partial_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from typing import List, Optional, Tuple
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
class SQSFifoCircuitBreakerError(Exception):
"""
Signals a record not processed due to the SQS FIFO processing being interrupted
"""
pass
class SqsFifoPartialProcessor(BatchProcessor):
"""Process native partial responses from SQS FIFO queues.
Stops processing records when the first record fails. The remaining records are reported as failed items.
Example
_______
## Process batch triggered by a FIFO SQS
```python
import json
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.body
if payload:
item: dict = json.loads(payload)
...
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
return processor.response()
```
"""
circuit_breaker_exc = (
SQSFifoCircuitBreakerError,
SQSFifoCircuitBreakerError("A previous record failed processing"),
None,
)
def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
super().__init__(EventType.SQS, model)
def process(self) -> List[Tuple]:
"""
Call instance's handler for each record. When the first failed message is detected,
the process is short-circuited, and the remaining messages are reported as failed items.
"""
result: List[Tuple] = []
for i, record in enumerate(self.records):
# If we have failed messages, it means that the last message failed.
# We then short circuit the process, failing the remaining messages
if self.fail_messages:
return self._short_circuit_processing(i, result)
# Otherwise, process the message normally
result.append(self._process_record(record))
return result
def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
"""
Starting from the first failure index, fail all the remaining messages, and append them to the result list.
"""
remaining_records = self.records[first_failure_index:]
for remaining_record in remaining_records:
data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
return result
async def _async_process_record(self, record: dict):
raise NotImplementedError()