Skip to content

Commit 7686381

Browse files
committed
feat(batch): support SQS FIFO queues
1 parent 75b6924 commit 7686381

File tree

6 files changed

+225
-1
lines changed

6 files changed

+225
-1
lines changed

Diff for: aws_lambda_powertools/utilities/batch/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
batch_processor,
1717
)
1818
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
19+
from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
20+
SQSFifoPartialProcessor,
21+
)
1922

2023
__all__ = (
2124
"BatchProcessor",
@@ -26,6 +29,7 @@
2629
"EventType",
2730
"FailureResponse",
2831
"SuccessResponse",
32+
"SQSFifoPartialProcessor",
2933
"batch_processor",
3034
"async_batch_processor",
3135
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import sys
2+
from typing import List, Optional, Tuple, Type
3+
4+
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
5+
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
6+
7+
# ---------------------------------------------------------------------------
# Type specifics
# ---------------------------------------------------------------------------
# Detect pydantic without importing it — the parser models are an optional extra.
has_pydantic = "pydantic" in sys.modules

# For IntelliSense and Mypy to work, we need to account for possible SQS
# subclasses. We need them as subclasses as we must access their message ID or
# sequence number metadata via dot notation.
if has_pydantic:
    BatchTypeModels = Optional[Type[SqsRecordModel]]
17+
18+
class SQSFifoCircuitBreakerError(Exception):
    """Raised for records left unprocessed because an earlier record in the
    same SQS FIFO batch failed, short-circuiting the rest of the batch.
    """
24+
25+
26+
class SQSFifoPartialProcessor(BatchProcessor):
    """Specialized BatchProcessor subclass that handles FIFO SQS batch records.

    As soon as the processing of the first record fails, the remaining records
    are marked as failed without processing, and returned as native partial
    responses. This preserves the ordering guarantee of the FIFO queue.

    Example
    _______

    ## Process batch triggered by a FIFO SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = SQSFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    # Single shared sentinel exception attached to every short-circuited record.
    circuitBreakerError = SQSFifoCircuitBreakerError("A previous record failed processing.")

    def __init__(self, model: Optional["BatchTypeModels"] = None):
        """Initialize the processor for the SQS event type.

        Parameters
        ----------
        model: Optional["BatchTypeModels"]
            Optional pydantic model used to parse each SQS record
            (only available when pydantic is installed).
        """
        super().__init__(EventType.SQS, model)

    def process(self) -> List[Tuple]:
        """Process records sequentially, stopping at the first failure.

        Returns
        -------
        List[Tuple]
            One success/failure tuple per record, as produced by
            `_process_record` or `failure_handler`.
        """
        result: List[Tuple] = []

        for i, record in enumerate(self.records):
            # If we have failed messages, it means that the last message failed:
            # short circuit the process, failing the remaining messages.
            # (Plain comments here — the original used bare string literals,
            # which are evaluated as expressions on every loop iteration.)
            if self.fail_messages:
                return self._short_circuit_processing(i, result)

            # Otherwise, process the message normally.
            result.append(self._process_record(record))

        return result

    def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
        """Mark every record from `first_failure_index` onward as failed.

        Each skipped record is routed through `failure_handler` with the shared
        circuit-breaker exception so it appears in the partial response.
        """
        remaining_records = self.records[first_failure_index:]
        for remaining_record in remaining_records:
            data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
            result.append(
                self.failure_handler(
                    record=data, exception=(type(self.circuitBreakerError), self.circuitBreakerError, None)
                )
            )
        return result

    async def _async_process_record(self, record: dict):
        # Async processing is deliberately unsupported: FIFO ordering requires
        # strictly sequential handling of records.
        raise NotImplementedError()

Diff for: docs/utilities/batch.md

+18
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,24 @@ Processing batches from SQS works in four stages:
347347
}
348348
```
349349

350+
#### FIFO queues
351+
352+
If you're using this feature with a FIFO queue, use the `SQSFifoPartialProcessor` class instead. It
stops processing messages after the first failure and returns all failed and unprocessed messages in `batchItemFailures`,
which helps preserve the ordering of messages in your queue.
355+
356+
=== "As a decorator"
357+
358+
```python hl_lines="5 11"
359+
--8<-- "examples/batch_processing/src/sqs_fifo_batch_processor.py"
360+
```
361+
362+
=== "As a context manager"
363+
364+
```python hl_lines="4 8"
365+
--8<-- "examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py"
366+
```
367+
350368
### Processing messages from Kinesis
351369

352370
Processing batches from Kinesis works in four stages:
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
SQSFifoPartialProcessor,
4+
batch_processor,
5+
)
6+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
processor = SQSFifoPartialProcessor()
10+
tracer = Tracer()
11+
logger = Logger()
12+
13+
14+
@tracer.capture_method
15+
def record_handler(record: SQSRecord):
16+
...
17+
18+
19+
@logger.inject_lambda_context
20+
@tracer.capture_lambda_handler
21+
@batch_processor(record_handler=record_handler, processor=processor)
22+
def lambda_handler(event, context: LambdaContext):
23+
return processor.response()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor
3+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
4+
from aws_lambda_powertools.utilities.typing import LambdaContext
5+
6+
processor = SQSFifoPartialProcessor()
7+
tracer = Tracer()
8+
logger = Logger()
9+
10+
11+
@tracer.capture_method
12+
def record_handler(record: SQSRecord):
13+
...
14+
15+
16+
@logger.inject_lambda_context
17+
@tracer.capture_lambda_handler
18+
def lambda_handler(event, context: LambdaContext):
19+
batch = event["Records"]
20+
with processor(records=batch, handler=record_handler):
21+
processor.process() # kick off processing, return List[Tuple]
22+
23+
return processor.response()

Diff for: tests/functional/test_utilities_batch.py

+56-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import uuid
23
from random import randint
34
from typing import Any, Awaitable, Callable, Dict, Optional
45

@@ -9,6 +10,7 @@
910
AsyncBatchProcessor,
1011
BatchProcessor,
1112
EventType,
13+
SQSFifoPartialProcessor,
1214
async_batch_processor,
1315
batch_processor,
1416
)
@@ -40,7 +42,7 @@
4042
def sqs_event_factory() -> Callable:
4143
def factory(body: str):
4244
return {
43-
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
45+
"messageId": str(uuid.uuid4()),
4446
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
4547
"body": body,
4648
"attributes": {
@@ -117,6 +119,17 @@ def handler(record):
117119
return handler
118120

119121

122+
@pytest.fixture(scope="module")
def sqs_fifo_record_handler() -> Callable:
    """Record handler that raises whenever the record body contains 'fail'."""

    def handler(record):
        payload = record["body"]
        if "fail" in payload:
            raise Exception("Failed to process record.")
        return payload

    return handler
120133
@pytest.fixture(scope="module")
121134
def async_record_handler() -> Callable[..., Awaitable[Any]]:
122135
async def handler(record):
@@ -654,6 +667,48 @@ def lambda_handler(event, context):
654667
assert "All records failed processing. " in str(e.value)
655668

656669

670+
def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, sqs_fifo_record_handler):
    # GIVEN a batch of two records that both process successfully
    records = [SQSRecord(sqs_event_factory("success")) for _ in range(2)]
    event = {"Records": [record.raw_event for record in records]}

    processor = SQSFifoPartialProcessor()

    @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
    def lambda_handler(event, context):
        return processor.response()

    # WHEN the handler runs end to end
    result = lambda_handler(event, {})

    # THEN no partial failures are reported
    assert result["batchItemFailures"] == []
688+
689+
def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, sqs_fifo_record_handler):
    # GIVEN a batch where the second record fails
    first_record = SQSRecord(sqs_event_factory("success"))
    second_record = SQSRecord(sqs_event_factory("fail"))
    # The third record would normally succeed, but since it's a FIFO queue,
    # it is marked as a failure without being processed
    third_record = SQSRecord(sqs_event_factory("success"))
    event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]}

    processor = SQSFifoPartialProcessor()

    @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
    def lambda_handler(event, context):
        return processor.response()

    # WHEN the handler runs end to end
    result = lambda_handler(event, {})

    # THEN the failed record and every record after it are reported, in order
    failures = result["batchItemFailures"]
    assert len(failures) == 2
    assert failures[0]["itemIdentifier"] == second_record.message_id
    assert failures[1]["itemIdentifier"] == third_record.message_id
711+
657712
def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler):
658713
# GIVEN
659714
first_record = SQSRecord(sqs_event_factory("success"))

0 commit comments

Comments
 (0)