diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py
index 068cdaa9ee9..d308a56abda 100644
--- a/aws_lambda_powertools/utilities/batch/__init__.py
+++ b/aws_lambda_powertools/utilities/batch/__init__.py
@@ -4,12 +4,7 @@
Batch processing utility
"""
-from .base import BasePartialProcessor
-from .middlewares import batch_processor
-from .sqs import PartialSQSProcessor
+from .base import BasePartialProcessor, batch_processor
+from .sqs import PartialSQSProcessor, sqs_batch_processor
-__all__ = (
- "BasePartialProcessor",
- "PartialSQSProcessor",
- "batch_processor",
-)
+__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")
diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py
index a184a879441..19b627704f9 100644
--- a/aws_lambda_powertools/utilities/batch/base.py
+++ b/aws_lambda_powertools/utilities/batch/base.py
@@ -4,8 +4,13 @@
Batch processing utilities
"""
+import logging
from abc import ABC, abstractmethod
-from typing import Any, Callable, Iterable, List, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Tuple
+
+from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
+
+logger = logging.getLogger(__name__)
class BasePartialProcessor(ABC):
@@ -16,6 +21,7 @@ class BasePartialProcessor(ABC):
def __init__(self):
self.success_messages: List = []
self.fail_messages: List = []
+ self.exceptions: List = []
@abstractmethod
def _prepare(self):
@@ -89,5 +95,52 @@ def failure_handler(self, record: Any, exception: Exception):
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
+ logger.debug("Record processing exception: ", exception)
+ self.exceptions.append(exception)
self.fail_messages.append(record)
return entry
+
+
+@lambda_handler_decorator
+def batch_processor(
+ handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
+):
+ """
+ Middleware to handle batch event processing
+
+ Parameters
+ ----------
+ handler: Callable
+ Lambda's handler
+ event: Dict
+ Lambda's Event
+ context: Dict
+ Lambda's Context
+ record_handler: Callable
+ Callable to process each record from the batch
+ processor: PartialSQSProcessor
+ Batch Processor to handle partial failure cases
+
+ Examples
+ --------
+ **Processes Lambda's event with PartialSQSProcessor**
+ >>> from aws_lambda_powertools.utilities.batch import batch_processor
+ >>>
+ >>> def record_handler(record):
+ >>> return record["body"]
+ >>>
+ >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
+ >>> def handler(event, context):
+ >>> return {"StatusCode": 200}
+
+ Limitations
+ -----------
+ * Async batch processors
+
+ """
+ records = event["Records"]
+
+ with processor(records, record_handler):
+ processor.process()
+
+ return handler(event, context)
diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py
new file mode 100644
index 00000000000..3e456eacec4
--- /dev/null
+++ b/aws_lambda_powertools/utilities/batch/exceptions.py
@@ -0,0 +1,7 @@
+"""
+Batch processing exceptions
+"""
+
+
+class SQSBatchProcessingError(Exception):
+ """When at least one message within a batch could not be processed"""
diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py
deleted file mode 100644
index 7ea84e0ce02..00000000000
--- a/aws_lambda_powertools/utilities/batch/middlewares.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-
-"""
-Middlewares for batch utilities
-"""
-
-from typing import Callable, Dict
-
-from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
-
-from .base import BasePartialProcessor
-
-
-@lambda_handler_decorator
-def batch_processor(
- handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
-):
- """
- Middleware to handle batch event processing
-
- Parameters
- ----------
- handler: Callable
- Lambda's handler
- event: Dict
- Lambda's Event
- context: Dict
- Lambda's Context
- record_handler: Callable
- Callable to process each record from the batch
- processor: PartialSQSProcessor
- Batch Processor to handle partial failure cases
-
- Examples
- --------
- **Processes Lambda's event with PartialSQSProcessor**
- >>> from aws_lambda_powertools.utilities.batch import batch_processor
- >>>
- >>> def record_handler(record):
- >>> return record["body"]
- >>>
- >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
- >>> def handler(event, context):
- >>> return {"StatusCode": 200}
-
- Limitations
- -----------
- * Async batch processors
-
- """
- records = event["Records"]
-
- with processor(records, record_handler):
- processor.process()
-
- return handler(event, context)
diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py
index ac0a1baa711..4a4aa9c98b1 100644
--- a/aws_lambda_powertools/utilities/batch/sqs.py
+++ b/aws_lambda_powertools/utilities/batch/sqs.py
@@ -3,26 +3,34 @@
"""
Batch SQS utilities
"""
-from typing import List, Optional, Tuple
+import logging
+from typing import Callable, Dict, List, Optional, Tuple
import boto3
from botocore.config import Config
+from ...middleware_factory import lambda_handler_decorator
from .base import BasePartialProcessor
+from .exceptions import SQSBatchProcessingError
+
+logger = logging.getLogger(__name__)
class PartialSQSProcessor(BasePartialProcessor):
"""
Amazon SQS batch processor to delete successes from the Queue.
- Only the **special** case of partial failure is handled, thus a batch in
- which all records failed is **not** going to be removed from the queue, and
- the same is valid for a full success.
+ The whole batch will be processed, even if failures occur. After all records are processed,
+ SQSBatchProcessingError will be raised if there were any failures, causing messages to
+ be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.
Parameters
----------
config: Config
botocore config object
+ suppress_exception: bool, optional
+ Supress exception raised if any messages fail processing, by default False
+
Example
-------
@@ -46,12 +54,13 @@ class PartialSQSProcessor(BasePartialProcessor):
>>> return result
"""
- def __init__(self, config: Optional[Config] = None):
+ def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
"""
Initializes sqs client.
"""
config = config or Config()
self.client = boto3.client("sqs", config=config)
+ self.suppress_exception = suppress_exception
super().__init__()
@@ -97,10 +106,76 @@ def _clean(self):
"""
Delete messages from Queue in case of partial failure.
"""
- if not (self.fail_messages and self.success_messages):
+ # If all messages were successful, fall back to the default SQS -
+ # Lambda behaviour which deletes messages if Lambda responds successfully
+ if not self.fail_messages:
+ logger.debug(f"All {len(self.success_messages)} records successfully processed")
return
queue_url = self._get_queue_url()
entries_to_remove = self._get_entries_to_clean()
- return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
+ delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
+
+ if self.suppress_exception:
+ logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
+ else:
+ logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
+ raise SQSBatchProcessingError(list(self.exceptions))
+
+ return delete_message_response
+
+
+@lambda_handler_decorator
+def sqs_batch_processor(
+ handler: Callable,
+ event: Dict,
+ context: Dict,
+ record_handler: Callable,
+ config: Optional[Config] = None,
+ suppress_exception: bool = False,
+):
+ """
+ Middleware to handle SQS batch event processing
+
+ Parameters
+ ----------
+ handler: Callable
+ Lambda's handler
+ event: Dict
+ Lambda's Event
+ context: Dict
+ Lambda's Context
+ record_handler: Callable
+ Callable to process each record from the batch
+ config: Config
+ botocore config object
+ suppress_exception: bool, optional
+ Supress exception raised if any messages fail processing, by default False
+
+ Examples
+ --------
+ **Processes Lambda's event with PartialSQSProcessor**
+ >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+ >>>
+ >>> def record_handler(record):
+ >>> return record["body"]
+ >>>
+ >>> @sqs_batch_processor(record_handler=record_handler)
+ >>> def handler(event, context):
+ >>> return {"StatusCode": 200}
+
+ Limitations
+ -----------
+ * Async batch processors
+
+ """
+ config = config or Config()
+ processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception)
+
+ records = event["Records"]
+
+ with processor(records, record_handler):
+ processor.process()
+
+ return handler(event, context)
diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx
index 608d958f0b5..a5ed8c90ff6 100644
--- a/docs/content/utilities/batch.mdx
+++ b/docs/content/utilities/batch.mdx
@@ -1,72 +1,193 @@
---
-title: Batch
+title: SQS Batch Processing
description: Utility
---
import Note from "../../src/components/Note"
-One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allows a feature called "Batch processing" in which [predefined number][2] of events is sent to lambda function at once.
-
-The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from a SQS event source, providing a base class (`BasePartialProcessor`) allowing you to create your **own** batch processor.
+The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
**Key Features**
-* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure;
-* Build your own batch processor using the base classes.
+* Prevent succesfully processed messages being returned to SQS
+* Simple interface for individually processing messages from a batch
+* Build your own batch processor using the base classes
+
+**Background**
+
+When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.
+
+If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time.
+
+With this utility, messages within a batch are handled individually - only messages that were not successfully processed
+are returned to the queue.
+
+
+ While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.
+
+ More details on how Lambda works with SQS can be found in the AWS documentation
+
+
**IAM Permissions**
-This utility requires additional permissions to work as expected. See the following table:
+This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.
+
+## Processing messages from SQS
+
+You can use either **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager.
+
+They have nearly the same behaviour when it comes to processing messages from the batch:
+
+* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
+* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will:
+ - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
+ - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+
+The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need.
+
+## Record Handler
+
+Both decorator and context managers require an explicit function to process the batch of messages - namely `record_handler` parameter.
+
+This function is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
+
+**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
+
+### sqs_batch_processor decorator
+
+When using the this decorator, you need provide a function via `record_handler` param that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
+
+All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
+
+* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch`
+* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+
+
+ You will not have accessed to the processed messages within the Lambda Handler - all processing logic will and should be performed by the record_handler
function.
+
+
+```python:title=app.py
+from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+
+def record_handler(record):
+ # This will be called for each individual message from a batch
+ # It should raise an exception if the message was not processed successfully
+ return_value = do_something_with(record["body"])
+ return return_value
+
+@sqs_batch_processor(record_handler=record_handler)
+def lambda_handler(event, context):
+ return {"statusCode": 200}
+```
+
+### PartialSQSProcessor context manager
+
+If you require access to the result of processed messages, you can use this context manager.
+
+The result from calling `process()` on the context manager will be a list of all the return values from your `record_handler` function.
+
+```python:title=app.py
+from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
+def record_handler(record):
+ # This will be called for each individual message from a batch
+ # It should raise an exception if the message was not processed successfully
+ return_value = do_something_with(record["body"])
+ return return_value
-Processor | Function/Method | IAM Permission
-|---------|-----------------|---------------|
-PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch`
-### PartialSQSProcessor
+def lambda_handler(event, context):
+ records = event["Records"]
-SQS integration with Lambda is one of the most well established ones and pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with less invocations.
+ processor = PartialSQSProcessor()
-As any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all of them again, including successful ones. This cycle can happen multiple times depending on your [configuration][3], until the whole batch succeeds or the maximum number of attempts is reached. Your application may face some problems with such behavior, especially if there's no idempotency.
+ with processor(records, record_handler) as proc:
+ result = proc.process() # Returns a list of all results from record_handler
-A *naive* approach to solving this problem is to delete successful records from the queue before redriving's phase. The `PartialSQSProcessor` class offers this solution both as context manager and middleware, removing all successful messages from the queue case one or more failures occurred during lambda's execution. Two examples are provided below, displaying the behavior of this class.
+ return result
+```
-**Examples:**
+## Passing custom boto3 config
-```python:title=context_manager.py
-from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
+If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to
+the `sqs_batch_processor` decorator:
+
+```python:title=app.py
+from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+from botocore.config import Config
+
+config = Config(region_name="us-east-1") # highlight-line
def record_handler(record):
- return record["body"]
+ # This will be called for each individual message from a batch
+ # It should raise an exception if the message was not processed successfully
+ return_value = do_something_with(record["body"])
+ return return_value
+
+@sqs_batch_processor(record_handler=record_handler, config=config) # highlight-line
+def lambda_handler(event, context):
+ return {"statusCode": 200}
+```
+
+Or to the `PartialSQSProcessor` class:
+```python:title=app.py
+from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
+from botocore.config import Config
+
+config = Config(region_name="us-east-1") # highlight-line
+
+def record_handler(record):
+ # This will be called for each individual message from a batch
+ # It should raise an exception if the message was not processed successfully
+ return_value = do_something_with(record["body"])
+ return return_value
+
def lambda_handler(event, context):
records = event["Records"]
- # highlight-start
+ processor = PartialSQSProcessor(config=config) # highlight-line
+
with processor(records, record_handler):
result = processor.process()
- # highlight-end
return result
```
-```python:title=middleware.py
-from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
-def record_handler(record):
- return record["body"]
+## Suppressing exceptions
+
+If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument.
+
+**Within the decorator**
-# highlight-start
-@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
-# highlight-end
+```python:title=app.py
+...
+@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) # highlight-line
def lambda_handler(event, context):
return {"statusCode": 200}
```
+**Within the context manager**
+
+```python:title=app.py
+processor = PartialSQSProcessor(config=config, suppress_exception=True) # highlight-line
+
+with processor(records, record_handler):
+ result = processor.process()
+```
+
## Create your own partial processor
You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
-All processing logic is handled by `_process_record()` whilst `_prepare()` and `clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively.
+* **`_process_record()`** - Handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler)
+* **`_prepare()`** - Called once as part of the processor initialization
+* **`clean()`** - Teardown logic called once after `_process_record` completes
+
+You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
**Example:**
@@ -75,19 +196,18 @@ from random import randint
from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
import boto3
+import os
-def record_handler(record):
- return randint(0, 100)
-
+table_name = os.getenv("TABLE_NAME", "table_not_found")
class MyPartialProcessor(BasePartialProcessor):
"""
- Process a record and stores successful results at a DDB Table
+ Process a record and stores successful results at a Amazon DynamoDB Table
Parameters
----------
table_name: str
- Table name to write results
+ DynamoDB table name to write results to
"""
def __init__(self, table_name: str):
@@ -113,9 +233,10 @@ class MyPartialProcessor(BasePartialProcessor):
def _process_record(self, record):
# It handles how your record is processed
# Here we're keeping the status of each run
+ # where self.handler is the record_handler function passed as an argument
# E.g.:
try:
- result = self.handler(record)
+ result = self.handler(record) # record_handler passed to decorator/context manager
return self.success_handler(record, result)
except Exception as exc:
return self.failure_handler(record, exc)
@@ -127,11 +248,10 @@ class MyPartialProcessor(BasePartialProcessor):
return entry
-@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
+def record_handler(record):
+ return randint(0, 100)
+
+@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
return {"statusCode": 200}
```
-
-[1]: https://aws.amazon.com/eventbridge/integrations/
-[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html
-[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py
index 342d119629b..f56a172637a 100644
--- a/tests/functional/test_utilities_batch.py
+++ b/tests/functional/test_utilities_batch.py
@@ -1,10 +1,12 @@
from typing import Callable
+from unittest.mock import patch
import pytest
from botocore.config import Config
from botocore.stub import Stubber
-from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor
+from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor
+from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError
@pytest.fixture(scope="module")
@@ -46,6 +48,25 @@ def partial_processor(config) -> PartialSQSProcessor:
return PartialSQSProcessor(config=config)
+@pytest.fixture(scope="function")
+def partial_processor_suppressed(config) -> PartialSQSProcessor:
+ return PartialSQSProcessor(config=config, suppress_exception=True)
+
+
+@pytest.fixture(scope="function")
+def stubbed_partial_processor(config) -> PartialSQSProcessor:
+ processor = PartialSQSProcessor(config=config)
+ with Stubber(processor.client) as stubber:
+ yield stubber, processor
+
+
+@pytest.fixture(scope="function")
+def stubbed_partial_processor_suppressed(config) -> PartialSQSProcessor:
+ processor = PartialSQSProcessor(config=config, suppress_exception=True)
+ with Stubber(processor.client) as stubber:
+ yield stubber, processor
+
+
def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor):
"""
Test processor with one failing record
@@ -60,16 +81,13 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha
with Stubber(partial_processor.client) as stubber:
stubber.add_response("delete_message_batch", response)
- with partial_processor(records, record_handler) as ctx:
- result = ctx.process()
+ with pytest.raises(SQSBatchProcessingError) as error:
+ with partial_processor(records, record_handler) as ctx:
+ ctx.process()
+ assert len(error.value.args[0]) == 1
stubber.assert_no_pending_responses()
- assert result == [
- ("fail", ("Failed to process record.",), fail_record),
- ("success", success_record["body"], success_record),
- ]
-
def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor):
"""
@@ -118,17 +136,43 @@ def lambda_handler(event, context):
fail_record = sqs_event_factory("fail")
- event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
+ event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]}
response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
with Stubber(partial_processor.client) as stubber:
stubber.add_response("delete_message_batch", response)
+ with pytest.raises(SQSBatchProcessingError) as error:
+ lambda_handler(event, {})
- result = lambda_handler(event, {})
-
+ assert len(error.value.args[0]) == 2
stubber.assert_no_pending_responses()
- assert result is True
+
+@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor")
+def test_sqs_batch_processor_middleware(
+ patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor
+):
+ """
+ Test middleware's integration with PartialSQSProcessor
+ """
+
+ @sqs_batch_processor(record_handler=record_handler)
+ def lambda_handler(event, context):
+ return True
+
+ stubber, processor = stubbed_partial_processor
+ patched_sqs_processor.return_value = processor
+
+ fail_record = sqs_event_factory("fail")
+
+ event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
+ response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
+ stubber.add_response("delete_message_batch", response)
+ with pytest.raises(SQSBatchProcessingError) as error:
+ lambda_handler(event, {})
+
+ assert len(error.value.args[0]) == 1
+ stubber.assert_no_pending_responses()
def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config):
@@ -154,10 +198,80 @@ def lambda_handler(event, context):
with Stubber(processor.client) as stubber:
stubber.add_response("delete_message_batch", response)
+ with pytest.raises(SQSBatchProcessingError) as error:
+ lambda_handler(event, {})
+
+ stubber.assert_no_pending_responses()
+
+ assert len(error.value.args[0]) == 1
+ assert capsys.readouterr().out == "Oh no ! It's a failure.\n"
+
+
+def test_batch_processor_middleware_suppressed_exceptions(
+ sqs_event_factory, record_handler, partial_processor_suppressed
+):
+ """
+ Test middleware's integration with PartialSQSProcessor
+ """
+
+ @batch_processor(record_handler=record_handler, processor=partial_processor_suppressed)
+ def lambda_handler(event, context):
+ return True
+ fail_record = sqs_event_factory("fail")
+
+ event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]}
+ response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
+
+ with Stubber(partial_processor_suppressed.client) as stubber:
+ stubber.add_response("delete_message_batch", response)
result = lambda_handler(event, {})
stubber.assert_no_pending_responses()
+ assert result is True
+
+def test_partial_sqs_processor_suppressed_exceptions(sqs_event_factory, record_handler, partial_processor_suppressed):
+ """
+ Test processor without failure
+ """
+
+ first_record = sqs_event_factory("success")
+ second_record = sqs_event_factory("fail")
+ records = [first_record, second_record]
+
+ fail_record = sqs_event_factory("fail")
+ response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
+
+ with Stubber(partial_processor_suppressed.client) as stubber:
+ stubber.add_response("delete_message_batch", response)
+ with partial_processor_suppressed(records, record_handler) as ctx:
+ ctx.process()
+
+ assert partial_processor_suppressed.success_messages == [first_record]
+
+
+@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor")
+def test_sqs_batch_processor_middleware_suppressed_exception(
+ patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor_suppressed
+):
+ """
+ Test middleware's integration with PartialSQSProcessor
+ """
+
+ @sqs_batch_processor(record_handler=record_handler)
+ def lambda_handler(event, context):
+ return True
+
+ stubber, processor = stubbed_partial_processor_suppressed
+ patched_sqs_processor.return_value = processor
+
+ fail_record = sqs_event_factory("fail")
+
+ event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]}
+ response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []}
+ stubber.add_response("delete_message_batch", response)
+ result = lambda_handler(event, {})
+
+ stubber.assert_no_pending_responses()
assert result is True
- assert capsys.readouterr().out == "Oh no ! It's a failure.\n"
diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py
index 054cc4099df..136e6ff2e8c 100644
--- a/tests/unit/test_utilities_batch.py
+++ b/tests/unit/test_utilities_batch.py
@@ -2,6 +2,7 @@
from botocore.config import Config
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError
@pytest.fixture(scope="function")
@@ -126,8 +127,8 @@ def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor):
entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean
client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True)
-
- partial_sqs_processor._clean()
+ with pytest.raises(SQSBatchProcessingError):
+ partial_sqs_processor._clean()
client_mock.delete_message_batch.assert_called_once_with(
QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean