Skip to content

Commit 1ad32f5

Browse files
Refactoring logic to skip execution when messages are part of a group ID with failed messages
1 parent de5d756 commit 1ad32f5

File tree

3 files changed

+86
-20
lines changed

3 files changed

+86
-20
lines changed

aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py

+33-13
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import logging
12
from typing import List, Optional, Tuple
23

34
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
45
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
56

7+
logger = logging.getLogger(__name__)
8+
69

710
class SQSFifoCircuitBreakerError(Exception):
811
"""
@@ -57,19 +60,20 @@ def lambda_handler(event, context: LambdaContext):
5760
None,
5861
)
5962

60-
def __init__(self, model: Optional["BatchSqsTypeModel"] = None, return_on_first_error: bool = True):
63+
def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_error: bool = False):
6164
"""
6265
Initialize the SqsFifoProcessor.
6366
6467
Parameters
6568
----------
6669
model: Optional["BatchSqsTypeModel"]
6770
An optional model for batch processing.
68-
return_on_first_error: bool
69-
Flag to determine whether to return on the first error encountered. Default is True
71+
skip_group_on_error: bool
72+
Determine whether to exclusively skip messages from the MessageGroupID that failed processing. Default is False
7074
7175
"""
72-
self.return_on_first_error = return_on_first_error
76+
self._skip_group_on_error = skip_group_on_error
7377
super().__init__(EventType.SQS, model)
7478

7579
def process(self) -> List[Tuple]:
@@ -78,25 +82,41 @@ def process(self) -> List[Tuple]:
7882
the process is short-circuited, and the remaining messages are reported as failed items.
7983
"""
8084
result: List[Tuple] = []
85+
skip_messages_group_id: List = []
8186

8287
for i, record in enumerate(self.records):
8388
# If we have failed messages and we are set to return on the first error,
8489
# short circuit the process and return the remaining messages as failed items
85-
if self.fail_messages and self.return_on_first_error:
90+
if self.fail_messages and not self._skip_group_on_error:
91+
logger.debug("Processing of failed messages stopped due to the 'skip_group_on_error' is set to False")
8692
return self._short_circuit_processing(i, result)
8793

88-
# Process the current record
94+
msg_id = record.get("messageId")
95+
96+
# skip_group_on_error is True:
97+
# Skip processing the current message if its ID belongs to a group with failed messages
98+
if msg_id in skip_messages_group_id:
99+
logger.debug(
100+
f"Skipping message with ID '{msg_id}' as it is part of a group containing failed messages.",
101+
)
102+
continue
103+
89104
processed_messages = self._process_record(record)
90105

91-
# If a processed message fail,
106+
# If a processed message fail and skip_group_on_error is True,
92107
# mark subsequent messages from the same MessageGroupId as skipped
93-
if processed_messages[0] == "fail":
108+
if processed_messages[0] == "fail" and self._skip_group_on_error:
109+
_attributes_record = record.get("attributes", {})
94110
for subsequent_record in self.records[i + 1 :]:
95-
if subsequent_record.get("attributes", {}).get("MessageGroupId") == record.get(
96-
"attributes",
97-
{},
98-
).get("MessageGroupId"):
99-
continue # Skip subsequent message from the same MessageGroupId
111+
_attributes = subsequent_record.get("attributes", {})
112+
if _attributes.get("MessageGroupId") == _attributes_record.get("MessageGroupId"):
113+
skip_messages_group_id.append(subsequent_record.get("messageId"))
114+
data = self._to_batch_type(
115+
record=subsequent_record,
116+
event_type=self.event_type,
117+
model=self.model,
118+
)
119+
result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
100120

101121
# Append the processed message normally
102122
result.append(processed_messages)

docs/utilities/batch.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ Processing batches from SQS works in three stages:
141141

142142
#### FIFO queues
143143

144-
When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, it's important to know that a batch sent from SQS to Lambda can include multiple messages from different group IDs.
144+
When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, you should know that a batch may include messages from different group IDs.
145+
146+
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
145147

146148
By default, message processing halts after the initial failure, returning all failed and unprocessed messages in `batchItemFailures` to preserve the ordering of messages in your queue. However, customers can opt to continue processing messages and skip only the messages within a failing message group ID by setting `skip_group_on_error` to True.
147149

