Skip to content

Commit 14b44ce

Browse files
feat(batch): add option to not raise BatchProcessingError exception when the entire batch fails (#4719)
* Adding support for raise_on_entire_batch_fail flag * Addressing Andrea's feedback * Addressing Andrea's feedback * Addressing Andrea's feedback --------- Co-authored-by: Andrea Amorosi <[email protected]>
1 parent 3b9d8ab commit 14b44ce

File tree

4 files changed

+98
-4
lines changed

4 files changed

+98
-4
lines changed

Diff for: aws_lambda_powertools/utilities/batch/base.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,12 @@ def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
222222
class BasePartialBatchProcessor(BasePartialProcessor): # noqa
223223
DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []}
224224

225-
def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
225+
def __init__(
226+
self,
227+
event_type: EventType,
228+
model: Optional["BatchTypeModels"] = None,
229+
raise_on_entire_batch_failure: bool = True,
230+
):
226231
"""Process batch and partially report failed items
227232
228233
Parameters
@@ -231,6 +236,9 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N
231236
Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event
232237
model: Optional["BatchTypeModels"]
233238
Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord
239+
raise_on_entire_batch_failure: bool
240+
Raise an exception when the entire batch has failed processing.
241+
When set to False, partial failures are reported in the response.
234242
235243
Exceptions
236244
----------
@@ -239,6 +247,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N
239247
"""
240248
self.event_type = event_type
241249
self.model = model
250+
self.raise_on_entire_batch_failure = raise_on_entire_batch_failure
242251
self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE)
243252
self._COLLECTOR_MAPPING = {
244253
EventType.SQS: self._collect_sqs_failures,
@@ -274,7 +283,7 @@ def _clean(self):
274283
if not self._has_messages_to_report():
275284
return
276285

277-
if self._entire_batch_failed():
286+
if self._entire_batch_failed() and self.raise_on_entire_batch_failure:
278287
raise BatchProcessingError(
279288
msg=f"All records failed processing. {len(self.exceptions)} individual errors logged "
280289
f"separately below.",
@@ -475,7 +484,7 @@ def lambda_handler(event, context: LambdaContext):
475484
Raises
476485
------
477486
BatchProcessingError
478-
When all batch records fail processing
487+
When all batch records fail processing and raise_on_entire_batch_failure is True
479488
480489
Limitations
481490
-----------
@@ -624,7 +633,7 @@ def lambda_handler(event, context: LambdaContext):
624633
Raises
625634
------
626635
BatchProcessingError
627-
When all batch records fail processing
636+
When all batch records fail processing and raise_on_entire_batch_failure is True
628637
629638
Limitations
630639
-----------

Diff for: docs/utilities/batch.md

+14
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,20 @@ Inheritance is important because we need to access message IDs and sequence num
491491
--8<-- "examples/batch_processing/src/pydantic_dynamodb_event.json"
492492
```
493493

494+
### Working with full batch failures
495+
496+
By default, the `BatchProcessor` will raise `BatchProcessingError` if all records in the batch fail to process; we do this to reflect the failure in your operational metrics.
497+
498+
When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.
499+
500+
For these scenarios, you can set the `raise_on_entire_batch_failure` option to `False`.
501+
502+
=== "working_with_entire_batch_fail.py"
503+
504+
```python hl_lines="10"
505+
--8<-- "examples/batch_processing/src/working_with_entire_batch_fail.py"
506+
```
507+
494508
### Accessing processed messages
495509

496510
Use the context manager to access a list of all returned values from your `record_handler` function.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
BatchProcessor,
4+
EventType,
5+
process_partial_response,
6+
)
7+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
8+
from aws_lambda_powertools.utilities.typing import LambdaContext
9+
10+
processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False)
11+
tracer = Tracer()
12+
logger = Logger()
13+
14+
15+
@tracer.capture_method
16+
def record_handler(record: SQSRecord):
17+
payload: str = record.json_body # if json string data, otherwise record.body for str
18+
logger.info(payload)
19+
20+
21+
@logger.inject_lambda_context
22+
@tracer.capture_lambda_handler
23+
def lambda_handler(event, context: LambdaContext):
24+
return process_partial_response(
25+
event=event,
26+
record_handler=record_handler,
27+
processor=processor,
28+
context=context,
29+
)

Diff for: tests/functional/batch/required_dependencies/test_utilities_batch.py

+42
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,48 @@ def lambda_handler(event, context):
408408
assert "All records failed processing. " in str(e.value)
409409

410410

411+
def test_batch_processor_not_raise_when_entire_batch_fails_sync(sqs_event_factory, record_handler):
412+
first_record = SQSRecord(sqs_event_factory("fail"))
413+
second_record = SQSRecord(sqs_event_factory("fail"))
414+
event = {"Records": [first_record.raw_event, second_record.raw_event]}
415+
416+
# GIVEN the BatchProcessor constructor with raise_on_entire_batch_failure False
417+
processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False)
418+
419+
# WHEN processing the messages
420+
@batch_processor(record_handler=record_handler, processor=processor)
421+
def lambda_handler(event, context):
422+
return processor.response()
423+
424+
response = lambda_handler(event, {})
425+
426+
# THEN assert the `itemIdentifier` of each failure matches the message ID of the corresponding record
427+
assert len(response["batchItemFailures"]) == 2
428+
assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id
429+
assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id
430+
431+
432+
def test_batch_processor_not_raise_when_entire_batch_fails_async(sqs_event_factory, record_handler):
433+
first_record = SQSRecord(sqs_event_factory("fail"))
434+
second_record = SQSRecord(sqs_event_factory("fail"))
435+
event = {"Records": [first_record.raw_event, second_record.raw_event]}
436+
437+
# GIVEN the BatchProcessor constructor with raise_on_entire_batch_failure False
438+
processor = AsyncBatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False)
439+
440+
# WHEN processing the messages
441+
@async_batch_processor(record_handler=record_handler, processor=processor)
442+
def lambda_handler(event, context):
443+
return processor.response()
444+
445+
response = lambda_handler(event, {})
446+
447+
# THEN assert the `itemIdentifier` of each failure matches the message ID of the corresponding record
448+
assert len(response["batchItemFailures"]) == 2
449+
assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id
450+
assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id
451+
452+
411453
def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_fifo_factory, record_handler):
412454
# GIVEN
413455
first_record = SQSRecord(sqs_event_fifo_factory("success"))

0 commit comments

Comments
 (0)