From 2e00554749034dba29eca3d68b86f133457bb2d3 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 12 Aug 2020 23:07:19 -0300 Subject: [PATCH 01/26] feat: add batch module --- aws_lambda_powertools/utilities/batch/__init__.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 aws_lambda_powertools/utilities/batch/__init__.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py new file mode 100644 index 00000000000..67be909187a --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +"""General utilities for Powertools""" From 9015d4333b4ff0e006928e52cb7f2cc0143b5655 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Thu, 13 Aug 2020 00:54:38 -0300 Subject: [PATCH 02/26] feat: include base processors --- aws_lambda_powertools/utilities/batch/base.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 aws_lambda_powertools/utilities/batch/base.py diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py new file mode 100644 index 00000000000..283a8b31fec --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- + +""" +Batch processing utilities +""" + +from abc import ABC, abstractmethod +from typing import Any, Callable, Iterable, List, MutableSequence, Tuple + + +class BaseProcessor(ABC): + + # init with lambda's context ? + + @abstractmethod + def _prepare(self): + """ + Prepare context manager. + """ + raise NotImplementedError() + + @abstractmethod + def _clean(self): + """ + Clear context manager. + """ + raise NotImplementedError() + + @abstractmethod + def _process_record(self): + raise NotImplementedError() + + def process(self) -> List[Tuple]: + return [self._process_record(record) for record in self.records] + + def __enter__(self): + self._prepare() + return self + + def __exit__(self, exception_type, exception_value, traceback): + self._clean() + + def __call__(self, records: Iterable[Any], handler: Callable): + """ + Set instance attributes before execution + + Parameters + ---------- + + records: Iterable[Any] + Iterable with objects to be processed. + handler: Callable + Callable to process "records" entries. 
+ """ + self.records = records + self.handler = handler + return self + + +class BasePartialProcessor(BaseProcessor): + + success_messages: MutableSequence = None + fail_messages: MutableSequence = None + + def success_handler(self, record, result): + """ + Success callback + """ + entry = (result, record) + self.success_messages.append(record) + return entry + + def failure_handler(self, record, error): + """ + Failure callback + """ + entry = (error, record) + self.fail_messages.append(entry) + return entry From 48a7687ad4639ba2d75172fb7c5616c90b03186d Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Thu, 13 Aug 2020 00:55:06 -0300 Subject: [PATCH 03/26] feat: add sqs failure processors --- aws_lambda_powertools/utilities/batch/base.py | 2 +- aws_lambda_powertools/utilities/batch/sqs.py | 66 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 aws_lambda_powertools/utilities/batch/sqs.py diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 283a8b31fec..0cc08e8e653 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -27,7 +27,7 @@ def _clean(self): raise NotImplementedError() @abstractmethod - def _process_record(self): + def _process_record(self, record): raise NotImplementedError() def process(self) -> List[Tuple]: diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py new file mode 100644 index 00000000000..7c02c68d7bc --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- + +""" +Batch SQS utilities +""" + +from typing import List + +import boto3 + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator + +from .base import BasePartialProcessor + + +class BasePartialSQSProcessor(BasePartialProcessor): + def __init__(self): + self._client = boto3.client("sqs") + self.success_messages: List = [] + self.fail_messages: List = [] + + super().__init__() + + def get_queue_url(self): + *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") + return f"{self._client._endpoint.host}/{account_id}/{queue_name}" + + def get_entries_to_clean(self): + return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages] + + def _process_record(self, record): + try: + result = self.handler(record) + return self.success_handler(record, result) + except Exception as exc: + return self.failure_handler(record, exc) + + def _prepare(self): + """ + """ + self.success_messages.clear() + self.fail_messages.clear() + + def _clean(self): + """ + """ + if not self.fail_messages: + return + + queue_url = self.get_queue_url() + entries_to_remove = self.get_entries_to_clean() + + return self._client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) + + +class DefaultPartialSQSProcessor(BasePartialSQSProcessor): + pass + + +@lambda_handler_decorator +def partial_sqs_processor(handler, event, context, record_handler, processor=None): + records = event["Records"] + processor = processor or DefaultPartialSQSProcessor() + + with processor(records, record_handler) as ctx: + ctx.process() From 9c7324bfb9c38319c3228a63dcf175b4a6b56054 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Tue, 18 Aug 2020 17:33:25 -0300 Subject: [PATCH 04/26] fix: include proposed suggestions --- aws_lambda_powertools/utilities/batch/base.py | 3 --- 
aws_lambda_powertools/utilities/batch/sqs.py | 13 ++++++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 0cc08e8e653..8371b58345f 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -9,9 +9,6 @@ class BaseProcessor(ABC): - - # init with lambda's context ? - @abstractmethod def _prepare(self): """ diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 7c02c68d7bc..041bcd8ac36 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -13,7 +13,7 @@ from .base import BasePartialProcessor -class BasePartialSQSProcessor(BasePartialProcessor): +class PartialSQSProcessor(BasePartialProcessor): def __init__(self): self._client = boto3.client("sqs") self.success_messages: List = [] @@ -44,7 +44,8 @@ def _prepare(self): def _clean(self): """ """ - if not self.fail_messages: + # skip only failures or only successes + if not (self.fail_messages and self.success_messages): return queue_url = self.get_queue_url() @@ -53,14 +54,12 @@ def _clean(self): return self._client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) -class DefaultPartialSQSProcessor(BasePartialSQSProcessor): - pass - - @lambda_handler_decorator def partial_sqs_processor(handler, event, context, record_handler, processor=None): records = event["Records"] - processor = processor or DefaultPartialSQSProcessor() + processor = processor or PartialSQSProcessor() with processor(records, record_handler) as ctx: ctx.process() + + return handler(event, context) From e7e36a984ea6402e730c4ef5d275fbdb7ce7c7f9 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:16:25 -0300 Subject: [PATCH 05/26] feat(sqs): improve validation for queue_url --- aws_lambda_powertools/utilities/batch/sqs.py | 18 ++- tests/functional/test_utilities_batch.py | 148 +++++++++++++++++++ tests/unit/test_utilities_batch.py | 126 ++++++++++++++++ 3 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 tests/functional/test_utilities_batch.py create mode 100644 tests/unit/test_utilities_batch.py diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 041bcd8ac36..bb9236284c6 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -15,17 +15,26 @@ class PartialSQSProcessor(BasePartialProcessor): def __init__(self): - self._client = boto3.client("sqs") + self.client = boto3.client("sqs") self.success_messages: List = [] self.fail_messages: List = [] super().__init__() def get_queue_url(self): + """ + Format QueueUrl from first records entry + """ + if not getattr(self, "records", None): + return + *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") - return f"{self._client._endpoint.host}/{account_id}/{queue_name}" + return f"{self.client._endpoint.host}/{account_id}/{queue_name}" def get_entries_to_clean(self): + """ + Format messages to use in batch deletion + """ return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages] def _process_record(self, record): @@ -37,21 +46,22 @@ def _process_record(self, record): def _prepare(self): """ + Remove results from previous executions. 
""" self.success_messages.clear() self.fail_messages.clear() def _clean(self): """ + Delete messages from Queue in case of partial failure. """ - # skip only failures or only successes if not (self.fail_messages and self.success_messages): return queue_url = self.get_queue_url() entries_to_remove = self.get_entries_to_clean() - return self._client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) + return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) @lambda_handler_decorator diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py new file mode 100644 index 00000000000..507d77f05f1 --- /dev/null +++ b/tests/functional/test_utilities_batch.py @@ -0,0 +1,148 @@ +from typing import Callable +import pytest + +from botocore.stub import Stubber + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, partial_sqs_processor + + +@pytest.fixture +def sqs_event_factory() -> Callable: + def factory(body: str): + return { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": body, + "attributes": {}, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1", + } + + return factory + + +@pytest.fixture +def record_handler() -> Callable: + def handler(record): + body = record["body"] + if "fail" in body: + raise Exception("Failed to process record.") + return body + + return handler + + +def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler): + """ Test processor with one record failing """ + processor = PartialSQSProcessor() + + fail_record = sqs_event_factory("fail") + success_record = sqs_event_factory("success") + + records = [fail_record, success_record] + + response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + + with Stubber(processor.client) as stubber: + stubber.add_response("delete_message_batch", response) + + with processor(records, record_handler) as ctx: + result = ctx.process() + + stubber.assert_no_pending_responses() + + assert result == [ + ("fail", ("Failed to process record.",), fail_record), + ("success", success_record["body"], success_record), + ] + + +def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler): + """ Test processor without failure """ + processor = PartialSQSProcessor() + + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + + records = [first_record, second_record] + + with processor(records, record_handler) as ctx: + result = ctx.process() + + assert result == [ + ("success", first_record["body"], first_record), + ("success", second_record["body"], second_record), + ] + + +def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler): + """ Test processor without failure """ + processor = PartialSQSProcessor() + + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + + records = [first_record, second_record] + + with processor(records, record_handler) as ctx: + ctx.process() + + with processor([first_record], record_handler) as ctx: + ctx.process() + + assert processor.success_messages == [first_record] + + +def test_partial_sqs_processor_middleware_with_default(sqs_event_factory, record_handler): + """ Test middleware with default partial processor """ + 
processor = PartialSQSProcessor() + + @partial_sqs_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return True + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + + with Stubber(processor.client) as stubber: + stubber.add_response("delete_message_batch", response) + + result = lambda_handler(event, {}) + + stubber.assert_no_pending_responses() + + assert result is True + + +def test_partial_sqs_processor_middleware_with_custom(capsys, sqs_event_factory, record_handler): + """ Test middle with custom partial processor """ + class CustomProcessor(PartialSQSProcessor): + def failure_handler(self, record, exception): + print("Oh no ! It's a failure.") + return super().failure_handler(record, exception) + + processor = CustomProcessor() + + @partial_sqs_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return True + + fail_record = sqs_event_factory("fail") + + event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} + response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + + with Stubber(processor.client) as stubber: + stubber.add_response("delete_message_batch", response) + + result = lambda_handler(event, {}) + + stubber.assert_no_pending_responses() + + assert result is True + assert capsys.readouterr().out == "Oh no ! It's a failure.\n" diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py new file mode 100644 index 00000000000..36bb0b42e69 --- /dev/null +++ b/tests/unit/test_utilities_batch.py @@ -0,0 +1,126 @@ +import pytest + +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + + +@pytest.fixture(scope="module") +def sqs_event(): + return { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "", + "attributes": {}, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-1", + } + + +def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event): + expected_url = "https://queue.amazonaws.com/123456789012/my-queue" + + records_mock = mocker.patch.object(PartialSQSProcessor, "records", create=True, new_callable=mocker.PropertyMock) + records_mock.return_value = [sqs_event] + + result = PartialSQSProcessor().get_queue_url() + assert result == expected_url + + +def test_partial_sqs_get_queue_url_without_records(): + assert PartialSQSProcessor().get_queue_url() == None + + +def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event): + expected_entries = [{"Id": sqs_event["messageId"], "ReceiptHandle": sqs_event["receiptHandle"]}] + + success_messages_mock = mocker.patch.object( + PartialSQSProcessor, "success_messages", new_callable=mocker.PropertyMock + ) + success_messages_mock.return_value = [sqs_event] + + result = PartialSQSProcessor().get_entries_to_clean() + + assert result == expected_entries + + +def test_partial_sqs_get_entries_to_clean_without_success(mocker): + expected_entries = [] + + success_messages_mock = mocker.patch.object( + PartialSQSProcessor, "success_messages", new_callable=mocker.PropertyMock + ) + success_messages_mock.return_value = [] + + result = 
PartialSQSProcessor().get_entries_to_clean() + + assert result == expected_entries + + +def test_partial_sqs_process_record_success(mocker): + expected_value = mocker.sentinel.expected_value + + success_result = mocker.sentinel.success_result + record = mocker.sentinel.record + + handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, return_value=success_result) + success_handler_mock = mocker.patch.object(PartialSQSProcessor, "success_handler", return_value=expected_value) + + result = PartialSQSProcessor()._process_record(record) + + handler_mock.assert_called_once_with(record) + success_handler_mock.assert_called_once_with(record, success_result) + + assert result == expected_value + + +def test_partial_sqs_process_record_failure(mocker): + expected_value = mocker.sentinel.expected_value + + failure_result = Exception() + record = mocker.sentinel.record + + handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, side_effect=failure_result) + failure_handler_mock = mocker.patch.object(PartialSQSProcessor, "failure_handler", return_value=expected_value) + + result = PartialSQSProcessor()._process_record(record) + + handler_mock.assert_called_once_with(record) + failure_handler_mock.assert_called_once_with(record, failure_result) + + assert result == expected_value + + +def test_partial_sqs_prepare(mocker): + processor = PartialSQSProcessor() + + success_messages_mock = mocker.patch.object(processor, "success_messages", spec=list) + failed_messages_mock = mocker.patch.object(processor, "fail_messages", spec=list) + + processor._prepare() + + success_messages_mock.clear.assert_called_once() + failed_messages_mock.clear.assert_called_once() + + +def test_partial_sqs_clean(monkeypatch, mocker): + processor = PartialSQSProcessor() + records = [mocker.sentinel.record] + + monkeypatch.setattr(processor, "fail_messages", records) + monkeypatch.setattr(processor, "success_messages", records) + + queue_url_mock = mocker.patch.object(PartialSQSProcessor, "get_queue_url") + entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "get_entries_to_clean") + + queue_url_mock.return_value = mocker.sentinel.queue_url + entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean + + client_mock = mocker.patch.object(processor, "client", autospec=True) + + processor._clean() + + client_mock.delete_message_batch.assert_called_once_with( + QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean + ) From 8dbaf678d7598da61c37100993e1fe7221bf496c Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:18:21 -0300 Subject: [PATCH 06/26] feat: add package level import for batch utility --- aws_lambda_powertools/utilities/batch/__init__.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 67be909187a..1fe78ce7cee 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -1,3 +1,15 @@ # -*- coding: utf-8 -*- -"""General utilities for Powertools""" +""" +Batch processing utility +""" + +from .base import BasePartialProcessor, BaseProcessor +from .sqs import PartialSQSProcessor, partial_sqs_processor + +__all__ = ( + "BaseProcessor", + "BasePartialProcessor", + "PartialSQSProcessor", + "partial_sqs_processor", +) From 524d27f92a90218c6c2c1450dfe9b22da03a2b3a Mon Sep 17 00:00:00 2001 From: Guilherme 
Martins Crocetti Date: Mon, 24 Aug 2020 23:20:22 -0300 Subject: [PATCH 07/26] refactor: change return for failure/success handlers --- aws_lambda_powertools/utilities/batch/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 8371b58345f..7f8629881d6 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -63,14 +63,14 @@ def success_handler(self, record, result): """ Success callback """ - entry = (result, record) + entry = ("success", result, record) self.success_messages.append(record) return entry - def failure_handler(self, record, error): + def failure_handler(self, record, exception): """ Failure callback """ - entry = (error, record) + entry = ("fail", exception.args, record) self.fail_messages.append(entry) return entry From 7d32e8cd0373867c89e4fcb8ed30d48911f0e6c0 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:22:21 -0300 Subject: [PATCH 08/26] test: add unit tests for partial sqs processor --- tests/unit/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/unit/__init__.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 From f9c53e0030e97e6f9ae3d8654770de212f6368dd Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:23:47 -0300 Subject: [PATCH 09/26] test: add unit tests for partial sqs processor --- tests/unit/__init__.py | 0 tests/unit/test_utilities_batch.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 tests/unit/__init__.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 36bb0b42e69..8265248eee9 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -29,7 +29,7 @@ def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event): def test_partial_sqs_get_queue_url_without_records(): - assert PartialSQSProcessor().get_queue_url() == None + assert PartialSQSProcessor().get_queue_url() is None def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event): From 680ee69df2f84b4a3bf7ac641cf3febe78acc668 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:24:16 -0300 Subject: [PATCH 10/26] test: functional tests for partial sqs processor and its middleware --- tests/functional/test_utilities_batch.py | 32 +++++++++++++++++------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 507d77f05f1..b2f785a745f 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,6 +1,6 @@ from typing import Callable -import pytest +import pytest from botocore.stub import Stubber from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, partial_sqs_processor @@ -36,7 +36,9 @@ def handler(record): def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler): - """ Test processor with one record failing """ + """ + Test processor with one failing record + """ processor = PartialSQSProcessor() fail_record = sqs_event_factory("fail") @@ -44,7 +46,7 @@ def 
test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha records = [fail_record, success_record] - response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} with Stubber(processor.client) as stubber: stubber.add_response("delete_message_batch", response) @@ -61,7 +63,10 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler): - """ Test processor without failure """ + """ + Test processor without failure + """ + processor = PartialSQSProcessor() first_record = sqs_event_factory("success") @@ -79,7 +84,10 @@ def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_ha def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler): - """ Test processor without failure """ + """ + Test processor without failure + """ + processor = PartialSQSProcessor() first_record = sqs_event_factory("success") @@ -97,7 +105,10 @@ def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_ def test_partial_sqs_processor_middleware_with_default(sqs_event_factory, record_handler): - """ Test middleware with default partial processor """ + """ + Test middleware with default partial processor + """ + processor = PartialSQSProcessor() @partial_sqs_processor(record_handler=record_handler, processor=processor) @@ -107,7 +118,7 @@ def lambda_handler(event, context): fail_record = sqs_event_factory("fail") event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} with Stubber(processor.client) as stubber: stubber.add_response("delete_message_batch", response) @@ -120,7 +131,10 @@ def lambda_handler(event, context): def test_partial_sqs_processor_middleware_with_custom(capsys, sqs_event_factory, record_handler): - """ Test middle with custom partial processor """ + """ + Test middle with custom partial processor + """ + class CustomProcessor(PartialSQSProcessor): def failure_handler(self, record, exception): print("Oh no ! 
It's a failure.") @@ -135,7 +149,7 @@ def lambda_handler(event, context): fail_record = sqs_event_factory("fail") event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]},], "Failed": []} + response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} with Stubber(processor.client) as stubber: stubber.add_response("delete_message_batch", response) From 6cddf764264b02f924ae9f236466547fc6957131 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:38:22 -0300 Subject: [PATCH 11/26] feat(sqs): add optional config parameter --- aws_lambda_powertools/utilities/batch/sqs.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index bb9236284c6..4e4e3ab055f 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -4,9 +4,10 @@ Batch SQS utilities """ -from typing import List +from typing import List, Optional import boto3 +from botocore.config import Config from aws_lambda_powertools.middleware_factory import lambda_handler_decorator @@ -14,8 +15,9 @@ class PartialSQSProcessor(BasePartialProcessor): - def __init__(self): - self.client = boto3.client("sqs") + def __init__(self, config: Optional[Config] = None): + config = config or Config() + self.client = boto3.client("sqs", config=config) self.success_messages: List = [] self.fail_messages: List = [] From 13e313272b9ee72a99b2a32fa0547e611422ef00 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 24 Aug 2020 23:57:14 -0300 Subject: [PATCH 12/26] refactor(tests): processor using default config --- tests/functional/test_utilities_batch.py | 50 ++++++++++---------- tests/unit/test_utilities_batch.py | 58 ++++++++++++++---------- 2 files changed, 60 insertions(+), 48 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index b2f785a745f..7d3e23489d0 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,12 +1,13 @@ from typing import Callable import pytest +from botocore.config import Config from botocore.stub import Stubber from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, partial_sqs_processor -@pytest.fixture +@pytest.fixture(scope="module") def sqs_event_factory() -> Callable: def factory(body: str): return { @@ -24,7 +25,7 @@ def factory(body: str): return factory -@pytest.fixture +@pytest.fixture(scope="module") def record_handler() -> Callable: def handler(record): body = record["body"] @@ -35,11 +36,20 @@ def handler(record): return handler -def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler): +@pytest.fixture(scope="module") +def config() -> Config: + return Config(region_name="us-east-1") + + +@pytest.fixture(scope="function") +def partial_processor(config) -> PartialSQSProcessor: + return PartialSQSProcessor(config=config) + + +def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor): """ Test processor with one failing record """ - processor = PartialSQSProcessor() fail_record = sqs_event_factory("fail") success_record = sqs_event_factory("success") @@ -48,10 +58,10 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": 
[]} - with Stubber(processor.client) as stubber: + with Stubber(partial_processor.client) as stubber: stubber.add_response("delete_message_batch", response) - with processor(records, record_handler) as ctx: + with partial_processor(records, record_handler) as ctx: result = ctx.process() stubber.assert_no_pending_responses() @@ -62,19 +72,17 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha ] -def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler): +def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor): """ Test processor without failure """ - processor = PartialSQSProcessor() - first_record = sqs_event_factory("success") second_record = sqs_event_factory("success") records = [first_record, second_record] - with processor(records, record_handler) as ctx: + with partial_processor(records, record_handler) as ctx: result = ctx.process() assert result == [ @@ -83,35 +91,31 @@ def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_ha ] -def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler): +def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler, partial_processor): """ Test processor without failure """ - processor = PartialSQSProcessor() - first_record = sqs_event_factory("success") second_record = sqs_event_factory("success") records = [first_record, second_record] - with processor(records, record_handler) as ctx: + with partial_processor(records, record_handler) as ctx: ctx.process() - with processor([first_record], record_handler) as ctx: + with partial_processor([first_record], record_handler) as ctx: ctx.process() - assert processor.success_messages == [first_record] + assert partial_processor.success_messages == [first_record] -def test_partial_sqs_processor_middleware_with_default(sqs_event_factory, record_handler): +def test_partial_sqs_processor_middleware_with_default(sqs_event_factory, record_handler, partial_processor): """ Test middleware with default partial processor """ - processor = PartialSQSProcessor() - - @partial_sqs_processor(record_handler=record_handler, processor=processor) + @partial_sqs_processor(record_handler=record_handler, processor=partial_processor) def lambda_handler(event, context): return True @@ -120,7 +124,7 @@ def lambda_handler(event, context): event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - with Stubber(processor.client) as stubber: + with Stubber(partial_processor.client) as stubber: stubber.add_response("delete_message_batch", response) result = lambda_handler(event, {}) @@ -130,7 +134,7 @@ def lambda_handler(event, context): assert result is True -def test_partial_sqs_processor_middleware_with_custom(capsys, sqs_event_factory, record_handler): +def test_partial_sqs_processor_middleware_with_custom(capsys, sqs_event_factory, record_handler, config): """ Test middle with custom partial processor """ @@ -140,7 +144,7 @@ def failure_handler(self, record, exception): print("Oh no ! 
It's a failure.") return super().failure_handler(record, exception) - processor = CustomProcessor() + processor = CustomProcessor(config=config) @partial_sqs_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 8265248eee9..875ad53e1b2 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -1,9 +1,10 @@ import pytest +from botocore.config import Config from aws_lambda_powertools.utilities.batch import PartialSQSProcessor -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def sqs_event(): return { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", @@ -18,21 +19,31 @@ def sqs_event(): } -def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event): +@pytest.fixture(scope="module") +def config() -> Config: + return Config(region_name="us-east-1") + + +@pytest.fixture(scope="function") +def partial_sqs_processor(config) -> PartialSQSProcessor: + return PartialSQSProcessor(config=config) + + +def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event, partial_sqs_processor): expected_url = "https://queue.amazonaws.com/123456789012/my-queue" records_mock = mocker.patch.object(PartialSQSProcessor, "records", create=True, new_callable=mocker.PropertyMock) records_mock.return_value = [sqs_event] - result = PartialSQSProcessor().get_queue_url() + result = partial_sqs_processor.get_queue_url() assert result == expected_url -def test_partial_sqs_get_queue_url_without_records(): - assert PartialSQSProcessor().get_queue_url() is None +def test_partial_sqs_get_queue_url_without_records(partial_sqs_processor): + assert partial_sqs_processor.get_queue_url() is None -def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event): +def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partial_sqs_processor): expected_entries = [{"Id": sqs_event["messageId"], "ReceiptHandle": sqs_event["receiptHandle"]}] success_messages_mock = mocker.patch.object( @@ -40,12 +51,12 @@ def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event): ) success_messages_mock.return_value = [sqs_event] - result = PartialSQSProcessor().get_entries_to_clean() + result = partial_sqs_processor.get_entries_to_clean() assert result == expected_entries -def test_partial_sqs_get_entries_to_clean_without_success(mocker): +def test_partial_sqs_get_entries_to_clean_without_success(mocker, partial_sqs_processor): expected_entries = [] success_messages_mock = mocker.patch.object( @@ -53,12 +64,12 @@ def test_partial_sqs_get_entries_to_clean_without_success(mocker): ) success_messages_mock.return_value = [] - result = PartialSQSProcessor().get_entries_to_clean() + result = partial_sqs_processor.get_entries_to_clean() assert result == expected_entries -def test_partial_sqs_process_record_success(mocker): +def test_partial_sqs_process_record_success(mocker, partial_sqs_processor): expected_value = mocker.sentinel.expected_value success_result = mocker.sentinel.success_result @@ -67,7 +78,7 @@ def test_partial_sqs_process_record_success(mocker): handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, return_value=success_result) success_handler_mock = mocker.patch.object(PartialSQSProcessor, "success_handler", return_value=expected_value) - result = PartialSQSProcessor()._process_record(record) + result = partial_sqs_processor._process_record(record) 
handler_mock.assert_called_once_with(record) success_handler_mock.assert_called_once_with(record, success_result) @@ -75,7 +86,7 @@ def test_partial_sqs_process_record_success(mocker): assert result == expected_value -def test_partial_sqs_process_record_failure(mocker): +def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor): expected_value = mocker.sentinel.expected_value failure_result = Exception() @@ -84,7 +95,7 @@ def test_partial_sqs_process_record_failure(mocker): handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, side_effect=failure_result) failure_handler_mock = mocker.patch.object(PartialSQSProcessor, "failure_handler", return_value=expected_value) - result = PartialSQSProcessor()._process_record(record) + result = partial_sqs_processor._process_record(record) handler_mock.assert_called_once_with(record) failure_handler_mock.assert_called_once_with(record, failure_result) @@ -92,24 +103,21 @@ def test_partial_sqs_process_record_failure(mocker): assert result == expected_value -def test_partial_sqs_prepare(mocker): - processor = PartialSQSProcessor() - - success_messages_mock = mocker.patch.object(processor, "success_messages", spec=list) - failed_messages_mock = mocker.patch.object(processor, "fail_messages", spec=list) +def test_partial_sqs_prepare(mocker, partial_sqs_processor): + success_messages_mock = mocker.patch.object(partial_sqs_processor, "success_messages", spec=list) + failed_messages_mock = mocker.patch.object(partial_sqs_processor, "fail_messages", spec=list) - processor._prepare() + partial_sqs_processor._prepare() success_messages_mock.clear.assert_called_once() failed_messages_mock.clear.assert_called_once() -def test_partial_sqs_clean(monkeypatch, mocker): - processor = PartialSQSProcessor() +def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor): records = [mocker.sentinel.record] - monkeypatch.setattr(processor, "fail_messages", records) - monkeypatch.setattr(processor, "success_messages", records) + monkeypatch.setattr(partial_sqs_processor, "fail_messages", records) + monkeypatch.setattr(partial_sqs_processor, "success_messages", records) queue_url_mock = mocker.patch.object(PartialSQSProcessor, "get_queue_url") entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "get_entries_to_clean") @@ -117,9 +125,9 @@ def test_partial_sqs_clean(monkeypatch, mocker): queue_url_mock.return_value = mocker.sentinel.queue_url entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean - client_mock = mocker.patch.object(processor, "client", autospec=True) + client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True) - processor._clean() + partial_sqs_processor._clean() client_mock.delete_message_batch.assert_called_once_with( QueueUrl=mocker.sentinel.queue_url, Entries=mocker.sentinel.entries_to_clean From 4275fb2b1caf321fd57809792187c6135bdf7a78 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Tue, 25 Aug 2020 18:01:33 -0300 Subject: [PATCH 13/26] refactor(sqs): change methods to protected --- aws_lambda_powertools/utilities/batch/sqs.py | 8 ++++---- tests/unit/test_utilities_batch.py | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 4e4e3ab055f..025719d9dc2 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -23,7 +23,7 @@ def __init__(self, config: 
Optional[Config] = None): super().__init__() - def get_queue_url(self): + def _get_queue_url(self): """ Format QueueUrl from first records entry """ @@ -33,7 +33,7 @@ def get_queue_url(self): *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") return f"{self.client._endpoint.host}/{account_id}/{queue_name}" - def get_entries_to_clean(self): + def _get_entries_to_clean(self): """ Format messages to use in batch deletion """ @@ -60,8 +60,8 @@ def _clean(self): if not (self.fail_messages and self.success_messages): return - queue_url = self.get_queue_url() - entries_to_remove = self.get_entries_to_clean() + queue_url = self._get_queue_url() + entries_to_remove = self._get_entries_to_clean() return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 875ad53e1b2..5d4f1dd9e70 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -35,12 +35,12 @@ def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event, partial_sqs_p records_mock = mocker.patch.object(PartialSQSProcessor, "records", create=True, new_callable=mocker.PropertyMock) records_mock.return_value = [sqs_event] - result = partial_sqs_processor.get_queue_url() + result = partial_sqs_processor._get_queue_url() assert result == expected_url def test_partial_sqs_get_queue_url_without_records(partial_sqs_processor): - assert partial_sqs_processor.get_queue_url() is None + assert partial_sqs_processor._get_queue_url() is None def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partial_sqs_processor): @@ -51,7 +51,7 @@ def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partia ) success_messages_mock.return_value = [sqs_event] - result = partial_sqs_processor.get_entries_to_clean() + result = partial_sqs_processor._get_entries_to_clean() assert result == expected_entries @@ -64,7 +64,7 @@ def test_partial_sqs_get_entries_to_clean_without_success(mocker, partial_sqs_pr ) success_messages_mock.return_value = [] - result = partial_sqs_processor.get_entries_to_clean() + result = partial_sqs_processor._get_entries_to_clean() assert result == expected_entries @@ -119,8 +119,8 @@ def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor): monkeypatch.setattr(partial_sqs_processor, "fail_messages", records) monkeypatch.setattr(partial_sqs_processor, "success_messages", records) - queue_url_mock = mocker.patch.object(PartialSQSProcessor, "get_queue_url") - entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "get_entries_to_clean") + queue_url_mock = mocker.patch.object(PartialSQSProcessor, "_get_queue_url") + entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "_get_entries_to_clean") queue_url_mock.return_value = mocker.sentinel.queue_url entries_to_clean_mock.return_value = mocker.sentinel.entries_to_clean From 54a76df1aa5ab8557cdf84b03ecf42734fae28fe Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Tue, 25 Aug 2020 18:02:46 -0300 Subject: [PATCH 14/26] fix(base-partial): append record instead of entry --- aws_lambda_powertools/utilities/batch/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 7f8629881d6..33faac579bf 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -72,5 +72,5 @@ def 
failure_handler(self, record, exception): Failure callback """ entry = ("fail", exception.args, record) - self.fail_messages.append(entry) + self.fail_messages.append(record) return entry From ded3d755bd1b11fd62c73dd11ca59a983bef62a1 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:13:39 -0300 Subject: [PATCH 15/26] docs(sqs): docstrings for PartialSQS --- aws_lambda_powertools/utilities/batch/sqs.py | 71 ++++++++++++++------ 1 file changed, 52 insertions(+), 19 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 025719d9dc2..8d21294c486 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -3,27 +3,63 @@ """ Batch SQS utilities """ - -from typing import List, Optional +from typing import List, Optional, Tuple import boto3 from botocore.config import Config -from aws_lambda_powertools.middleware_factory import lambda_handler_decorator - from .base import BasePartialProcessor class PartialSQSProcessor(BasePartialProcessor): + """ + Amazon SQS batch processor to delete successes from the Queue. + + Only the **special** case of partial failure is handled, thus a batch in + which all records failed is **not** going to be removed from the queue, and + the same is valid for a full success. + + Parameters + ---------- + config: Config + botocore config object + + Example + ------- + **Process batch triggered by SQS** + + >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> def handler(event, context): + >>> records = event["Records"] + >>> processor = PartialSQSProcessor() + >>> + >>> with processor(records=records, handler=record_handler) as ctx: + >>> result = ctx.process() + >>> + >>> # In case a partial failure occurred, all successful executions + >>> # have been deleted from the queue after context's exit. + >>> + >>> return result + """ + def __init__(self, config: Optional[Config] = None): + """ + Initializes the SQS client and the success and failure lists + used to keep track of each record's execution status. + """ config = config or Config() self.client = boto3.client("sqs", config=config) + self.success_messages: List = [] self.fail_messages: List = [] super().__init__() - def _get_queue_url(self): + def _get_queue_url(self) -> str: """ Format QueueUrl from first records entry """ @@ -33,13 +69,21 @@ def _get_queue_url(self): *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") return f"{self.client._endpoint.host}/{account_id}/{queue_name}" - def _get_entries_to_clean(self): + def _get_entries_to_clean(self) -> List: """ Format messages to use in batch deletion """ return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages] - def _process_record(self, record): + def _process_record(self, record) -> Tuple: + """ + Process a record with instance's handler + + Parameters + ---------- + record: Any + An object to be processed. + """ try: result = self.handler(record) return self.success_handler(record, result) @@ -48,7 +92,7 @@ def _process_record(self, record): def _prepare(self): """ - Remove results from previous executions. + Remove results from previous execution. 
""" self.success_messages.clear() self.fail_messages.clear() @@ -64,14 +108,3 @@ def _clean(self): entries_to_remove = self._get_entries_to_clean() return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) - - -@lambda_handler_decorator -def partial_sqs_processor(handler, event, context, record_handler, processor=None): - records = event["Records"] - processor = processor or PartialSQSProcessor() - - with processor(records, record_handler) as ctx: - ctx.process() - - return handler(event, context) From c7582f3bf225cdd8e22d9e3f1734503de3c8c5b9 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:18:37 -0300 Subject: [PATCH 16/26] docs(sqs-base): docstring for base class --- aws_lambda_powertools/utilities/batch/base.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 33faac579bf..f6bf0d60f23 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -9,6 +9,10 @@ class BaseProcessor(ABC): + """ + Abstract class for batch processors. + """ + @abstractmethod def _prepare(self): """ @@ -24,10 +28,16 @@ def _clean(self): raise NotImplementedError() @abstractmethod - def _process_record(self, record): + def _process_record(self, record: Any): + """ + Process record with handler. + """ raise NotImplementedError() def process(self) -> List[Tuple]: + """ + Call instance's handler for each record. + """ return [self._process_record(record) for record in self.records] def __enter__(self): @@ -43,7 +53,6 @@ def __call__(self, records: Iterable[Any], handler: Callable): Parameters ---------- - records: Iterable[Any] Iterable with objects to be processed. 
handler: Callable @@ -59,7 +68,7 @@ class BasePartialProcessor(BaseProcessor): success_messages: MutableSequence = None fail_messages: MutableSequence = None - def success_handler(self, record, result): + def success_handler(self, record: Any, result: Any): """ Success callback """ @@ -67,7 +76,7 @@ def success_handler(self, record, result): self.success_messages.append(record) return entry - def failure_handler(self, record, exception): + def failure_handler(self, record: Any, exception: Exception): """ Failure callback """ From f419ef61f4031c857668b9c4123378971bcc0140 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:19:50 -0300 Subject: [PATCH 17/26] refactor(sqs): add module middlewares --- .../utilities/batch/__init__.py | 4 +-- .../utilities/batch/middlewares.py | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 aws_lambda_powertools/utilities/batch/middlewares.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 1fe78ce7cee..84b2a52c826 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -3,9 +3,9 @@ """ Batch processing utility """ - from .base import BasePartialProcessor, BaseProcessor -from .sqs import PartialSQSProcessor, partial_sqs_processor +from .middlewares import partial_sqs_processor +from .sqs import PartialSQSProcessor __all__ = ( "BaseProcessor", diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py new file mode 100644 index 00000000000..411eb4d6302 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +""" +Middlewares for batch utilities +""" +from typing import Callable, Dict + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator + +from .sqs import PartialSQSProcessor + + +@lambda_handler_decorator +def partial_sqs_processor( + handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: PartialSQSProcessor = None +): + """ + + Examples + -------- + + """ + records = event["Records"] + processor = processor or PartialSQSProcessor() + + with processor(records, record_handler) as ctx: + ctx.process() + + return handler(event, context) From c999f9604499853ad95ceb9ff79abc4ff3a50170 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:51:28 -0300 Subject: [PATCH 18/26] refactor: changes partial_sqs middleware in favor of a generic interface always expecting a BatchProcessor --- .../utilities/batch/middlewares.py | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index 411eb4d6302..8b5dd98934d 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -7,21 +7,47 @@ from aws_lambda_powertools.middleware_factory import lambda_handler_decorator -from .sqs import PartialSQSProcessor +from .base import BaseProcessor @lambda_handler_decorator -def partial_sqs_processor( - handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: PartialSQSProcessor = None +def batch_processor( + handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BaseProcessor = None ): """ + Middleware to handle batch 
event processing + + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: Dict + Lambda's Context + record_handler: Callable + Callable to process each record from the batch + processor: PartialSQSProcessor + Batch Processor to handle partial failure cases Examples -------- + **Processes Lambda's event with PartialSQSProcessor** + >>> from aws_lambda_powertools.utilities.batch import batch_processor + >>> + >>> def record_handler(record): + >>> return record["body"] + >>> + >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor) + >>> def handler(event, context): + >>> return {"StatusCode": 200} + + Limitations + ----------- + * Async batch processors """ records = event["Records"] - processor = processor or PartialSQSProcessor() with processor(records, record_handler) as ctx: ctx.process() From 5cb6b8950fea7eb0141b29375a16d658544f6773 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:52:06 -0300 Subject: [PATCH 19/26] refactor(tests): update tests to new batch processor middleware --- tests/functional/test_utilities_batch.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 7d3e23489d0..342d119629b 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -4,7 +4,7 @@ from botocore.config import Config from botocore.stub import Stubber -from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, partial_sqs_processor +from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor @pytest.fixture(scope="module") @@ -50,7 +50,6 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha """ Test processor with one failing record """ - fail_record = sqs_event_factory("fail") success_record = sqs_event_factory("success") @@ -76,7 +75,6 @@ def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_ha """ Test processor without failure """ - first_record = sqs_event_factory("success") second_record = sqs_event_factory("success") @@ -95,7 +93,6 @@ def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_ """ Test processor without failure """ - first_record = sqs_event_factory("success") second_record = sqs_event_factory("success") @@ -110,12 +107,12 @@ def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_ assert partial_processor.success_messages == [first_record] -def test_partial_sqs_processor_middleware_with_default(sqs_event_factory, record_handler, partial_processor): +def test_batch_processor_middleware_with_partial_sqs_processor(sqs_event_factory, record_handler, partial_processor): """ - Test middleware with default partial processor + Test middleware's integration with PartialSQSProcessor """ - @partial_sqs_processor(record_handler=record_handler, processor=partial_processor) + @batch_processor(record_handler=record_handler, processor=partial_processor) def lambda_handler(event, context): return True @@ -134,9 +131,9 @@ def lambda_handler(event, context): assert result is True -def test_partial_sqs_processor_middleware_with_custom(capsys, sqs_event_factory, record_handler, config): +def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config): """ - Test middle with custom partial processor + Test middlewares' integration with custom 
batch processor """ class CustomProcessor(PartialSQSProcessor): def failure_handler(self, record, exception): @@ -146,7 +143,7 @@ def failure_handler(self, record, exception): processor = CustomProcessor(config=config) - @partial_sqs_processor(record_handler=record_handler, processor=processor) + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): return True From be061495b312eb5f3c138c8a15f85330a69c69f1 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:57:39 -0300 Subject: [PATCH 20/26] refactor: batch middleware --- aws_lambda_powertools/utilities/batch/__init__.py | 4 ++-- aws_lambda_powertools/utilities/batch/middlewares.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 84b2a52c826..d3ce3aa9bfc 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -4,12 +4,12 @@ Batch processing utility """ from .base import BasePartialProcessor, BaseProcessor -from .middlewares import partial_sqs_processor +from .middlewares import batch_processor from .sqs import PartialSQSProcessor __all__ = ( "BaseProcessor", "BasePartialProcessor", "PartialSQSProcessor", - "partial_sqs_processor", + "batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index 8b5dd98934d..a24122e4194 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -38,7 +38,7 @@ def batch_processor( >>> def record_handler(record): >>> return record["body"] >>> - >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor) + >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) >>> def handler(event, context): >>> return {"StatusCode": 200} From 8ba81b4ec539cb83dc5607b6ac78b96c76f1eb41 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:58:13 -0300 Subject: [PATCH 21/26] docs(partial-processor): add simple docstrings to success/failure handlers --- aws_lambda_powertools/utilities/batch/base.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index f6bf0d60f23..40824edfa1e 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -71,6 +71,11 @@ class BasePartialProcessor(BaseProcessor): def success_handler(self, record: Any, result: Any): """ Success callback + + Returns + ------- + tuple + "success", record processing result and original record """ entry = ("success", result, record) self.success_messages.append(record) return entry @@ -79,6 +84,11 @@ def success_handler(self, record: Any, result: Any): def failure_handler(self, record: Any, exception: Exception): """ Failure callback + + Returns + ------- + tuple + "fail", exception's args, original record """ entry = ("fail", exception.args, record) self.fail_messages.append(record) return entry From edcc14ae3d7a783f8170f4ad90e438d5e326622d Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Wed, 26 Aug 2020 21:58:31 -0300 Subject: [PATCH 22/26] fix: typo in example --- aws_lambda_powertools/utilities/parameters/secrets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parameters/secrets.py 
From edcc14ae3d7a783f8170f4ad90e438d5e326622d Mon Sep 17 00:00:00 2001
From: Guilherme Martins Crocetti
Date: Wed, 26 Aug 2020 21:58:31 -0300
Subject: [PATCH 22/26] fix: typo in example

---
 aws_lambda_powertools/utilities/parameters/secrets.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aws_lambda_powertools/utilities/parameters/secrets.py b/aws_lambda_powertools/utilities/parameters/secrets.py
index 67cb94c340b..e3981d22bcc 100644
--- a/aws_lambda_powertools/utilities/parameters/secrets.py
+++ b/aws_lambda_powertools/utilities/parameters/secrets.py
@@ -27,7 +27,7 @@ class SecretsProvider(BaseProvider):
     >>> from aws_lambda_powertools.utilities.parameters import SecretsProvider
     >>> secrets_provider = SecretsProvider()
     >>>
-    >>> value secrets_provider.get("my-parameter")
+    >>> value = secrets_provider.get("my-parameter")
     >>>
     >>> print(value)
     My parameter value

From ded3d755bd1b11fd62c73dd11ca59a983bef62a1 Mon Sep 17 00:00:00 2001
From: Guilherme Martins Crocetti
Date: Wed, 26 Aug 2020 23:13:02 -0300
Subject: [PATCH 23/26] docs: user specific documentation

---
 docs/content/index.mdx           |  6 ++
 docs/content/utilities/batch.mdx | 95 ++++++++++++++++++++++++++++++++
 docs/gatsby-config.js            |  1 +
 3 files changed, 102 insertions(+)
 create mode 100644 docs/content/utilities/batch.mdx

diff --git a/docs/content/index.mdx b/docs/content/index.mdx
index 26ab367ba4c..9892e9041ca 100644
--- a/docs/content/index.mdx
+++ b/docs/content/index.mdx
@@ -24,6 +24,12 @@ Powertools is available in PyPi. You can use your favourite dependency managemen
 ```bash:title=hello_world.sh
 sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python
 ```
+* [Tracing](./core/tracer) - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions
+* [Logging](./core/logger) - Structured logging made easier, and decorator to enrich structured logging with key Lambda context details
+* [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
+* [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation
+* [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time
+* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, with a middleware allowing custom record handling
 
 ### Lambda Layer

diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx
new file mode 100644
index 00000000000..cf0536878fd
--- /dev/null
+++ b/docs/content/utilities/batch.mdx
@@ -0,0 +1,95 @@
+---
+title: Batch
+description: Utility
+---
+
+import Note from "../../src/components/Note"
+
+The batch utility provides an abstraction to process a batch event. Useful for Lambda integrations with [AWS SQS](https://aws.amazon.com/sqs/), [AWS Kinesis](https://aws.amazon.com/kinesis/) and [AWS DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
+It also provides base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor.
+
+**Key Features**
+
+* Run batch processing logic with a clean interface;
+* Middleware and context to handle a batch event;
+* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure.
+
+**IAM Permissions**
+
+This utility requires additional permissions to work as expected. 
See the following table:
+
+Processor | Function/Method | IAM Permission
+|---------|-----------------|---------------|
+PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch`
+
+### PartialSQSProcessor
+
+A special batch processor which aims to `clean` your SQS queue if one or more (not all) records of the batch fails.
+A batch's partial failure sends back all the records to the queue, reprocessing this batch until all records succeed.
+This processor exists to improve performance in such cases, deleting successful messages of a batch with partial failure.
+
+### Middleware
+
+```python:title=app.py
+from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
+
+def record_handler(record):
+    return record["body"]
+
+# highlight-start
+@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
+# highlight-end
+def lambda_handler(event, context):
+    return {"statusCode": 200}
+```
+
+## Create your own processor
+
+You can create your own batch processor by inheriting the `BaseProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
+It's also possible to inherit the `BasePartialProcessor` which contains additional logic to handle a partial failure and keep track of record status.
+
+Here is an example implementation of a DynamoDB Streams custom processor:
+
+```python:title=custom_processor.py
+import json
+
+from aws_lambda_powertools.utilities.batch import BaseProcessor, batch_processor
+import boto3
+
+class DynamoDBProcessor(BaseProcessor):
+
+    def __init__(self, queue_url: str):
+        self.queue_url = queue_url
+        self.client = boto3.client("sqs")
+
+    def _prepare(self):
+        pass
+
+    def _clean(self):
+        pass
+
+    def _process_record(self, record):
+        """
+        Process record and send result to sqs
+        """
+        result = self.handler(record)
+        body = json.dumps(result)
+        self.client.send_message(QueueUrl=self.queue_url, MessageBody=body)
+        return result
+
+def record_handler(record):
+    return record["Keys"]
+
+# As context
+
+processor = DynamoDBProcessor("dummy")
+records = {"Records": []}
+
+with processor(records=records["Records"], handler=record_handler) as ctx:
+    result = ctx.process()
+
+# As middleware
+@batch_processor(record_handler=record_handler, processor=DynamoDBProcessor("dummy"))
+def lambda_handler(event, context):
+    return {"statusCode": 200}
+```
diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js
index d518ee8e715..06781ded9e2 100644
--- a/docs/gatsby-config.js
+++ b/docs/gatsby-config.js
@@ -32,6 +32,7 @@ module.exports = {
     'Utilities': [
       'utilities/middleware_factory',
       'utilities/parameters',
+      'utilities/batch',
     ],
   },
   navConfig: {
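
For readers trying the custom processor docs above: a minimal sketch (not part of the patch series) of driving `DynamoDBProcessor` as a context manager with hand-rolled DynamoDB Streams-style records; the queue URL and record shapes are illustrative assumptions, and a real run needs AWS credentials plus an existing queue.

```python
from custom_processor import DynamoDBProcessor, record_handler  # the docs example above

# hand-rolled records mimicking DynamoDB Streams entries (shape is an assumption)
records = [
    {"Keys": {"pk": {"S": "user#1"}}},
    {"Keys": {"pk": {"S": "user#2"}}},
]

# hypothetical queue URL; send_message requires a real, reachable queue
processor = DynamoDBProcessor("https://queue.amazonaws.com/123456789012/results-queue")

# __call__ binds records and handler; __enter__/__exit__ run _prepare/_clean
with processor(records=records, handler=record_handler) as ctx:
    results = ctx.process()  # each record's Keys are returned and forwarded to SQS

# results -> [{"pk": {"S": "user#1"}}, {"pk": {"S": "user#2"}}]
```
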
From 14cc383a06a05a32c6954073cb6398b58df8d2db Mon Sep 17 00:00:00 2001
From: Guilherme Martins Crocetti
Date: Sat, 29 Aug 2020 17:42:52 -0300
Subject: [PATCH 24/26] docs: fix suggestions made by @heitorlessa

---
 .../utilities/batch/middlewares.py            |   5 +-
 aws_lambda_powertools/utilities/batch/sqs.py  |   4 +-
 docs/content/index.mdx                        |   2 +-
 docs/content/utilities/batch.mdx              | 103 ++++++++++++------
 4 files changed, 75 insertions(+), 39 deletions(-)

diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py
index a24122e4194..558f0387576 100644
--- a/aws_lambda_powertools/utilities/batch/middlewares.py
+++ b/aws_lambda_powertools/utilities/batch/middlewares.py
@@ -3,6 +3,7 @@
 """
 Middlewares for batch utilities
 """
+
 from typing import Callable, Dict
 
 from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
@@ -49,7 +50,7 @@ def batch_processor(
     """
     records = event["Records"]
 
-    with processor(records, record_handler) as ctx:
-        ctx.process()
+    with processor(records, record_handler):
+        processor.process()
 
     return handler(event, context)

diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py
index 8d21294c486..e82924da244 100644
--- a/aws_lambda_powertools/utilities/batch/sqs.py
+++ b/aws_lambda_powertools/utilities/batch/sqs.py
@@ -37,8 +37,8 @@ class PartialSQSProcessor(BasePartialProcessor):
     >>> records = event["Records"]
     >>> processor = PartialSQSProcessor()
     >>>
-    >>> with processor(records=records, handler=record_handler) as ctx:
-    >>>     result = ctx.process()
+    >>> with processor(records=records, handler=record_handler):
+    >>>     result = processor.process()
     >>>
     >>> # In case a partial failure occurred, all successful executions
     >>> # have been deleted from the queue after the context's exit.

diff --git a/docs/content/index.mdx b/docs/content/index.mdx
index 9892e9041ca..fe13020fa23 100644
--- a/docs/content/index.mdx
+++ b/docs/content/index.mdx
@@ -29,7 +29,7 @@ sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python
 * [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
 * [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation
 * [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time
-* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, with a middleware allowing custom record handling
+* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, which handles partial failures.
 
 ### Lambda Layer

diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx
index cf0536878fd..8c346b3172f 100644
--- a/docs/content/utilities/batch.mdx
+++ b/docs/content/utilities/batch.mdx
@@ -5,14 +5,15 @@ description: Utility
 
 import Note from "../../src/components/Note"
 
-The batch utility provides an abstraction to process a batch event. Useful for Lambda integrations with [AWS SQS](https://aws.amazon.com/sqs/), [AWS Kinesis](https://aws.amazon.com/kinesis/) and [AWS DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
-It also provides base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor.
+One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allow a feature called "Batch processing", in which a [predefined number][2] of events is sent to the Lambda function at once.
+
+The proposed batch utility aims to provide an abstraction to process batch events, providing base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor.
+It also provides a useful implementation to handle partial batch failures from the SQS provider.
 
 **Key Features**
 
-* Run batch processing logic with a clean interface;
-* Middleware and context to handle a batch event;
-* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure. 
+* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure;
+* Build your own batch processor using the base classes.
 
 **IAM Permissions**
 
@@ -24,13 +25,32 @@ PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch`
 
 ### PartialSQSProcessor
 
-A special batch processor which aims to `clean` your SQS queue if one or more (not all) records of the batch fails.
-A batch's partial failure sends back all the records to the queue, reprocessing this batch until all records succeed.
-This processor exists to improve performance in such cases, deleting successful messages of a batch with partial failure.
+SQS integration with Lambda is one of the most well-established ones, and pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with fewer invocations.
+
+As with any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all records, including successful ones. This cycle can happen multiple times, depending on your [configuration][3], until the whole batch succeeds or the maximum number of attempts is reached. Your application may face some problems with such behavior, especially if there's no idempotency.
+
+A *naive* approach to solving this problem is to delete successful records from the queue before the redrive phase. The `PartialSQSProcessor` class offers this solution both as a context manager and as middleware, removing all successful messages from the queue in case one or more failures occurred during the Lambda execution. Two examples are provided below, displaying the behavior of this class.
+
+**Examples:**
+
+```python:title=context_manager.py
+from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
+
+def record_handler(record):
+    return record["body"]
+
+def lambda_handler(event, context):
+    records = event["Records"]
+    processor = PartialSQSProcessor()
+
+    # highlight-start
+    with processor(records, record_handler):
+        result = processor.process()
+    # highlight-end
 
-### Middleware
+    return result
+```
 
-```python:title=app.py
+```python:title=middleware.py
 from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
 
 def record_handler(record):
@@ -48,48 +68,63 @@ def lambda_handler(event, context):
 
 You can create your own batch processor by inheriting the `BaseProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
 It's also possible to inherit the `BasePartialProcessor` which contains additional logic to handle a partial failure and keep track of record status.
 
-Here is an example implementation of a DynamoDB Streams custom processor:
+**Example:**
 
 ```python:title=custom_processor.py
-import json
+from uuid import uuid4
 
 from aws_lambda_powertools.utilities.batch import BaseProcessor, batch_processor
 import boto3
 
-class DynamoDBProcessor(BaseProcessor):
+def record_handler(record):
+    return {"Id": str(uuid4()), "MessageBody": record["body"]}
+
+
+class DDBStreamProcessor(BaseProcessor):
+    """
+    1. Listens to streams from table A;
+    2. Process each record;
+    3. Send a batch message to a Queue with the result. 
+
+    Parameters
+    ----------
+    queue_name: str
+        QueueName to send the results
+    """
 
-    def __init__(self, queue_url: str):
-        self.queue_url = queue_url
+    def __init__(self, queue_name: str):
+        self.queue_name = queue_name
+        self.queue_url = None
         self.client = boto3.client("sqs")
+        self.results = []
 
     def _prepare(self):
-        pass
+        # It's called once, *before* processing
+        # Formats queue_url given a name
+        # E.g.:
+        self.queue_url = f"https://queue.amazonaws.com/123456789012/{self.queue_name}"
 
     def _clean(self):
-        pass
+        # It's called once, *after* processing all records (when closing the context manager)
+        # Here we're sending all messages to the queue at once, and clearing 'results' for future invocations
+        # E.g.:
+        self.client.send_message_batch(QueueUrl=self.queue_url, Entries=self.results)
+        self.results.clear()
 
     def _process_record(self, record):
-        """
-        Process record and send result to sqs
-        """
+        # It handles how you process your record
+        # Here we're storing the result of each record in a list
+        # E.g.:
         result = self.handler(record)
-        body = json.dumps(result)
-        self.client.send_message(QueueUrl=self.queue_url, MessageBody=body)
+        self.results.append(result)
         return result
 
-def record_handler(record):
-    return record["Keys"]
-
-# As context
-
-processor = DynamoDBProcessor("dummy")
-records = {"Records": []}
-
-with processor(records=records["Records"], handler=record_handler) as ctx:
-    result = ctx.process()
-
-# As middleware
-@batch_processor(record_handler=record_handler, processor=DynamoDBProcessor("dummy"))
+
+@batch_processor(record_handler=record_handler, processor=DDBStreamProcessor("dummy-queue"))
 def lambda_handler(event, context):
     return {"statusCode": 200}
 ```
+
+[1]: https://aws.amazon.com/eventbridge/integrations/
+[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html
+[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html

From 25d67ca8d2d2a070f000a7f6e69265490218f61d Mon Sep 17 00:00:00 2001
From: Guilherme Martins Crocetti
Date: Mon, 31 Aug 2020 17:08:31 -0300
Subject: [PATCH 25/26] refactor: remove references to BaseProcessor.
Left BasePartialProcessor --- .../utilities/batch/__init__.py | 4 ++-- aws_lambda_powertools/utilities/batch/base.py | 16 +++++++--------- .../utilities/batch/middlewares.py | 4 ++-- aws_lambda_powertools/utilities/batch/sqs.py | 6 +----- tests/unit/test_utilities_batch.py | 4 ++-- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index d3ce3aa9bfc..068cdaa9ee9 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -3,12 +3,12 @@ """ Batch processing utility """ -from .base import BasePartialProcessor, BaseProcessor + +from .base import BasePartialProcessor from .middlewares import batch_processor from .sqs import PartialSQSProcessor __all__ = ( - "BaseProcessor", "BasePartialProcessor", "PartialSQSProcessor", "batch_processor", diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 40824edfa1e..a184a879441 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -5,14 +5,18 @@ """ from abc import ABC, abstractmethod -from typing import Any, Callable, Iterable, List, MutableSequence, Tuple +from typing import Any, Callable, Iterable, List, Tuple -class BaseProcessor(ABC): +class BasePartialProcessor(ABC): """ Abstract class for batch processors. """ + def __init__(self): + self.success_messages: List = [] + self.fail_messages: List = [] + @abstractmethod def _prepare(self): """ @@ -62,12 +66,6 @@ def __call__(self, records: Iterable[Any], handler: Callable): self.handler = handler return self - -class BasePartialProcessor(BaseProcessor): - - success_messages: MutableSequence = None - fail_messages: MutableSequence = None - def success_handler(self, record: Any, result: Any): """ Success callback @@ -75,7 +73,7 @@ def success_handler(self, record: Any, result: Any): Returns ------- tuple - "success", record processing result and original record + "success", result, original record """ entry = ("success", result, record) self.success_messages.append(record) diff --git a/aws_lambda_powertools/utilities/batch/middlewares.py b/aws_lambda_powertools/utilities/batch/middlewares.py index 558f0387576..7ea84e0ce02 100644 --- a/aws_lambda_powertools/utilities/batch/middlewares.py +++ b/aws_lambda_powertools/utilities/batch/middlewares.py @@ -8,12 +8,12 @@ from aws_lambda_powertools.middleware_factory import lambda_handler_decorator -from .base import BaseProcessor +from .base import BasePartialProcessor @lambda_handler_decorator def batch_processor( - handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BaseProcessor = None + handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None ): """ Middleware to handle batch event processing diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index e82924da244..8bbb7e5ef77 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -48,15 +48,11 @@ class PartialSQSProcessor(BasePartialProcessor): def __init__(self, config: Optional[Config] = None): """ - Initializes sqs client and also success and failure lists - to keep track of record's execution status. + Initializes sqs client. 
""" config = config or Config() self.client = boto3.client("sqs", config=config) - self.success_messages: List = [] - self.fail_messages: List = [] - super().__init__() def _get_queue_url(self) -> str: diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 5d4f1dd9e70..054cc4099df 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -47,7 +47,7 @@ def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partia expected_entries = [{"Id": sqs_event["messageId"], "ReceiptHandle": sqs_event["receiptHandle"]}] success_messages_mock = mocker.patch.object( - PartialSQSProcessor, "success_messages", new_callable=mocker.PropertyMock + PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock ) success_messages_mock.return_value = [sqs_event] @@ -60,7 +60,7 @@ def test_partial_sqs_get_entries_to_clean_without_success(mocker, partial_sqs_pr expected_entries = [] success_messages_mock = mocker.patch.object( - PartialSQSProcessor, "success_messages", new_callable=mocker.PropertyMock + PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock ) success_messages_mock.return_value = [] From 9caf3d186760dc17833a2694fe1aff9f074726ef Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 31 Aug 2020 18:06:52 -0300 Subject: [PATCH 26/26] docs: refactor example; improve docs about creating your own processor --- docs/content/utilities/batch.mdx | 67 ++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index 8c346b3172f..ef2649e55e3 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -7,8 +7,7 @@ import Note from "../../src/components/Note" One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allows a feature called "Batch processing" in which [predefined number][2] of events is sent to lambda function at once. -The proposed batch utility aims to provide an abstraction to process batch events, providing base classes (`BaseProcessor`, `BasePartialProcessor`) allowing you to create your **own** batch processor. -It also provides a useful implementation to handle partial batch failures from the SQS provider. +The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from a SQS event source, providing a base class (`BasePartialProcessor`) allowing you to create your **own** batch processor. **Key Features** @@ -63,64 +62,72 @@ def lambda_handler(event, context): return {"statusCode": 200} ``` -## Create your own processor +## Create your own partial processor -You can create your own batch processor by inheriting the `BaseProcessor` class, and implementing `_prepare()`, `_clean` and `_process_record()`. -It's also possible to inherit the `BasePartialProcessor` which contains additional logic to handle a partial failure and keep track of record status. +You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. + +All processing logic is handled by `_process_record()` whilst `_prepare()` and `clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively. 

 **Example:**
 
 ```python:title=custom_processor.py
-from uuid import uuid4
+from random import randint
 
-from aws_lambda_powertools.utilities.batch import BaseProcessor, batch_processor
+from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
 import boto3
 
 def record_handler(record):
-    return {"Id": str(uuid4()), "MessageBody": record["body"]}
+    return randint(0, 100)
 
 
-class DDBStreamProcessor(BaseProcessor):
+class MyPartialProcessor(BasePartialProcessor):
     """
-    1. Listens to streams from table A;
-    2. Process each record;
-    3. Send a batch message to a Queue with the result.
+    Process a record and store successful results in a DDB table
 
     Parameters
     ----------
-    queue_name: str
-        QueueName to send the results
+    table_name: str
+        Table name to write results
     """
 
-    def __init__(self, queue_name: str):
-        self.queue_name = queue_name
-        self.queue_url = None
-        self.client = boto3.client("sqs")
-        self.results = []
+    def __init__(self, table_name: str):
+        self.table_name = table_name
+
+        super().__init__()
 
     def _prepare(self):
         # It's called once, *before* processing
-        # Formats queue_url given a name
+        # Creates table resource and clears previous results
         # E.g.:
-        self.queue_url = f"https://queue.amazonaws.com/123456789012/{self.queue_name}"
+        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
+        self.success_messages.clear()
 
     def _clean(self):
         # It's called once, *after* processing all records (when closing the context manager)
-        # Here we're sending all messages to the queue at once, and clearing 'results' for future invocations
+        # Here we're sending, at once, all successful messages to a DDB table
         # E.g.:
-        self.client.send_message_batch(QueueUrl=self.queue_url, Entries=self.results)
-        self.results.clear()
+        with self.ddb_table.batch_writer() as batch:
+            for result in self.success_messages:
+                batch.put_item(Item=result)
 
     def _process_record(self, record):
-        # It handles how you process your record
-        # Here we're storing the result of each record in a list
+        # It handles how your record is processed
+        # Here we're keeping the status of each run
        # E.g.:
-        result = self.handler(record)
-        self.results.append(result)
-        return result
+        try:
+            result = self.handler(record)
+            return self.success_handler(record, result)
+        except Exception as exc:
+            return self.failure_handler(record, exc)
+
+    def success_handler(self, record, result):
+        entry = ("success", result, record)
+        message = {"age": result}
+        self.success_messages.append(message)
+        return entry
 
 
-@batch_processor(record_handler=record_handler, processor=DDBStreamProcessor("dummy-queue"))
+@batch_processor(record_handler=record_handler, processor=MyPartialProcessor("dummy-table"))
 def lambda_handler(event, context):
     return {"statusCode": 200}
 ```
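
Finally, a minimal sketch (not part of the patch series) of invoking the decorated handler above with a hand-rolled SQS-style event; the message fields are dummies, and a real run needs AWS credentials plus an existing `dummy-table` for `_prepare()`/`_clean()` to succeed.

```python
# Hand-rolled SQS-style event; messageId/receiptHandle/body values are dummies.
fake_event = {
    "Records": [
        {"messageId": "1", "receiptHandle": "rh-1", "body": "first"},
        {"messageId": "2", "receiptHandle": "rh-2", "body": "second"},
    ]
}

# The middleware binds the records and record_handler to MyPartialProcessor,
# runs process() inside the context manager, then invokes the wrapped handler.
response = lambda_handler(fake_event, None)
assert response == {"statusCode": 200}
```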