From 11e385d7c1dc6c6c0fc56446dd0ec117df757871 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Wed, 2 Sep 2020 17:23:47 +0200 Subject: [PATCH 1/9] chore: add sqs_batch_processor decorator to simplify interface --- aws_lambda_powertools/utilities/batch/__init__.py | 8 ++------ aws_lambda_powertools/utilities/batch/middlewares.py | 6 +++++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 068cdaa9ee9..92b0383b529 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -5,11 +5,7 @@ """ from .base import BasePartialProcessor -from .middlewares import batch_processor +from .middlewares import batch_processor, sqs_batch_processor from .sqs import PartialSQSProcessor -__all__ = ( - "BasePartialProcessor", - "PartialSQSProcessor", - "batch_processor", -) +__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor") diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index 7ea84e0ce02..5de56eb2974 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -3,12 +3,13 @@ """ Middlewares for batch utilities """ - +import functools from typing import Callable, Dict from aws_lambda_powertools.middleware_factory import lambda_handler_decorator from .base import BasePartialProcessor +from .sqs import PartialSQSProcessor @lambda_handler_decorator @@ -54,3 +55,6 @@ def batch_processor( processor.process() return handler(event, context) + + +sqs_batch_processor = functools.partial(batch_processor, processor=PartialSQSProcessor()) From 9575b4c77ede7eb0f5dda5c7f7a211f9457e9de9 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Wed, 2 Sep 2020 17:25:33 +0200 Subject: [PATCH 2/9] docs: simplify documentation more SQS specific 
focus Update for sqs_batch_processor interface --- docs/content/utilities/batch.mdx | 88 ++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 37 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index 608d958f0b5..961ed832c25 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -1,65 +1,81 @@ --- -title: Batch +title: SQS Batch Processing description: Utility --- import Note from "../../src/components/Note" -One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allows a feature called "Batch processing" in which [predefined number][2] of events is sent to lambda function at once. - -The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from a SQS event source, providing a base class (`BasePartialProcessor`) allowing you to create your **own** batch processor. +The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS. **Key Features** -* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure; -* Build your own batch processor using the base classes. +* Prevent succesfully processed messages being returned to SQS +* Simple interface for individually processing messages from a batch +* Build your own batch processor using the base classes -**IAM Permissions** +**Background** + +When using SQS as a Lambda event source mapping, functions can be triggered with a batch of messages from SQS. If the Lambda function fails when processing the batch, +all messages in the batch will be returned to the queue. With this utility, messages within a batch are handled individually - only messages that were not successfully processed +are returned to the queue. 
More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). -This utility requires additional permissions to work as expected. See the following table: + + While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible. +
-Processor | Function/Method | IAM Permission -|---------|-----------------|---------------| -PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch` -### PartialSQSProcessor +**IAM Permissions** + +This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission. -SQS integration with Lambda is one of the most well established ones and pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with less invocations. +## Processing messages from SQS -As any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all of them again, including successful ones. This cycle can happen multiple times depending on your [configuration][3], until the whole batch succeeds or the maximum number of attempts is reached. Your application may face some problems with such behavior, especially if there's no idempotency. +There are 2 ways to use this utility for processing SQS messages: -A *naive* approach to solving this problem is to delete successful records from the queue before redriving's phase. The `PartialSQSProcessor` class offers this solution both as context manager and middleware, removing all successful messages from the queue case one or more failures occurred during lambda's execution. Two examples are provided below, displaying the behavior of this class. +**With a decorator:** -**Examples:** +Using the `sqs_batch_processor` decorator with your lambda handler function, you provide a `record_handler` which is responsible for processing individual messages. It should raise an exception if +it is unable to process the record - this will lead to the message returning to the queue. 
If the function does not return an exception, the message will be deleted from the queue. When using the decorator, you +will not have accessed to the processed messages within the lambda handler - all processing should be handled from the `record_handler` function. -```python:title=context_manager.py -from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor +```python:title=app.py +from aws_lambda_powertools.utilities.batch import sqs_batch_processor def record_handler(record): - return record["body"] + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value +@sqs_batch_processor(record_handler=record_handler) def lambda_handler(event, context): - records = event["Records"] + return {"statusCode": 200} +``` - # highlight-start - with processor(records, record_handler): - result = processor.process() - # highlight-end +**With a context manager:** - return result -``` +If you require access to the result of processed messages, you can use the context manager. The result from calling `process()` on the context manager will be a list of +all the return values from your `record_handler` function. 
-```python:title=middleware.py -from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor +```python:title=app.py +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor def record_handler(record): - return record["body"] + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + -# highlight-start -@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) -# highlight-end def lambda_handler(event, context): - return {"statusCode": 200} + records = event["Records"] + + processor = PartialSQSProcessor() + + with processor(records, record_handler): + result = processor.process() # Returns a list of all results from record_handler + + return result ``` ## Create your own partial processor @@ -68,6 +84,8 @@ You can create your own partial batch processor by inheriting the `BasePartialPr All processing logic is handled by `_process_record()` whilst `_prepare()` and `clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively. +You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function. 
+ **Example:** ```python:title=custom_processor.py @@ -131,7 +149,3 @@ class MyPartialProcessor(BasePartialProcessor): def lambda_handler(event, context): return {"statusCode": 200} ``` - -[1]: https://aws.amazon.com/eventbridge/integrations/ -[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html -[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html From 6619ff85f18d3221833dfdd81d778fdcd3281af5 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Wed, 2 Sep 2020 18:23:16 +0200 Subject: [PATCH 3/9] fix: add sqs_batch_processor as its own method --- .../utilities/batch/middlewares.py | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index 5de56eb2974..fadf1f529b3 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -3,8 +3,9 @@ """ Middlewares for batch utilities """ -import functools -from typing import Callable, Dict +from typing import Callable, Dict, Optional + +from botocore.config import Config from aws_lambda_powertools.middleware_factory import lambda_handler_decorator @@ -57,4 +58,49 @@ def batch_processor( return handler(event, context) -sqs_batch_processor = functools.partial(batch_processor, processor=PartialSQSProcessor()) +@lambda_handler_decorator +def sqs_batch_processor( + handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Optional[Config] = None +): + """ + Middleware to handle batch event processing + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: Dict + Lambda's Context + record_handler: Callable + Callable to process each record from the batch + config: Config + botocore config object + + Examples + -------- + **Processes Lambda's event with PartialSQSProcessor** + >>> from 
aws_lambda_powertools.utilities.batch import sqs_batch_processor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @sqs_batch_processor(record_handler=record_handler) + >>> def handler(event, context): + >>> return {"StatusCode": 200} + + Limitations + ----------- + * Async batch processors + + """ + config = config or Config() + processor = PartialSQSProcessor(config=config) + + records = event["Records"] + + with processor(records, record_handler): + processor.process() + + return handler(event, context) From 8517dd1e760d13d177c31bd360c5bd107a4c6b60 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Wed, 2 Sep 2020 21:25:04 +0200 Subject: [PATCH 4/9] chore: add test for sqs_batch_processor interface --- tests/functional/test_utilities_batch.py | 36 +++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 342d119629b..bcc93b6e1a3 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,10 +1,11 @@ from typing import Callable +from unittest.mock import patch import pytest from botocore.config import Config from botocore.stub import Stubber -from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor @pytest.fixture(scope="module") @@ -46,6 +47,13 @@ def partial_processor(config) -> PartialSQSProcessor: return PartialSQSProcessor(config=config) +@pytest.fixture(scope="function") +def stubbed_partial_processor(config) -> PartialSQSProcessor: + processor = PartialSQSProcessor(config=config) + with Stubber(processor.client) as stubber: + yield stubber, processor + + def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor): """ Test processor with one failing record @@ -131,6 +139,32 @@ def 
lambda_handler(event, context): assert result is True +@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor") +def test_sqs_batch_processor_middleware( + patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor +): + """ + Test middleware's integration with PartialSQSProcessor + """ + + @sqs_batch_processor(record_handler=record_handler) + def lambda_handler(event, context): + return True + + stubber, processor = stubbed_partial_processor + patched_sqs_processor.return_value = processor + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + stubber.add_response("delete_message_batch", response) + result = lambda_handler(event, {}) + stubber.assert_no_pending_responses() + + assert result is True + + def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config): """ Test middlewares' integration with custom batch processor From 64c709312a4995c13960312b2ef054dbf7e98132 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Thu, 3 Sep 2020 12:49:56 +0200 Subject: [PATCH 5/9] fix: throw exception by default if messages processing fails --- aws_lambda_powertools/utilities/batch/base.py | 2 + .../utilities/batch/exceptions.py | 7 ++ .../utilities/batch/middlewares.py | 13 +- aws_lambda_powertools/utilities/batch/sqs.py | 24 +++- tests/functional/test_utilities_batch.py | 112 +++++++++++++++--- tests/unit/test_utilities_batch.py | 5 +- 6 files changed, 136 insertions(+), 27 deletions(-) create mode 100644 aws_lambda_powertools/utilities/batch/exceptions.py diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index a184a879441..631b24c9526 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -16,6 +16,7 @@ class 
BasePartialProcessor(ABC): def __init__(self): self.success_messages: List = [] self.fail_messages: List = [] + self.exceptions: List = [] @abstractmethod def _prepare(self): @@ -89,5 +90,6 @@ def failure_handler(self, record: Any, exception: Exception): "fail", exceptions args, original record """ entry = ("fail", exception.args, record) + self.exceptions.append(exception) self.fail_messages.append(record) return entry diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py new file mode 100644 index 00000000000..3e456eacec4 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/exceptions.py @@ -0,0 +1,7 @@ +""" +Batch processing exceptions +""" + + +class SQSBatchProcessingError(Exception): + """When at least one message within a batch could not be processed""" diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index fadf1f529b3..830016d7e5d 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -60,10 +60,15 @@ def batch_processor( @lambda_handler_decorator def sqs_batch_processor( - handler: Callable, event: Dict, context: Dict, record_handler: Callable, config: Optional[Config] = None + handler: Callable, + event: Dict, + context: Dict, + record_handler: Callable, + config: Optional[Config] = None, + suppress_exception: bool = False, ): """ - Middleware to handle batch event processing + Middleware to handle SQS batch event processing Parameters ---------- @@ -77,6 +82,8 @@ def sqs_batch_processor( Callable to process each record from the batch config: Config botocore config object + suppress_exception: bool, optional + Supress exception raised if any messages fail processing, by default False Examples -------- @@ -96,7 +103,7 @@ def sqs_batch_processor( """ config = config or Config() - processor = PartialSQSProcessor(config=config) + processor = 
PartialSQSProcessor(config=config, suppress_exception=suppress_exception) records = event["Records"] diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index ac0a1baa711..072a6d72e33 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -9,20 +9,24 @@ from botocore.config import Config from .base import BasePartialProcessor +from .exceptions import SQSBatchProcessingError class PartialSQSProcessor(BasePartialProcessor): """ Amazon SQS batch processor to delete successes from the Queue. - Only the **special** case of partial failure is handled, thus a batch in - which all records failed is **not** going to be removed from the queue, and - the same is valid for a full success. + The whole batch will be processed, even if failures occur. After all records are processed, + SQSBatchProcessingError will be raised if there were any failures, causing messages to + be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception. Parameters ---------- config: Config botocore config object + suppress_exception: bool, optional + Supress exception raised if any messages fail processing, by default False + Example ------- @@ -46,12 +50,13 @@ class PartialSQSProcessor(BasePartialProcessor): >>> return result """ - def __init__(self, config: Optional[Config] = None): + def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False): """ Initializes sqs client. """ config = config or Config() self.client = boto3.client("sqs", config=config) + self.suppress_exception = suppress_exception super().__init__() @@ -97,10 +102,17 @@ def _clean(self): """ Delete messages from Queue in case of partial failure. 
""" - if not (self.fail_messages and self.success_messages): + # If all messages were successful, fall back to the default SQS - + # Lambda behaviour which deletes messages if Lambda responds successfully + if not self.fail_messages: return queue_url = self._get_queue_url() entries_to_remove = self._get_entries_to_clean() - return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) + delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) + + if self.fail_messages and not self.suppress_exception: + raise SQSBatchProcessingError(list(self.exceptions)) + + return delete_message_response diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index bcc93b6e1a3..19fec24b50f 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -6,6 +6,7 @@ from botocore.stub import Stubber from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor +from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError @pytest.fixture(scope="module") @@ -47,6 +48,11 @@ def partial_processor(config) -> PartialSQSProcessor: return PartialSQSProcessor(config=config) +@pytest.fixture(scope="function") +def partial_processor_suppressed(config) -> PartialSQSProcessor: + return PartialSQSProcessor(config=config, suppress_exception=True) + + @pytest.fixture(scope="function") def stubbed_partial_processor(config) -> PartialSQSProcessor: processor = PartialSQSProcessor(config=config) @@ -54,6 +60,13 @@ def stubbed_partial_processor(config) -> PartialSQSProcessor: yield stubber, processor +@pytest.fixture(scope="function") +def stubbed_partial_processor_suppressed(config) -> PartialSQSProcessor: + processor = PartialSQSProcessor(config=config, suppress_exception=True) + with Stubber(processor.client) as stubber: + yield stubber, processor + + def 
test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor): """ Test processor with one failing record @@ -68,16 +81,13 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha with Stubber(partial_processor.client) as stubber: stubber.add_response("delete_message_batch", response) - with partial_processor(records, record_handler) as ctx: - result = ctx.process() + with pytest.raises(SQSBatchProcessingError) as error: + with partial_processor(records, record_handler) as ctx: + ctx.process() + assert len(error.value.args[0]) == 1 stubber.assert_no_pending_responses() - assert result == [ - ("fail", ("Failed to process record.",), fail_record), - ("success", success_record["body"], success_record), - ] - def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor): """ @@ -126,18 +136,17 @@ def lambda_handler(event, context): fail_record = sqs_event_factory("fail") - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]} response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} with Stubber(partial_processor.client) as stubber: stubber.add_response("delete_message_batch", response) + with pytest.raises(SQSBatchProcessingError) as error: + lambda_handler(event, {}) - result = lambda_handler(event, {}) - + assert len(error.value.args[0]) == 2 stubber.assert_no_pending_responses() - assert result is True - @patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor") def test_sqs_batch_processor_middleware( @@ -159,10 +168,11 @@ def lambda_handler(event, context): event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} stubber.add_response("delete_message_batch", response) - result = 
lambda_handler(event, {}) - stubber.assert_no_pending_responses() + with pytest.raises(SQSBatchProcessingError) as error: + lambda_handler(event, {}) - assert result is True + assert len(error.value.args[0]) == 1 + stubber.assert_no_pending_responses() def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config): @@ -188,10 +198,80 @@ def lambda_handler(event, context): with Stubber(processor.client) as stubber: stubber.add_response("delete_message_batch", response) + with pytest.raises(SQSBatchProcessingError) as error: + lambda_handler(event, {}) + + stubber.assert_no_pending_responses() + + assert len(error.value.args[0]) == 1 + assert capsys.readouterr().out == "Oh no ! It's a failure.\n" + + +def test_batch_processor_middleware_suppressed_exceptions( + sqs_event_factory, record_handler, partial_processor_suppressed +): + """ + Test middleware's integration with PartialSQSProcessor + """ + + @batch_processor(record_handler=record_handler, processor=partial_processor_suppressed) + def lambda_handler(event, context): + return True + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + with Stubber(partial_processor_suppressed.client) as stubber: + stubber.add_response("delete_message_batch", response) result = lambda_handler(event, {}) stubber.assert_no_pending_responses() + assert result is True + + +def test_partial_sqs_processor_suppressed_exceptions(sqs_event_factory, record_handler, partial_processor_suppressed): + """ + Test processor without failure + """ + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("fail") + records = [first_record, second_record] + + fail_record = sqs_event_factory("fail") + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + + with 
Stubber(partial_processor_suppressed.client) as stubber: + stubber.add_response("delete_message_batch", response) + with partial_processor_suppressed(records, record_handler) as ctx: + ctx.process() + + assert partial_processor_suppressed.success_messages == [first_record] + + +@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor") +def test_sqs_batch_processor_middleware_suppressed_exception( + patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor_suppressed +): + """ + Test middleware's integration with PartialSQSProcessor + """ + + @sqs_batch_processor(record_handler=record_handler) + def lambda_handler(event, context): + return True + + stubber, processor = stubbed_partial_processor_suppressed + patched_sqs_processor.return_value = processor + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + stubber.add_response("delete_message_batch", response) + result = lambda_handler(event, {}) + + stubber.assert_no_pending_responses() assert result is True - assert capsys.readouterr().out == "Oh no ! 
It's a failure.\n" diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 054cc4099df..136e6ff2e8c 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -2,6 +2,7 @@ from botocore.config import Config from aws_lambda_powertools.utilities.batch import PartialSQSProcessor +from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError @pytest.fixture(scope="function") @@ -126,8 +127,8 @@ def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor): entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True) - - partial_sqs_processor._clean() + with pytest.raises(SQSBatchProcessingError): + partial_sqs_processor._clean() client_mock.delete_message_batch.assert_called_once_with( QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean From 24acda9e9c0d4e5e4026a83816c6c63975903678 Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Thu, 3 Sep 2020 13:08:30 +0200 Subject: [PATCH 6/9] docs: add detail to batch processing --- docs/content/utilities/batch.mdx | 76 +++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index 961ed832c25..339edcc925a 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -35,8 +35,9 @@ There are 2 ways to use this utility for processing SQS messages: **With a decorator:** Using the `sqs_batch_processor` decorator with your lambda handler function, you provide a `record_handler` which is responsible for processing individual messages. It should raise an exception if -it is unable to process the record - this will lead to the message returning to the queue. If the function does not return an exception, the message will be deleted from the queue. 
When using the decorator, you -will not have accessed to the processed messages within the lambda handler - all processing should be handled from the `record_handler` function. +it is unable to process the record. All records in the batch will be passed to this handler for processing, even if exceptions are thrown. After all messages are processed, any successfully processed +ones will be deleted from the queue. If there were any messages the `record_handler` couldn't process, `SQSBatchProcessingError` will be raised. You will not have accessed to the _processed_ messages +within the lambda handler - all processing logic should be performed by the `record_handler` function. ```python:title=app.py from aws_lambda_powertools.utilities.batch import sqs_batch_processor @@ -78,6 +79,77 @@ def lambda_handler(event, context): return result ``` +## Passing custom boto3 config + +If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to +the `sqs_batch_processor` decorator: + +```python:title=app.py +from aws_lambda_powertools.utilities.batch import sqs_batch_processor +from botocore.config import Config + +config = Config(region_name="us-east-1") # highlight-line + +def record_handler(record): + # This will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + +@sqs_batch_processor(record_handler=record_handler, config=config) # highlight-line +def lambda_handler(event, context): + return {"statusCode": 200} +``` + +Or to the `PartialSQSProcessor` class: +```python:title=app.py +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + +from botocore.config import Config + +config = Config(region_name="us-east-1") # highlight-line + +def record_handler(record): + # This 
will be called for each individual message from a batch + # It should raise an exception if the message was not processed successfully + return_value = do_something_with(record["body"]) + return return_value + + +def lambda_handler(event, context): + records = event["Records"] + + processor = PartialSQSProcessor(config=config) # highlight-line + + with processor(records, record_handler): + result = processor.process() + + return result +``` + + +## Suppressing exceptions + +If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` argument. + + +If your Lambda function executes successfully and returns a response, all messages in the batch will be deleted from the queue. +
+ +```python:title=app.py +... +@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) # highlight-line +def lambda_handler(event, context): + return {"statusCode": 200} +``` +or +```python:title=app.py +processor = PartialSQSProcessor(config=config, suppress_exception=True) # highlight-line + +with processor(records, record_handler): + result = processor.process() +``` + ## Create your own partial processor You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. From 249b59522995d0914c8dea170c0ac38416f7eefc Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Thu, 3 Sep 2020 14:32:53 +0200 Subject: [PATCH 7/9] chore: remove middlewares module, moving decorator functionality to base and sqs --- .../utilities/batch/__init__.py | 5 +- aws_lambda_powertools/utilities/batch/base.py | 49 +++++++- .../utilities/batch/middlewares.py | 113 ------------------ aws_lambda_powertools/utilities/batch/sqs.py | 58 ++++++++- tests/functional/test_utilities_batch.py | 4 +- 5 files changed, 109 insertions(+), 120 deletions(-) delete mode 100644 aws_lambda_powertools/utilities/batch/middlewares.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 92b0383b529..d308a56abda 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -4,8 +4,7 @@ Batch processing utility """ -from .base import BasePartialProcessor -from .middlewares import batch_processor, sqs_batch_processor -from .sqs import PartialSQSProcessor +from .base import BasePartialProcessor, batch_processor +from .sqs import PartialSQSProcessor, sqs_batch_processor __all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor") diff --git a/aws_lambda_powertools/utilities/batch/base.py 
b/aws_lambda_powertools/utilities/batch/base.py index 631b24c9526..3189c4c8d50 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -5,7 +5,9 @@ """ from abc import ABC, abstractmethod -from typing import Any, Callable, Iterable, List, Tuple +from typing import Any, Callable, Dict, Iterable, List, Tuple + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator class BasePartialProcessor(ABC): @@ -93,3 +95,48 @@ def failure_handler(self, record: Any, exception: Exception): self.exceptions.append(exception) self.fail_messages.append(record) return entry + + +@lambda_handler_decorator +def batch_processor( + handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None +): + """ + Middleware to handle batch event processing + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: Dict + Lambda's Context + record_handler: Callable + Callable to process each record from the batch + processor: PartialSQSProcessor + Batch Processor to handle partial failure cases + + Examples + -------- + **Processes Lambda's event with PartialSQSProcessor** + >>> from aws_lambda_powertools.utilities.batch import batch_processor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) + >>> def handler(event, context): + >>> return {"StatusCode": 200} + + Limitations + ----------- + * Async batch processors + + """ + records = event["Records"] + + with processor(records, record_handler): + processor.process() + + return handler(event, context) diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py deleted file mode 100644 index 830016d7e5d..00000000000 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ /dev/null @@ -1,113 +0,0 
@@ -# -*- coding: utf-8 -*- - -""" -Middlewares for batch utilities -""" -from typing import Callable, Dict, Optional - -from botocore.config import Config - -from aws_lambda_powertools.middleware_factory import lambda_handler_decorator - -from .base import BasePartialProcessor -from .sqs import PartialSQSProcessor - - -@lambda_handler_decorator -def batch_processor( - handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None -): - """ - Middleware to handle batch event processing - - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: Dict - Lambda's Context - record_handler: Callable - Callable to process each record from the batch - processor: PartialSQSProcessor - Batch Processor to handle partial failure cases - - Examples - -------- - **Processes Lambda's event with PartialSQSProcessor** - >>> from aws_lambda_powertools.utilities.batch import batch_processor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) - >>> def handler(event, context): - >>> return {"StatusCode": 200} - - Limitations - ----------- - * Async batch processors - - """ - records = event["Records"] - - with processor(records, record_handler): - processor.process() - - return handler(event, context) - - -@lambda_handler_decorator -def sqs_batch_processor( - handler: Callable, - event: Dict, - context: Dict, - record_handler: Callable, - config: Optional[Config] = None, - suppress_exception: bool = False, -): - """ - Middleware to handle SQS batch event processing - - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: Dict - Lambda's Context - record_handler: Callable - Callable to process each record from the batch - config: Config - botocore config object - suppress_exception: bool, optional - Supress exception raised if any 
messages fail processing, by default False - - Examples - -------- - **Processes Lambda's event with PartialSQSProcessor** - >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> @sqs_batch_processor(record_handler=record_handler) - >>> def handler(event, context): - >>> return {"StatusCode": 200} - - Limitations - ----------- - * Async batch processors - - """ - config = config or Config() - processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception) - - records = event["Records"] - - with processor(records, record_handler): - processor.process() - - return handler(event, context) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 072a6d72e33..83f5b053048 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -3,11 +3,12 @@ """ Batch SQS utilities """ -from typing import List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Tuple import boto3 from botocore.config import Config +from ...middleware_factory import lambda_handler_decorator from .base import BasePartialProcessor from .exceptions import SQSBatchProcessingError @@ -116,3 +117,58 @@ def _clean(self): raise SQSBatchProcessingError(list(self.exceptions)) return delete_message_response + + +@lambda_handler_decorator +def sqs_batch_processor( + handler: Callable, + event: Dict, + context: Dict, + record_handler: Callable, + config: Optional[Config] = None, + suppress_exception: bool = False, +): + """ + Middleware to handle SQS batch event processing + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: Dict + Lambda's Context + record_handler: Callable + Callable to process each record from the batch + config: Config + botocore config object + suppress_exception: bool, optional + Supress 
exception raised if any messages fail processing, by default False + + Examples + -------- + **Processes Lambda's event with PartialSQSProcessor** + >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @sqs_batch_processor(record_handler=record_handler) + >>> def handler(event, context): + >>> return {"StatusCode": 200} + + Limitations + ----------- + * Async batch processors + + """ + config = config or Config() + processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception) + + records = event["Records"] + + with processor(records, record_handler): + processor.process() + + return handler(event, context) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 19fec24b50f..f56a172637a 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -148,7 +148,7 @@ def lambda_handler(event, context): stubber.assert_no_pending_responses() -@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor") +@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor") def test_sqs_batch_processor_middleware( patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor ): @@ -251,7 +251,7 @@ def test_partial_sqs_processor_suppressed_exceptions(sqs_event_factory, record_h assert partial_processor_suppressed.success_messages == [first_record] -@patch("aws_lambda_powertools.utilities.batch.middlewares.PartialSQSProcessor") +@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor") def test_sqs_batch_processor_middleware_suppressed_exception( patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor_suppressed ): From 592b5a8d7c34cc4b9eaa43664290130d923e1b2c Mon Sep 17 00:00:00 2001 From: Tom McCarthy Date: Thu, 3 Sep 2020 14:50:07 +0200 Subject: [PATCH 8/9] chore: add debug 
logging for sqs batch processing --- aws_lambda_powertools/utilities/batch/base.py | 4 ++++ aws_lambda_powertools/utilities/batch/sqs.py | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 3189c4c8d50..19b627704f9 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -4,11 +4,14 @@ Batch processing utilities """ +import logging from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Iterable, List, Tuple from aws_lambda_powertools.middleware_factory import lambda_handler_decorator +logger = logging.getLogger(__name__) + class BasePartialProcessor(ABC): """ @@ -92,6 +95,7 @@ def failure_handler(self, record: Any, exception: Exception): "fail", exceptions args, original record """ entry = ("fail", exception.args, record) + logger.debug("Record processing exception: %s", exception) self.exceptions.append(exception) self.fail_messages.append(record) return entry diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 83f5b053048..4a4aa9c98b1 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -3,6 +3,7 @@ """ Batch SQS utilities """ +import logging from typing import Callable, Dict, List, Optional, Tuple import boto3 @@ -12,6 +13,8 @@ from .base import BasePartialProcessor from .exceptions import SQSBatchProcessingError +logger = logging.getLogger(__name__) + class PartialSQSProcessor(BasePartialProcessor): """ @@ -106,6 +109,7 @@ def _clean(self): # If all messages were successful, fall back to the default SQS - # Lambda behaviour which deletes messages if Lambda responds successfully if not self.fail_messages: + logger.debug(f"All {len(self.success_messages)} records successfully processed") return queue_url = self._get_queue_url() @@ -113,7 +117,10 @@ def
_clean(self): delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) - if self.fail_messages and not self.suppress_exception: + if self.suppress_exception: + logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed") + else: + logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception") raise SQSBatchProcessingError(list(self.exceptions)) return delete_message_response From 285054dadefeee82ed82c1c2fda7db256c45db51 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 3 Sep 2020 16:44:10 +0200 Subject: [PATCH 9/9] docs: address readability feedbacks --- docs/content/utilities/batch.mdx | 88 ++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index 339edcc925a..a5ed8c90ff6 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -15,12 +15,17 @@ The SQS batch processing utility provides a way to handle partial failures when **Background** -When using SQS as a Lambda event source mapping, functions can be triggered with a batch of messages from SQS. If the Lambda function fails when processing the batch, -all messages in the batch will be returned to the queue. With this utility, messages within a batch are handled individually - only messages that were not successfully processed -are returned to the queue. More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). +When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS. + +If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time. 
+ +With this utility, messages within a batch are handled individually - only messages that were not successfully processed +are returned to the queue. While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible. +

