diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py new file mode 100644 index 00000000000..068cdaa9ee9 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- + +""" +Batch processing utility +""" + +from .base import BasePartialProcessor +from .middlewares import batch_processor +from .sqs import PartialSQSProcessor + +__all__ = ( + "BasePartialProcessor", + "PartialSQSProcessor", + "batch_processor", +) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py new file mode 100644 index 00000000000..a184a879441 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- + +""" +Batch processing utilities +""" + +from abc import ABC, abstractmethod +from typing import Any, Callable, Iterable, List, Tuple + + +class BasePartialProcessor(ABC): + """ + Abstract class for batch processors. + """ + + def __init__(self): + self.success_messages: List = [] + self.fail_messages: List = [] + + @abstractmethod + def _prepare(self): + """ + Prepare context manager. + """ + raise NotImplementedError() + + @abstractmethod + def _clean(self): + """ + Clear context manager. + """ + raise NotImplementedError() + + @abstractmethod + def _process_record(self, record: Any): + """ + Process record with handler. + """ + raise NotImplementedError() + + def process(self) -> List[Tuple]: + """ + Call instance's handler for each record. + """ + return [self._process_record(record) for record in self.records] + + def __enter__(self): + self._prepare() + return self + + def __exit__(self, exception_type, exception_value, traceback): + self._clean() + + def __call__(self, records: Iterable[Any], handler: Callable): + """ + Set instance attributes before execution + + Parameters + ---------- + records: Iterable[Any] + Iterable with objects to be processed. 
+ handler: Callable + Callable to process "records" entries. + """ + self.records = records + self.handler = handler + return self + + def success_handler(self, record: Any, result: Any): + """ + Success callback + + Returns + ------- + tuple + "success", result, original record + """ + entry = ("success", result, record) + self.success_messages.append(record) + return entry + + def failure_handler(self, record: Any, exception: Exception): + """ + Failure callback + + Returns + ------- + tuple + "fail", exceptions args, original record + """ + entry = ("fail", exception.args, record) + self.fail_messages.append(record) + return entry diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py new file mode 100644 index 00000000000..7ea84e0ce02 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +""" +Middlewares for batch utilities +""" + +from typing import Callable, Dict + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator + +from .base import BasePartialProcessor + + +@lambda_handler_decorator +def batch_processor( + handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None +): + """ + Middleware to handle batch event processing + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: Dict + Lambda's Context + record_handler: Callable + Callable to process each record from the batch + processor: PartialSQSProcessor + Batch Processor to handle partial failure cases + + Examples + -------- + **Processes Lambda's event with PartialSQSProcessor** + >>> from aws_lambda_powertools.utilities.batch import batch_processor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) + >>> def handler(event, context): + 
>>> return {"StatusCode": 200} + + Limitations + ----------- + * Async batch processors + + """ + records = event["Records"] + + with processor(records, record_handler): + processor.process() + + return handler(event, context) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py new file mode 100644 index 00000000000..8bbb7e5ef77 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- + +""" +Batch SQS utilities +""" +from typing import List, Optional, Tuple + +import boto3 +from botocore.config import Config + +from .base import BasePartialProcessor + + +class PartialSQSProcessor(BasePartialProcessor): + """ + Amazon SQS batch processor to delete successes from the Queue. + + Only the **special** case of partial failure is handled, thus a batch in + which all records failed is **not** going to be removed from the queue, and + the same is valid for a full success. + + Parameters + ---------- + config: Config + botocore config object + + Example + ------- + **Process batch triggered by SQS** + + >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> def handler(event, context): + >>> records = event["Records"] + >>> processor = PartialSQSProcessor() + >>> + >>> with processor(records=records, handler=record_handler): + >>> result = processor.process() + >>> + >>> # Case a partial failure occurred, all successful executions + >>> # have been deleted from the queue after context's exit. + >>> + >>> return result + """ + + def __init__(self, config: Optional[Config] = None): + """ + Initializes sqs client. 
+ """ + config = config or Config() + self.client = boto3.client("sqs", config=config) + + super().__init__() + + def _get_queue_url(self) -> str: + """ + Format QueueUrl from first records entry + """ + if not getattr(self, "records", None): + return + + *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") + return f"{self.client._endpoint.host}/{account_id}/{queue_name}" + + def _get_entries_to_clean(self) -> List: + """ + Format messages to use in batch deletion + """ + return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages] + + def _process_record(self, record) -> Tuple: + """ + Process a record with instance's handler + + Parameters + ---------- + record: Any + An object to be processed. + """ + try: + result = self.handler(record) + return self.success_handler(record, result) + except Exception as exc: + return self.failure_handler(record, exc) + + def _prepare(self): + """ + Remove results from previous execution. + """ + self.success_messages.clear() + self.fail_messages.clear() + + def _clean(self): + """ + Delete messages from Queue in case of partial failure. 
+ """ + if not (self.fail_messages and self.success_messages): + return + + queue_url = self._get_queue_url() + entries_to_remove = self._get_entries_to_clean() + + return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) diff --git a/aws_lambda_powertools/utilities/parameters/secrets.py b/aws_lambda_powertools/utilities/parameters/secrets.py index 67cb94c340b..e3981d22bcc 100644 --- a/aws_lambda_powertools/utilities/parameters/secrets.py +++ b/aws_lambda_powertools/utilities/parameters/secrets.py @@ -27,7 +27,7 @@ class SecretsProvider(BaseProvider): >>> from aws_lambda_powertools.utilities.parameters import SecretsProvider >>> secrets_provider = SecretsProvider() >>> - >>> value secrets_provider.get("my-parameter") + >>> value = secrets_provider.get("my-parameter") >>> >>> print(value) My parameter value diff --git a/docs/content/index.mdx b/docs/content/index.mdx index 26ab367ba4c..fe13020fa23 100644 --- a/docs/content/index.mdx +++ b/docs/content/index.mdx @@ -24,6 +24,12 @@ Powertools is available in PyPi. 
You can use your favourite dependency managemen ```bash:title=hello_world.sh sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python ``` +* [Tracing](./core/tracer) - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions +* [Logging](./core/logger) - Structured logging made easier, and decorator to enrich structured logging with key Lambda context details +* [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF) +* [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation +* [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time +* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, handles partial failure. ### Lambda Layer diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx new file mode 100644 index 00000000000..ef2649e55e3 --- /dev/null +++ b/docs/content/utilities/batch.mdx @@ -0,0 +1,137 @@ +--- +title: Batch +description: Utility +--- + +import Note from "../../src/components/Note" + +One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allow a feature called "Batch processing" in which a [predefined number][2] of events is sent to the Lambda function at once. + +The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from an SQS event source, providing a base class (`BasePartialProcessor`) allowing you to create your **own** batch processor. 
+ +**Key Features** + +* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure; +* Build your own batch processor using the base classes. + +**IAM Permissions** + +This utility requires additional permissions to work as expected. See the following table: + +Processor | Function/Method | IAM Permission +|---------|-----------------|---------------| +PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch` + +### PartialSQSProcessor + +SQS integration with Lambda is one of the most well established ones and pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with fewer invocations. + +As any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all of them again, including successful ones. This cycle can happen multiple times depending on your [configuration][3], until the whole batch succeeds or the maximum number of attempts is reached. Your application may face some problems with such behavior, especially if there's no idempotency. + +A *naive* approach to solving this problem is to delete successful records from the queue before the redrive phase. The `PartialSQSProcessor` class offers this solution both as context manager and middleware, removing all successful messages from the queue in case one or more failures occurred during lambda's execution. Two examples are provided below, displaying the behavior of this class. 
+ +**Examples:** + +```python:title=context_manager.py +from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor + +def record_handler(record): + return record["body"] + +def lambda_handler(event, context): + records = event["Records"] + processor = PartialSQSProcessor() + # highlight-start + with processor(records, record_handler): + result = processor.process() + # highlight-end + + return result +``` + +```python:title=middleware.py +from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor + +def record_handler(record): + return record["body"] + +# highlight-start +@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) +# highlight-end +def lambda_handler(event, context): + return {"statusCode": 200} +``` + +## Create your own partial processor + +You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. + +All processing logic is handled by `_process_record()` whilst `_prepare()` and `_clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively. 
+ +**Example:** + +```python:title=custom_processor.py +from random import randint + +from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor +import boto3 + +def record_handler(record): + return randint(0, 100) + + +class MyPartialProcessor(BasePartialProcessor): + """ + Process a record and stores successful results at a DDB Table + + Parameters + ---------- + table_name: str + Table name to write results + """ + + def __init__(self, table_name: str): + self.table_name = table_name + + super().__init__() + + def _prepare(self): + # It's called once, *before* processing + # Creates table resource and clean previous results + # E.g.: + self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) + self.success_messages.clear() + + def _clean(self): + # It's called once, *after* closing processing all records (closing the context manager) + # Here we're sending, at once, all successful messages to a ddb table + # E.g.: + with self.ddb_table.batch_writer() as batch: + for result in self.success_messages: + batch.put_item(Item=result) + + def _process_record(self, record): + # It handles how your record is processed + # Here we're keeping the status of each run + # E.g.: + try: + result = self.handler(record) + return self.success_handler(record, result) + except Exception as exc: + return self.failure_handler(record, exc) + + def success_handler(self, record, result): + entry = ("success", result, record) + message = {"age": result} + self.success_messages.append(message) + return entry + + +@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table")) +def lambda_handler(event, context): + return {"statusCode": 200} +``` + +[1]: https://aws.amazon.com/eventbridge/integrations/ +[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html +[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js index 
d518ee8e715..06781ded9e2 100644 --- a/docs/gatsby-config.js +++ b/docs/gatsby-config.js @@ -32,6 +32,7 @@ module.exports = { 'Utilities': [ 'utilities/middleware_factory', 'utilities/parameters', + 'utilities/batch', ], }, navConfig: { diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py new file mode 100644 index 00000000000..342d119629b --- /dev/null +++ b/tests/functional/test_utilities_batch.py @@ -0,0 +1,163 @@ +from typing import Callable + +import pytest +from botocore.config import Config +from botocore.stub import Stubber + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor + + +@pytest.fixture(scope="module") +def sqs_event_factory() -> Callable: + def factory(body: str): + return { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": body, + "attributes": {}, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1", + } + + return factory + + +@pytest.fixture(scope="module") +def record_handler() -> Callable: + def handler(record): + body = record["body"] + if "fail" in body: + raise Exception("Failed to process record.") + return body + + return handler + + +@pytest.fixture(scope="module") +def config() -> Config: + return Config(region_name="us-east-1") + + +@pytest.fixture(scope="function") +def partial_processor(config) -> PartialSQSProcessor: + return PartialSQSProcessor(config=config) + + +def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor): + """ + Test processor with one failing record + """ + fail_record = sqs_event_factory("fail") + success_record = sqs_event_factory("success") + + records = [fail_record, success_record] + + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + + 
with Stubber(partial_processor.client) as stubber: + stubber.add_response("delete_message_batch", response) + + with partial_processor(records, record_handler) as ctx: + result = ctx.process() + + stubber.assert_no_pending_responses() + + assert result == [ + ("fail", ("Failed to process record.",), fail_record), + ("success", success_record["body"], success_record), + ] + + +def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor): + """ + Test processor without failure + """ + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + + records = [first_record, second_record] + + with partial_processor(records, record_handler) as ctx: + result = ctx.process() + + assert result == [ + ("success", first_record["body"], first_record), + ("success", second_record["body"], second_record), + ] + + +def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler, partial_processor): + """ + Test processor without failure + """ + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + + records = [first_record, second_record] + + with partial_processor(records, record_handler) as ctx: + ctx.process() + + with partial_processor([first_record], record_handler) as ctx: + ctx.process() + + assert partial_processor.success_messages == [first_record] + + +def test_batch_processor_middleware_with_partial_sqs_processor(sqs_event_factory, record_handler, partial_processor): + """ + Test middleware's integration with PartialSQSProcessor + """ + + @batch_processor(record_handler=record_handler, processor=partial_processor) + def lambda_handler(event, context): + return True + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + + with Stubber(partial_processor.client) as stubber: + 
stubber.add_response("delete_message_batch", response) + + result = lambda_handler(event, {}) + + stubber.assert_no_pending_responses() + + assert result is True + + +def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config): + """ + Test middlewares' integration with custom batch processor + """ + + class CustomProcessor(PartialSQSProcessor): + def failure_handler(self, record, exception): + print("Oh no ! It's a failure.") + return super().failure_handler(record, exception) + + processor = CustomProcessor(config=config) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return True + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} + + with Stubber(processor.client) as stubber: + stubber.add_response("delete_message_batch", response) + + result = lambda_handler(event, {}) + + stubber.assert_no_pending_responses() + + assert result is True + assert capsys.readouterr().out == "Oh no ! 
It's a failure.\n" diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py new file mode 100644 index 00000000000..054cc4099df --- /dev/null +++ b/tests/unit/test_utilities_batch.py @@ -0,0 +1,134 @@ +import pytest +from botocore.config import Config + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + + +@pytest.fixture(scope="function") +def sqs_event(): + return { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "", + "attributes": {}, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1", + } + + +@pytest.fixture(scope="module") +def config() -> Config: + return Config(region_name="us-east-1") + + +@pytest.fixture(scope="function") +def partial_sqs_processor(config) -> PartialSQSProcessor: + return PartialSQSProcessor(config=config) + + +def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event, partial_sqs_processor): + expected_url = "https://queue.amazonaws.com/123456789012/my-queue" + + records_mock = mocker.patch.object(PartialSQSProcessor, "records", create=True, new_callable=mocker.PropertyMock) + records_mock.return_value = [sqs_event] + + result = partial_sqs_processor._get_queue_url() + assert result == expected_url + + +def test_partial_sqs_get_queue_url_without_records(partial_sqs_processor): + assert partial_sqs_processor._get_queue_url() is None + + +def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partial_sqs_processor): + expected_entries = [{"Id": sqs_event["messageId"], "ReceiptHandle": sqs_event["receiptHandle"]}] + + success_messages_mock = mocker.patch.object( + PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock + ) + success_messages_mock.return_value = [sqs_event] + + result = 
partial_sqs_processor._get_entries_to_clean() + + assert result == expected_entries + + +def test_partial_sqs_get_entries_to_clean_without_success(mocker, partial_sqs_processor): + expected_entries = [] + + success_messages_mock = mocker.patch.object( + PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock + ) + success_messages_mock.return_value = [] + + result = partial_sqs_processor._get_entries_to_clean() + + assert result == expected_entries + + +def test_partial_sqs_process_record_success(mocker, partial_sqs_processor): + expected_value = mocker.sentinel.expected_value + + success_result = mocker.sentinel.success_result + record = mocker.sentinel.record + + handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, return_value=success_result) + success_handler_mock = mocker.patch.object(PartialSQSProcessor, "success_handler", return_value=expected_value) + + result = partial_sqs_processor._process_record(record) + + handler_mock.assert_called_once_with(record) + success_handler_mock.assert_called_once_with(record, success_result) + + assert result == expected_value + + +def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor): + expected_value = mocker.sentinel.expected_value + + failure_result = Exception() + record = mocker.sentinel.record + + handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, side_effect=failure_result) + failure_handler_mock = mocker.patch.object(PartialSQSProcessor, "failure_handler", return_value=expected_value) + + result = partial_sqs_processor._process_record(record) + + handler_mock.assert_called_once_with(record) + failure_handler_mock.assert_called_once_with(record, failure_result) + + assert result == expected_value + + +def test_partial_sqs_prepare(mocker, partial_sqs_processor): + success_messages_mock = mocker.patch.object(partial_sqs_processor, "success_messages", spec=list) + failed_messages_mock = 
mocker.patch.object(partial_sqs_processor, "fail_messages", spec=list) + + partial_sqs_processor._prepare() + + success_messages_mock.clear.assert_called_once() + failed_messages_mock.clear.assert_called_once() + + +def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor): + records = [mocker.sentinel.record] + + monkeypatch.setattr(partial_sqs_processor, "fail_messages", records) + monkeypatch.setattr(partial_sqs_processor, "success_messages", records) + + queue_url_mock = mocker.patch.object(PartialSQSProcessor, "_get_queue_url") + entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "_get_entries_to_clean") + + queue_url_mock.return_value = mocker.sentinel.queue_url + entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean + + client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True) + + partial_sqs_processor._clean() + + client_mock.delete_message_batch.assert_called_once_with( + QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean + )