From bd4aca594f9bd7efc5383e2bfead48ef0381034e Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 9 Jul 2024 18:13:24 +0100 Subject: [PATCH 1/4] Adding support for raise_on_entire_batch_fail flag --- aws_lambda_powertools/utilities/batch/base.py | 17 ++++++-- docs/utilities/batch.md | 14 +++++++ .../src/working_with_entire_batch_fail.py | 29 +++++++++++++ .../test_utilities_batch.py | 42 +++++++++++++++++++ 4 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 examples/batch_processing/src/working_with_entire_batch_fail.py diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 569467f2248..559128ac20c 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -222,7 +222,12 @@ def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse: class BasePartialBatchProcessor(BasePartialProcessor): # noqa DEFAULT_RESPONSE: PartialItemFailureResponse = {"batchItemFailures": []} - def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None): + def __init__( + self, + event_type: EventType, + model: Optional["BatchTypeModels"] = None, + raise_on_entire_batch_fail: bool = True, + ): """Process batch and partially report failed items Parameters @@ -231,6 +236,9 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event model: Optional["BatchTypeModels"] Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord + raise_on_entire_batch_fail: bool + Raise an exception when the entire batch has failed processing. + When set to False, partial failures are reported in the response Exceptions ---------- @@ -239,6 +247,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N """ self.event_type = event_type self.model = model + self.raise_on_entire_batch_fail = raise_on_entire_batch_fail self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE) self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, @@ -274,7 +283,7 @@ def _clean(self): if not self._has_messages_to_report(): return - if self._entire_batch_failed(): + if self._entire_batch_failed() and self.raise_on_entire_batch_fail: raise BatchProcessingError( msg=f"All records failed processing. {len(self.exceptions)} individual errors logged " f"separately below.", @@ -475,7 +484,7 @@ def lambda_handler(event, context: LambdaContext): Raises ------ BatchProcessingError - When all batch records fail processing + When all batch records fail processing and raise_on_entire_batch_fail is True Limitations ----------- @@ -624,7 +633,7 @@ def lambda_handler(event, context: LambdaContext): Raises ------ BatchProcessingError - When all batch records fail processing + When all batch records fail processing and raise_on_entire_batch_fail is True Limitations ----------- diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 6b8e0fd3000..a07b25a459f 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -491,6 +491,20 @@ Inheritance is importance because we need to access message IDs and sequence num --8<-- "examples/batch_processing/src/pydantic_dynamodb_event.json" ``` +### Working with full batch failures + +By default, the `BatchProcessor` will raise `BatchProcessingError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics. + +When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance. + +For these scenarios, you can set the `raise_on_entire_batch_fail` option to `False`. + +=== "working_with_entire_batch_fail.py" + + ```python hl_lines="10" + --8<-- "examples/batch_processing/src/working_with_entire_batch_fail.py" + ``` + ### Accessing processed messages Use the context manager to access a list of all returned values from your `record_handler` function. diff --git a/examples/batch_processing/src/working_with_entire_batch_fail.py b/examples/batch_processing/src/working_with_entire_batch_fail.py new file mode 100644 index 00000000000..ede1b4b3302 --- /dev/null +++ b/examples/batch_processing/src/working_with_entire_batch_fail.py @@ -0,0 +1,29 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.json_body # if json string data, otherwise record.body for str + logger.info(payload) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response( + event=event, + record_handler=record_handler, + processor=processor, + context=context, + ) diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch.py b/tests/functional/batch/required_dependencies/test_utilities_batch.py index 732e2f0ef78..10b489fa7df 100644 --- a/tests/functional/batch/required_dependencies/test_utilities_batch.py +++ b/tests/functional/batch/required_dependencies/test_utilities_batch.py @@ -408,6 +408,48 @@ def lambda_handler(event, context): assert "All records failed processing. " in str(e.value) +def test_batch_processor_not_raise_when_entire_batch_fails_sync(sqs_event_factory, record_handler): + first_record = SQSRecord(sqs_event_factory("fail")) + second_record = SQSRecord(sqs_event_factory("fail")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} + + # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False + processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) + + # WHEN processing the messages + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + response = lambda_handler(event, {}) + + # THEN assert the `itemIdentifier` of each failure matches the message ID of the corresponding record + assert len(response["batchItemFailures"]) == 2 + assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id + assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id + + +def test_batch_processor_not_raise_when_entire_batch_fails_async(sqs_event_factory, record_handler): + first_record = SQSRecord(sqs_event_factory("fail")) + second_record = SQSRecord(sqs_event_factory("fail")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} + + # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False + processor = AsyncBatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) + + # WHEN processing the messages + @async_batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + response = lambda_handler(event, {}) + + # THEN assert the `itemIdentifier` of each failure matches the message ID of the corresponding record + assert len(response["batchItemFailures"]) == 2 + assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id + assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id + + def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_fifo_factory, record_handler): # GIVEN first_record = SQSRecord(sqs_event_fifo_factory("success")) From 56b9f1ef27d9e009ba88ce1fed2db0911c77b9da Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 10 Jul 2024 10:21:03 +0100 Subject: [PATCH 2/4] Addressing Andrea's feedback --- aws_lambda_powertools/utilities/batch/base.py | 8 ++++---- docs/utilities/batch.md | 2 +- .../src/working_with_entire_batch_fail.py | 2 +- .../batch/required_dependencies/test_utilities_batch.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 559128ac20c..a153e9e54b7 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -226,7 +226,7 @@ def __init__( self, event_type: EventType, model: Optional["BatchTypeModels"] = None, - raise_on_entire_batch_fail: bool = True, + raise_on_entire_batch_failure: bool = True, ): """Process batch and partially report failed items @@ -247,7 +247,7 @@ def __init__( """ self.event_type = event_type self.model = model - self.raise_on_entire_batch_fail = raise_on_entire_batch_fail + self.raise_on_entire_batch_fail = raise_on_entire_batch_failure self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE) self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, @@ -484,7 +484,7 @@ def lambda_handler(event, context: LambdaContext): Raises ------ BatchProcessingError - When all batch records fail processing and raise_on_entire_batch_fail is True + When all batch records fail processing and raise_on_entire_batch_failure is True Limitations ----------- @@ -633,7 +633,7 @@ def lambda_handler(event, context: LambdaContext): Raises ------ BatchProcessingError - When all batch records fail processing and raise_on_entire_batch_fail is True + When all batch records fail processing and raise_on_entire_batch_failure is True Limitations ----------- diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index a07b25a459f..65efb6a1805 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -497,7 +497,7 @@ By default, the `BatchProcessor` will raise `BatchProcessingError` if all record When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance. -For these scenarios, you can set the `raise_on_entire_batch_fail` option to `False`. +For these scenarios, you can set the `raise_on_entire_batch_failure` option to `False`. === "working_with_entire_batch_fail.py" diff --git a/examples/batch_processing/src/working_with_entire_batch_fail.py b/examples/batch_processing/src/working_with_entire_batch_fail.py index ede1b4b3302..9058ce23483 100644 --- a/examples/batch_processing/src/working_with_entire_batch_fail.py +++ b/examples/batch_processing/src/working_with_entire_batch_fail.py @@ -7,7 +7,7 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext -processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) +processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False) tracer = Tracer() logger = Logger() diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch.py b/tests/functional/batch/required_dependencies/test_utilities_batch.py index 10b489fa7df..a85ea822b2b 100644 --- a/tests/functional/batch/required_dependencies/test_utilities_batch.py +++ b/tests/functional/batch/required_dependencies/test_utilities_batch.py @@ -414,7 +414,7 @@ def test_batch_processor_not_raise_when_entire_batch_fails_sync(sqs_event_factor event = {"Records": [first_record.raw_event, second_record.raw_event]} # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False - processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) + processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False) # WHEN processing the messages @batch_processor(record_handler=record_handler, processor=processor) @@ -435,7 +435,7 @@ def test_batch_processor_not_raise_when_entire_batch_fails_async(sqs_event_facto event = {"Records": [first_record.raw_event, second_record.raw_event]} # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False - processor = AsyncBatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_fail=False) + processor = AsyncBatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False) # WHEN processing the messages @async_batch_processor(record_handler=record_handler, processor=processor) From 0e29d93eb704456e1d3e363ea1a4f97ca6f35961 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 10 Jul 2024 10:25:16 +0100 Subject: [PATCH 3/4] Addressing Andrea's feedback --- .../batch/required_dependencies/test_utilities_batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch.py b/tests/functional/batch/required_dependencies/test_utilities_batch.py index a85ea822b2b..77b1f865dca 100644 --- a/tests/functional/batch/required_dependencies/test_utilities_batch.py +++ b/tests/functional/batch/required_dependencies/test_utilities_batch.py @@ -413,7 +413,7 @@ def test_batch_processor_not_raise_when_entire_batch_fails_sync(sqs_event_factor second_record = SQSRecord(sqs_event_factory("fail")) event = {"Records": [first_record.raw_event, second_record.raw_event]} - # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False + # GIVEN the BatchProcessor constructor with raise_on_entire_batch_failure False processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False) # WHEN processing the messages @@ -434,7 +434,7 @@ def test_batch_processor_not_raise_when_entire_batch_fails_async(sqs_event_facto second_record = SQSRecord(sqs_event_factory("fail")) event = {"Records": [first_record.raw_event, second_record.raw_event]} - # GIVEN the BatchProcessor constructor with raise_on_entire_batch_fail False + # GIVEN the BatchProcessor constructor with raise_on_entire_batch_failure False processor = AsyncBatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False) # WHEN processing the messages From 7a44ba0ddddf6f5a4da88e587b607c3f94884f1e Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 10 Jul 2024 14:10:59 +0100 Subject: [PATCH 4/4] Addressing Andrea's feedback --- aws_lambda_powertools/utilities/batch/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index a153e9e54b7..72d43d8af82 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -236,7 +236,7 @@ def __init__( Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event model: Optional["BatchTypeModels"] Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord - raise_on_entire_batch_fail: bool + raise_on_entire_batch_failure: bool Raise an exception when the entire batch has failed processing. When set to False, partial failures are reported in the response @@ -247,7 +247,7 @@ def __init__( """ self.event_type = event_type self.model = model - self.raise_on_entire_batch_fail = raise_on_entire_batch_failure + self.raise_on_entire_batch_failure = raise_on_entire_batch_failure self.batch_response: PartialItemFailureResponse = copy.deepcopy(self.DEFAULT_RESPONSE) self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, @@ -283,7 +283,7 @@ def _clean(self): if not self._has_messages_to_report(): return - if self._entire_batch_failed() and self.raise_on_entire_batch_fail: + if self._entire_batch_failed() and self.raise_on_entire_batch_failure: raise BatchProcessingError( msg=f"All records failed processing. {len(self.exceptions)} individual errors logged " f"separately below.",