tests/functional/test_utilities_batch.py

+50-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from aws_lambda_powertools.utilities.parser.models import (
2929
DynamoDBStreamChangedRecordModel,
3030
DynamoDBStreamRecordModel,
31+
SqsRecordModel,
3132
)
3233
from aws_lambda_powertools.utilities.parser.types import Literal
3334
from tests.functional.batch.sample_models import (
@@ -728,13 +729,13 @@ def lambda_handler(event, context):
728729
assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id
729730

730731

731-
def test_sqs_fifo_batch_processor_middleware_without_first_failure(sqs_event_fifo_factory, record_handler):
732+
def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error(sqs_event_fifo_factory, record_handler):
732733
# GIVEN a batch of 5 records with 3 different MessageGroupID
733734
first_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
734735
second_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
735736
third_record = SQSRecord(sqs_event_fifo_factory("fail", "2"))
736-
fourth_record = SQSRecord(sqs_event_fifo_factory("fail", "2"))
737-
fifth_record = SQSRecord(sqs_event_fifo_factory("success", "3"))
737+
fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2"))
738+
fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3"))
738739
event = {
739740
"Records": [
740741
first_record.raw_event,
@@ -746,7 +747,7 @@ def test_sqs_fifo_batch_processor_middleware_without_first_failure(sqs_event_fif
746747
}
747748

748749
# WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID
749-
processor = SqsFifoPartialProcessor(return_on_first_error=False)
750+
processor = SqsFifoPartialProcessor(skip_group_on_error=True)
750751

751752
@batch_processor(record_handler=record_handler, processor=processor)
752753
def lambda_handler(event, context):
@@ -755,10 +756,53 @@ def lambda_handler(event, context):
755756
# WHEN
756757
result = lambda_handler(event, {})
757758

758-
# THEN only failed messages should originate from MessageGroupID 2
759-
assert len(result["batchItemFailures"]) == 2
759+
# THEN failed messages should come from the failing MessageGroupIDs 2 and 3
760+
assert len(result["batchItemFailures"]) == 3
761+
assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id
762+
assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id
763+
assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id
764+
765+
766+
def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error_and_model(sqs_event_fifo_factory, record_handler):
767+
# GIVEN a batch of 5 records with 3 different MessageGroupID
768+
first_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
769+
second_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
770+
third_record = SQSRecord(sqs_event_fifo_factory("fail", "2"))
771+
fourth_record = SQSRecord(sqs_event_fifo_factory("success", "2"))
772+
fifth_record = SQSRecord(sqs_event_fifo_factory("fail", "3"))
773+
event = {
774+
"Records": [
775+
first_record.raw_event,
776+
second_record.raw_event,
777+
third_record.raw_event,
778+
fourth_record.raw_event,
779+
fifth_record.raw_event,
780+
],
781+
}
782+
783+
class OrderSqsRecord(SqsRecordModel):
784+
receiptHandle: str
785+
786+
# WHEN the FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID
787+
# WHEN processor is using a Pydantic Model we must be able to access MessageGroupID property
788+
processor = SqsFifoPartialProcessor(skip_group_on_error=True, model=OrderSqsRecord)
789+
790+
def record_handler(record: OrderSqsRecord):
791+
if record.body == "fail":
792+
raise ValueError("blah")
793+
794+
@batch_processor(record_handler=record_handler, processor=processor)
795+
def lambda_handler(event, context):
796+
return processor.response()
797+
798+
# WHEN
799+
result = lambda_handler(event, {})
800+
801+
# THEN failed messages should come from the failing MessageGroupIDs 2 and 3
802+
assert len(result["batchItemFailures"]) == 3
760803
assert result["batchItemFailures"][0]["itemIdentifier"] == third_record.message_id
761804
assert result["batchItemFailures"][1]["itemIdentifier"] == fourth_record.message_id
805+
assert result["batchItemFailures"][2]["itemIdentifier"] == fifth_record.message_id
762806

763807

764808
def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler):

0 commit comments

Comments
 (0)