+ More details on how Lambda works with SQS can be found in the AWS documentation

@@ -30,14 +35,37 @@ This utility requires additional permissions to work as expected. Lambda functio ## Processing messages from SQS -There are 2 ways to use this utility for processing SQS messages: +You can use either **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager. + +They have nearly the same behaviour when it comes to processing messages from the batch: + +* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost +* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will: + - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch` + - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue + +The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need. + +## Record Handler + +Both decorator and context managers require an explicit function to process the batch of messages - namely `record_handler` parameter. -**With a decorator:** +This function is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent. -Using the `sqs_batch_processor` decorator with your lambda handler function, you provide a `record_handler` which is responsible for processing individual messages. It should raise an exception if -it is unable to process the record. All records in the batch will be passed to this handler for processing, even if exceptions are thrown. After all messages are processed, any successfully processed -ones will be deleted from the queue. If there were any messages the `record_handler` couldn't process, `SQSBatchProcessingError` will be raised. 
You will not have accessed to the _processed_ messages -within the lambda handler - all processing logic should be performed by the `record_handler` function. +**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion. + +### sqs_batch_processor decorator + +When using this decorator, you need to provide a function via `record_handler` param that will process individual messages from the batch - It should raise an exception if it is unable to process the record. + +All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: + +* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch` +* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue + + + You will not have access to the processed messages within the Lambda Handler - all processing logic will and should be performed by the record_handler function. +
```python:title=app.py from aws_lambda_powertools.utilities.batch import sqs_batch_processor @@ -53,10 +81,11 @@ def lambda_handler(event, context): return {"statusCode": 200} ``` -**With a context manager:** +### PartialSQSProcessor context manager + +If you require access to the result of processed messages, you can use this context manager. -If you require access to the result of processed messages, you can use the context manager. The result from calling `process()` on the context manager will be a list of -all the return values from your `record_handler` function. +The result from calling `process()` on the context manager will be a list of all the return values from your `record_handler` function. ```python:title=app.py from aws_lambda_powertools.utilities.batch import PartialSQSProcessor @@ -73,8 +102,8 @@ def lambda_handler(event, context): processor = PartialSQSProcessor() - with processor(records, record_handler): - result = processor.process() # Returns a list of all results from record_handler + with processor(records, record_handler) as proc: + result = proc.process() # Returns a list of all results from record_handler return result ``` @@ -130,11 +159,9 @@ def lambda_handler(event, context): ## Suppressing exceptions -If you want to disable the defualt behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` argument. +If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument. - -If your Lambda function executes successfully and returns a response, all messages in the batch will be deleted from the queue. -
+**Within the decorator** ```python:title=app.py ... @@ -142,7 +169,9 @@ If your Lambda function executes successfully and returns a response, all messag def lambda_handler(event, context): return {"statusCode": 200} ``` -or + +**Within the context manager** + ```python:title=app.py processor = PartialSQSProcessor(config=config, suppress_exception=True) # highlight-line @@ -154,7 +183,9 @@ with processor(records, record_handler): You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. -All processing logic is handled by `_process_record()` whilst `_prepare()` and `clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively. +* **`_process_record()`** - Handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler) +* **`_prepare()`** - Called once as part of the processor initialization +* **`_clean()`** - Teardown logic called once after `_process_record` completes You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function. 
@@ -165,19 +196,18 @@ from random import randint from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor import boto3 +import os -def record_handler(record): - return randint(0, 100) - +table_name = os.getenv("TABLE_NAME", "table_not_found") class MyPartialProcessor(BasePartialProcessor): """ - Process a record and stores successful results at a DDB Table + Process a record and store successful results in an Amazon DynamoDB Table Parameters ---------- table_name: str - Table name to write results + DynamoDB table name to write results to """ def __init__(self, table_name: str): @@ -203,9 +233,10 @@ class MyPartialProcessor(BasePartialProcessor): def _process_record(self, record): # It handles how your record is processed # Here we're keeping the status of each run + # where self.handler is the record_handler function passed as an argument # E.g.: try: - result = self.handler(record) + result = self.handler(record) # record_handler passed to decorator/context manager return self.success_handler(record, result) except Exception as exc: return self.failure_handler(record, exc) @@ -217,7 +248,10 @@ class MyPartialProcessor(BasePartialProcessor): return entry -@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table")) +def record_handler(record): + return randint(0, 100) + +@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name)) def lambda_handler(event, context): return {"statusCode": 200} ```