diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 463f6f7fbff..08c35560b3f 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -13,7 +13,6 @@ batch_processor, ) from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo -from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor __all__ = ( "BatchProcessor", @@ -21,8 +20,6 @@ "ExceptionInfo", "EventType", "FailureResponse", - "PartialSQSProcessor", "SuccessResponse", "batch_processor", - "sqs_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 1122bea4c03..6e5a0ce1d85 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -170,19 +170,19 @@ def batch_processor( Lambda's Context record_handler: Callable Callable to process each record from the batch - processor: PartialSQSProcessor + processor: BasePartialProcessor Batch Processor to handle partial failure cases Examples -------- - **Processes Lambda's event with PartialSQSProcessor** + **Processes Lambda's event with a BasePartialProcessor** - >>> from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor + >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor >>> >>> def record_handler(record): >>> return record["body"] >>> - >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) + >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor()) >>> def handler(event, context): >>> return {"StatusCode": 200} diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py index d90c25f12bc..d541d18d18f 100644 --- a/aws_lambda_powertools/utilities/batch/exceptions.py +++ b/aws_lambda_powertools/utilities/batch/exceptions.py @@ -24,19 +24,6 @@ def format_exceptions(self, parent_exception_str): return "\n".join(exception_list) -class SQSBatchProcessingError(BaseBatchProcessingError): - """When at least one message within a batch could not be processed""" - - def __init__(self, msg="", child_exceptions: Optional[List[ExceptionInfo]] = None): - super().__init__(msg, child_exceptions) - - # Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent - # errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275 - def __str__(self): - parent_exception_str = super(SQSBatchProcessingError, self).__str__() - return self.format_exceptions(parent_exception_str) - - class BatchProcessingError(BaseBatchProcessingError): """When all batch records failed to be processed""" diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py deleted file mode 100644 index 7b234c1372e..00000000000 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ /dev/null @@ -1,250 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -Batch SQS utilities -""" -import logging -import math -import sys -import warnings -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any, Callable, Dict, List, Optional, Tuple, cast - -import boto3 -from botocore.config import Config - -from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - -from ...middleware_factory import lambda_handler_decorator -from .base import BasePartialProcessor -from .exceptions import SQSBatchProcessingError - -logger = logging.getLogger(__name__) - - -class PartialSQSProcessor(BasePartialProcessor): - """ - Amazon SQS batch processor to delete successes from the Queue. - - The whole batch will be processed, even if failures occur. After all records are processed, - SQSBatchProcessingError will be raised if there were any failures, causing messages to - be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception. - - Parameters - ---------- - config: Config - botocore config object - suppress_exception: bool, optional - Supress exception raised if any messages fail processing, by default False - boto3_session : boto3.session.Session, optional - Boto3 session to use for AWS API communication - - - Example - ------- - **Process batch triggered by SQS** - - >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> def handler(event, context): - >>> records = event["Records"] - >>> processor = PartialSQSProcessor() - >>> - >>> with processor(records=records, handler=record_handler): - >>> result = processor.process() - >>> - >>> # Case a partial failure occurred, all successful executions - >>> # have been deleted from the queue after context's exit. - >>> - >>> return result - - """ - - def __init__( - self, - config: Optional[Config] = None, - suppress_exception: bool = False, - boto3_session: Optional[boto3.session.Session] = None, - ): - """ - Initializes sqs client. - """ - config = config or Config() - session = boto3_session or boto3.session.Session() - self.client = session.client("sqs", config=config) - self.suppress_exception = suppress_exception - self.max_message_batch = 10 - - warnings.warn( - "The sqs_batch_processor decorator and PartialSQSProcessor class are now deprecated, " - "and will be removed in the next major version. " - "Please follow the upgrade guide at " - "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#legacy " - "to use the native batch_processor decorator or BatchProcessor class." - ) - - super().__init__() - - def _get_queue_url(self) -> Optional[str]: - """ - Format QueueUrl from first records entry - """ - if not getattr(self, "records", None): - return None - - *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") - return f"{self.client._endpoint.host}/{account_id}/{queue_name}" - - def _get_entries_to_clean(self) -> List[Dict[str, str]]: - """ - Format messages to use in batch deletion - """ - entries = [] - # success_messages has generic type of union of SQS, Dynamodb and Kinesis Streams records or Pydantic models. - # Here we get SQS Record only - messages = cast(List[SQSRecord], self.success_messages) - for msg in messages: - entries.append({"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]}) - return entries - - def _process_record(self, record) -> Tuple: - """ - Process a record with instance's handler - - Parameters - ---------- - record: Any - An object to be processed. - """ - try: - result = self.handler(record=record) - return self.success_handler(record=record, result=result) - except Exception: - return self.failure_handler(record=record, exception=sys.exc_info()) - - def _prepare(self): - """ - Remove results from previous execution. - """ - self.success_messages.clear() - self.fail_messages.clear() - - def _clean(self) -> Optional[List]: - """ - Delete messages from Queue in case of partial failure. - """ - - # If all messages were successful, fall back to the default SQS - - # Lambda behavior which deletes messages if Lambda responds successfully - if not self.fail_messages: - logger.debug(f"All {len(self.success_messages)} records successfully processed") - return None - - queue_url = self._get_queue_url() - if queue_url is None: - logger.debug("No queue url found") - return None - - entries_to_remove = self._get_entries_to_clean() - # Batch delete up to 10 messages at a time (SQS limit) - max_workers = math.ceil(len(entries_to_remove) / self.max_message_batch) - - if entries_to_remove: - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures, results = [], [] - while entries_to_remove: - futures.append( - executor.submit( - self._delete_messages, queue_url, entries_to_remove[: self.max_message_batch], self.client - ) - ) - entries_to_remove = entries_to_remove[self.max_message_batch :] - for future in as_completed(futures): - try: - logger.debug("Deleted batch of processed messages from SQS") - results.append(future.result()) - except Exception: - logger.exception("Couldn't remove batch of processed messages from SQS") - raise - if self.suppress_exception: - logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed") - else: - logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception") - raise SQSBatchProcessingError( - msg=f"Not all records processed successfully. {len(self.exceptions)} individual errors logged " - f"separately below.", - child_exceptions=self.exceptions, - ) - - return results - - def _delete_messages(self, queue_url: str, entries_to_remove: List, sqs_client: Any): - delete_message_response = sqs_client.delete_message_batch( - QueueUrl=queue_url, - Entries=entries_to_remove, - ) - return delete_message_response - - -@lambda_handler_decorator -def sqs_batch_processor( - handler: Callable, - event: Dict, - context: Dict, - record_handler: Callable, - config: Optional[Config] = None, - suppress_exception: bool = False, - boto3_session: Optional[boto3.session.Session] = None, -): - """ - Middleware to handle SQS batch event processing - - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: Dict - Lambda's Context - record_handler: Callable - Callable to process each record from the batch - config: Config - botocore config object - suppress_exception: bool, optional - Supress exception raised if any messages fail processing, by default False - boto3_session : boto3.session.Session, optional - Boto3 session to use for AWS API communication - - Examples - -------- - **Processes Lambda's event with PartialSQSProcessor** - - >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> @sqs_batch_processor(record_handler=record_handler) - >>> def handler(event, context): - >>> return {"StatusCode": 200} - - Limitations - ----------- - * Async batch processors - - """ - config = config or Config() - session = boto3_session or boto3.session.Session() - - processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception, boto3_session=session) - - records = event["Records"] - - with processor(records, record_handler): - processor.process() - - return handler(event, context) diff --git a/docs/upgrade.md b/docs/upgrade.md index 91ad54e42d3..20cf4aa25a6 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -11,6 +11,7 @@ The transition from Powertools for Python v1 to v2 is as painless as possible, a Changes at a glance: * The API for **event handler's `Response`** has minor changes to support multi value headers and cookies. +* The **legacy SQS batch processor** was removed. ???+ important Powertools for Python v2 drops suport for Python 3.6, following the Python 3.6 End-Of-Life (EOL) reached on December 23, 2021. @@ -55,3 +56,89 @@ def get_todos(): cookies=["CookieName=CookieValue"] ) ``` + +## Legacy SQS Batch Processor + +The deprecated `PartialSQSProcessor` and `sqs_batch_processor` were removed. +You can migrate to the [native batch processing](https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/) capability by: + +1. If you use **`sqs_batch_decorator`** you can now use **`batch_processor`** decorator +2. If you use **`PartialSQSProcessor`** you can now use **`BatchProcessor`** +3. [Enable the functionality](../utilities/batch#required-resources) on SQS +4. Change your Lambda Handler to return the new response format + +=== "Decorator: Before" + + ```python hl_lines="1 6" + from aws_lambda_powertools.utilities.batch import sqs_batch_processor + + def record_handler(record): + return do_something_with(record["body"]) + + @sqs_batch_processor(record_handler=record_handler) + def lambda_handler(event, context): + return {"statusCode": 200} + ``` + +=== "Decorator: After" + + ```python hl_lines="3 5 11" + import json + + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + + processor = BatchProcessor(event_type=EventType.SQS) + + + def record_handler(record): + return do_something_with(record["body"]) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + ``` + +=== "Context manager: Before" + + ```python hl_lines="1-2 4 14 19" + from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + from botocore.config import Config + + config = Config(region_name="us-east-1") + + def record_handler(record): + return_value = do_something_with(record["body"]) + return return_value + + + def lambda_handler(event, context): + records = event["Records"] + + processor = PartialSQSProcessor(config=config) + + with processor(records, record_handler): + result = processor.process() + + return result + ``` + +=== "Context manager: After" + + ```python hl_lines="1 11" + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + + + def record_handler(record): + return_value = do_something_with(record["body"]) + return return_value + + def lambda_handler(event, context): + records = event["Records"] + + processor = BatchProcessor(event_type=EventType.SQS) + + with processor(records, record_handler): + result = processor.process() + + return processor.response() + ``` diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 6241179ed4e..2476a8d5ef5 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -5,11 +5,6 @@ description: Utility The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. -???+ warning - The legacy `sqs_batch_processor` decorator and `PartialSQSProcessor` class are deprecated and are going to be removed soon. - - Please check the [migration guide](#migration-guide) for more information. - ## Key Features * Reports batch item failures to reduce number of retries for a record upon errors @@ -1146,215 +1141,16 @@ class MyProcessor(BatchProcessor): return super().failure_handler(record, exception) ``` -## Legacy - -???+ tip - This is kept for historical purposes. Use the new [BatchProcessor](#processing-messages-from-sqs) instead. - -### Migration guide - -???+ info - Keep reading if you are using `sqs_batch_processor` or `PartialSQSProcessor`. - -[As of Nov 2021](https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/){target="_blank"}, this is no longer needed as both SQS, Kinesis, and DynamoDB Streams offer this capability natively with one caveat - it's an [opt-in feature](#required-resources). - -Being a native feature, we no longer need to instantiate boto3 nor other customizations like exception suppressing – this lowers the cost of your Lambda function as you can delegate deleting partial failures to Lambda. - -???+ tip - It's also easier to test since it's mostly a [contract based response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"}. - -You can migrate in three steps: - -1. If you are using **`sqs_batch_decorator`** you can now use **`batch_processor`** decorator -2. If you were using **`PartialSQSProcessor`** you can now use **`BatchProcessor`** -3. Change your Lambda Handler to return the new response format - -=== "Decorator: Before" - - ```python hl_lines="1 6" - from aws_lambda_powertools.utilities.batch import sqs_batch_processor - - def record_handler(record): - return do_something_with(record["body"]) - - @sqs_batch_processor(record_handler=record_handler) - def lambda_handler(event, context): - return {"statusCode": 200} - ``` - -=== "Decorator: After" - - ```python hl_lines="3 5 11" - import json - - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - - processor = BatchProcessor(event_type=EventType.SQS) - - - def record_handler(record): - return do_something_with(record["body"]) - - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context): - return processor.response() - ``` - -=== "Context manager: Before" - - ```python hl_lines="1-2 4 14 19" - from aws_lambda_powertools.utilities.batch import PartialSQSProcessor - from botocore.config import Config - - config = Config(region_name="us-east-1") - - def record_handler(record): - return_value = do_something_with(record["body"]) - return return_value - - - def lambda_handler(event, context): - records = event["Records"] - - processor = PartialSQSProcessor(config=config) - - with processor(records, record_handler): - result = processor.process() - - return result - ``` - -=== "Context manager: After" - - ```python hl_lines="1 11" - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - - - def record_handler(record): - return_value = do_something_with(record["body"]) - return return_value - - def lambda_handler(event, context): - records = event["Records"] - - processor = BatchProcessor(event_type=EventType.SQS) - - with processor(records, record_handler): - result = processor.process() - - return processor.response() - ``` - -### Customizing boto configuration - -The **`config`** and **`boto3_session`** parameters enable you to pass in a custom [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) -or a custom [boto3 session](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html) when using the `sqs_batch_processor` -decorator or `PartialSQSProcessor` class. - -> Custom config example - -=== "Decorator" - - ```python hl_lines="4 12" - from aws_lambda_powertools.utilities.batch import sqs_batch_processor - from botocore.config import Config - - config = Config(region_name="us-east-1") - - def record_handler(record): - # This will be called for each individual message from a batch - # It should raise an exception if the message was not processed successfully - return_value = do_something_with(record["body"]) - return return_value - - @sqs_batch_processor(record_handler=record_handler, config=config) - def lambda_handler(event, context): - return {"statusCode": 200} - ``` - -=== "Context manager" - - ```python hl_lines="4 16" - from aws_lambda_powertools.utilities.batch import PartialSQSProcessor - from botocore.config import Config - - config = Config(region_name="us-east-1") - - def record_handler(record): - # This will be called for each individual message from a batch - # It should raise an exception if the message was not processed successfully - return_value = do_something_with(record["body"]) - return return_value - - - def lambda_handler(event, context): - records = event["Records"] - - processor = PartialSQSProcessor(config=config) - - with processor(records, record_handler): - result = processor.process() - - return result - ``` - -> Custom boto3 session example - -=== "Decorator" - - ```python hl_lines="4 12" - from aws_lambda_powertools.utilities.batch import sqs_batch_processor - from botocore.config import Config - - session = boto3.session.Session() - - def record_handler(record): - # This will be called for each individual message from a batch - # It should raise an exception if the message was not processed successfully - return_value = do_something_with(record["body"]) - return return_value - - @sqs_batch_processor(record_handler=record_handler, boto3_session=session) - def lambda_handler(event, context): - return {"statusCode": 200} - ``` - -=== "Context manager" - - ```python hl_lines="4 16" - from aws_lambda_powertools.utilities.batch import PartialSQSProcessor - import boto3 - - session = boto3.session.Session() - - def record_handler(record): - # This will be called for each individual message from a batch - # It should raise an exception if the message was not processed successfully - return_value = do_something_with(record["body"]) - return return_value - - - def lambda_handler(event, context): - records = event["Records"] - - processor = PartialSQSProcessor(boto3_session=session) - - with processor(records, record_handler): - result = processor.process() - - return result - ``` - ### Suppressing exceptions -If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument. +If you want to disable the default behavior where `BatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument. === "Decorator" ```python hl_lines="3" - from aws_lambda_powertools.utilities.batch import sqs_batch_processor + from aws_lambda_powertools.utilities.batch import batch_processor - @sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) + @batch_processor(record_handler=record_handler, suppress_exception=True) def lambda_handler(event, context): return {"statusCode": 200} ``` @@ -1362,9 +1158,9 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r === "Context manager" ```python hl_lines="3" - from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType - processor = PartialSQSProcessor(config=config, suppress_exception=True) + processor = BatchProcessor(event_type=EventType.SQS, suppress_exception=True) with processor(records, record_handler): result = processor.process() diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index a5e1e706437..2a7b0d3375f 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,21 +1,12 @@ import json -import math from random import randint from typing import Callable, Dict, Optional -from unittest.mock import patch import pytest from botocore.config import Config -from botocore.stub import Stubber - -from aws_lambda_powertools.utilities.batch import ( - BatchProcessor, - EventType, - PartialSQSProcessor, - batch_processor, - sqs_batch_processor, -) -from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError, SQSBatchProcessingError + +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord @@ -135,30 +126,6 @@ def config() -> Config: return Config(region_name="us-east-1") -@pytest.fixture(scope="function") -def partial_processor(config) -> PartialSQSProcessor: - return PartialSQSProcessor(config=config) - - -@pytest.fixture(scope="function") -def partial_processor_suppressed(config) -> PartialSQSProcessor: - return PartialSQSProcessor(config=config, suppress_exception=True) - - -@pytest.fixture(scope="function") -def stubbed_partial_processor(config) -> PartialSQSProcessor: - processor = PartialSQSProcessor(config=config) - with Stubber(processor.client) as stubber: - yield stubber, processor - - -@pytest.fixture(scope="function") -def stubbed_partial_processor_suppressed(config) -> PartialSQSProcessor: - processor = PartialSQSProcessor(config=config, suppress_exception=True) - with Stubber(processor.client) as stubber: - yield stubber, processor - - @pytest.fixture(scope="module") def order_event_factory() -> Callable: def factory(item: Dict) -> str: @@ -167,258 +134,6 @@ def factory(item: Dict) -> str: return factory -@pytest.mark.parametrize( - "success_messages_count", - ([1, 18, 34]), -) -def test_partial_sqs_processor_context_with_failure( - success_messages_count, sqs_event_factory, record_handler, partial_processor -): - """ - Test processor with one failing record and multiple processed records - """ - fail_record = sqs_event_factory("fail") - success_records = [sqs_event_factory("success") for i in range(0, success_messages_count)] - - records = [fail_record, *success_records] - - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - - with Stubber(partial_processor.client) as stubber: - for _ in range(0, math.ceil((success_messages_count / partial_processor.max_message_batch))): - stubber.add_response("delete_message_batch", response) - with pytest.raises(SQSBatchProcessingError) as error: - with partial_processor(records, record_handler) as ctx: - ctx.process() - - assert len(error.value.child_exceptions) == 1 - stubber.assert_no_pending_responses() - - -def test_partial_sqs_processor_context_with_failure_exception(sqs_event_factory, record_handler, partial_processor): - """ - Test processor with one failing record - """ - fail_record = sqs_event_factory("fail") - success_record = sqs_event_factory("success") - - records = [fail_record, success_record] - - with Stubber(partial_processor.client) as stubber: - stubber.add_client_error( - method="delete_message_batch", service_error_code="ServiceUnavailable", http_status_code=503 - ) - with pytest.raises(Exception) as error: - with partial_processor(records, record_handler) as ctx: - ctx.process() - - assert "ServiceUnavailable" in str(error.value) - stubber.assert_no_pending_responses() - - -def test_partial_sqs_processor_context_only_success(sqs_event_factory, record_handler, partial_processor): - """ - Test processor without failure - """ - first_record = sqs_event_factory("success") - second_record = sqs_event_factory("success") - - records = [first_record, second_record] - - with partial_processor(records, record_handler) as ctx: - result = ctx.process() - - assert result == [ - ("success", first_record["body"], first_record), - ("success", second_record["body"], second_record), - ] - - -def test_partial_sqs_processor_context_multiple_calls(sqs_event_factory, record_handler, partial_processor): - """ - Test processor without failure - """ - first_record = sqs_event_factory("success") - second_record = sqs_event_factory("success") - - records = [first_record, second_record] - - with partial_processor(records, record_handler) as ctx: - ctx.process() - - with partial_processor([first_record], record_handler) as ctx: - ctx.process() - - assert partial_processor.success_messages == [first_record] - - -def test_batch_processor_middleware_with_partial_sqs_processor(sqs_event_factory, record_handler, partial_processor): - """ - Test middleware's integration with PartialSQSProcessor - """ - - @batch_processor(record_handler=record_handler, processor=partial_processor) - def lambda_handler(event, context): - return True - - fail_record = sqs_event_factory("fail") - - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - - with Stubber(partial_processor.client) as stubber: - stubber.add_response("delete_message_batch", response) - with pytest.raises(SQSBatchProcessingError) as error: - lambda_handler(event, {}) - - assert len(error.value.child_exceptions) == 2 - stubber.assert_no_pending_responses() - - -@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor") -def test_sqs_batch_processor_middleware( - patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor -): - """ - Test middleware's integration with PartialSQSProcessor - """ - - @sqs_batch_processor(record_handler=record_handler) - def lambda_handler(event, context): - return True - - stubber, processor = stubbed_partial_processor - patched_sqs_processor.return_value = processor - - fail_record = sqs_event_factory("fail") - - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - stubber.add_response("delete_message_batch", response) - with pytest.raises(SQSBatchProcessingError) as error: - lambda_handler(event, {}) - - assert len(error.value.child_exceptions) == 1 - stubber.assert_no_pending_responses() - - -def test_batch_processor_middleware_with_custom_processor(capsys, sqs_event_factory, record_handler, config): - """ - Test middlewares' integration with custom batch processor - """ - - class CustomProcessor(PartialSQSProcessor): - def failure_handler(self, record, exception): - print("Oh no ! It's a failure.") - return super().failure_handler(record, exception) - - processor = CustomProcessor(config=config) - - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context): - return True - - fail_record = sqs_event_factory("fail") - - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - - with Stubber(processor.client) as stubber: - stubber.add_response("delete_message_batch", response) - with pytest.raises(SQSBatchProcessingError) as error: - lambda_handler(event, {}) - - stubber.assert_no_pending_responses() - - assert len(error.value.child_exceptions) == 1 - assert capsys.readouterr().out == "Oh no ! It's a failure.\n" - - -def test_batch_processor_middleware_suppressed_exceptions( - sqs_event_factory, record_handler, partial_processor_suppressed -): - """ - Test middleware's integration with PartialSQSProcessor - """ - - @batch_processor(record_handler=record_handler, processor=partial_processor_suppressed) - def lambda_handler(event, context): - return True - - fail_record = sqs_event_factory("fail") - - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - - with Stubber(partial_processor_suppressed.client) as stubber: - stubber.add_response("delete_message_batch", response) - result = lambda_handler(event, {}) - - stubber.assert_no_pending_responses() - assert result is True - - -def test_partial_sqs_processor_suppressed_exceptions(sqs_event_factory, record_handler, partial_processor_suppressed): - """ - Test processor without failure - """ - - first_record = sqs_event_factory("success") - second_record = sqs_event_factory("fail") - records = [first_record, second_record] - - fail_record = sqs_event_factory("fail") - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - - with Stubber(partial_processor_suppressed.client) as stubber: - stubber.add_response("delete_message_batch", response) - with partial_processor_suppressed(records, record_handler) as ctx: - ctx.process() - - assert partial_processor_suppressed.success_messages == [first_record] - - -@patch("aws_lambda_powertools.utilities.batch.sqs.PartialSQSProcessor") -def test_sqs_batch_processor_middleware_suppressed_exception( - patched_sqs_processor, sqs_event_factory, record_handler, stubbed_partial_processor_suppressed -): - """ - Test middleware's integration with PartialSQSProcessor - """ - - @sqs_batch_processor(record_handler=record_handler) - def lambda_handler(event, context): - return True - - stubber, processor = stubbed_partial_processor_suppressed - patched_sqs_processor.return_value = processor - - fail_record = sqs_event_factory("fail") - - event = {"Records": [sqs_event_factory("fail"), sqs_event_factory("success")]} - response = {"Successful": [{"Id": fail_record["messageId"]}], "Failed": []} - stubber.add_response("delete_message_batch", response) - result = lambda_handler(event, {}) - - stubber.assert_no_pending_responses() - assert result is True - - -def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_handler, partial_processor): - """ - Test processor with only failures - """ - first_record = sqs_event_factory("fail") - second_record = sqs_event_factory("fail") - - records = [first_record, second_record] - with pytest.raises(SQSBatchProcessingError) as error: - with partial_processor(records, record_handler) as ctx: - ctx.process() - - assert len(error.value.child_exceptions) == 2 - - def test_batch_processor_middleware_success_only(sqs_event_factory, record_handler): # GIVEN first_record = SQSRecord(sqs_event_factory("success")) diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py deleted file mode 100644 index 8cc4f0b0225..00000000000 --- a/tests/unit/test_utilities_batch.py +++ /dev/null @@ -1,141 +0,0 @@ -import pytest -from botocore.config import Config - -from aws_lambda_powertools.utilities.batch import PartialSQSProcessor -from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError - -# Maintenance: This will be deleted as part of legacy Batch deprecation - - -@pytest.fixture(scope="function") -def sqs_event(): - return { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", - "body": "", - "attributes": {}, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", - "awsRegion": "us-east-1", - } - - -@pytest.fixture(scope="module") -def config() -> Config: - return Config(region_name="us-east-1") - - -@pytest.fixture(scope="function") -def partial_sqs_processor(config) -> PartialSQSProcessor: - return PartialSQSProcessor(config=config) - - -def test_partial_sqs_get_queue_url_with_records(mocker, sqs_event, partial_sqs_processor): - expected_url = "https://queue.amazonaws.com/123456789012/my-queue" - - records_mock = mocker.patch.object(PartialSQSProcessor, "records", create=True, new_callable=mocker.PropertyMock) - records_mock.return_value = [sqs_event] - - result = partial_sqs_processor._get_queue_url() - assert result == expected_url - - -def test_partial_sqs_get_queue_url_without_records(partial_sqs_processor): - assert partial_sqs_processor._get_queue_url() is None - - -def test_partial_sqs_get_entries_to_clean_with_success(mocker, sqs_event, partial_sqs_processor): - expected_entries = [{"Id": sqs_event["messageId"], "ReceiptHandle": sqs_event["receiptHandle"]}] - - success_messages_mock = mocker.patch.object( - PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock - ) - success_messages_mock.return_value = [sqs_event] - - result = partial_sqs_processor._get_entries_to_clean() - - assert result == expected_entries - - -def test_partial_sqs_get_entries_to_clean_without_success(mocker, partial_sqs_processor): - expected_entries = [] - - success_messages_mock = mocker.patch.object( - PartialSQSProcessor, "success_messages", create=True, new_callable=mocker.PropertyMock - ) - success_messages_mock.return_value = [] - - result = partial_sqs_processor._get_entries_to_clean() - - assert result == expected_entries - - -def test_partial_sqs_process_record_success(mocker, partial_sqs_processor): - expected_value = mocker.sentinel.expected_value - - success_result = mocker.sentinel.success_result - record = mocker.sentinel.record - - handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, return_value=success_result) - success_handler_mock = mocker.patch.object(PartialSQSProcessor, "success_handler", return_value=expected_value) - - result = partial_sqs_processor._process_record(record) - - handler_mock.assert_called_once_with(record=record) - success_handler_mock.assert_called_once_with(record=record, result=success_result) - - assert result == expected_value - - -def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor): - expected_value = mocker.sentinel.expected_value - - failure_result = Exception() - record = mocker.sentinel.record - - handler_mock = mocker.patch.object(PartialSQSProcessor, "handler", create=True, side_effect=failure_result) - failure_handler_mock = mocker.patch.object(PartialSQSProcessor, "failure_handler", return_value=expected_value) - - result = partial_sqs_processor._process_record(record) - - handler_mock.assert_called_once_with(record=record) - - _, failure_handler_called_with_args = failure_handler_mock.call_args - failure_handler_mock.assert_called_once() - assert (failure_handler_called_with_args["record"]) == record - assert isinstance(failure_handler_called_with_args["exception"], tuple) - assert failure_handler_called_with_args["exception"][1] == failure_result - assert result == expected_value - - -def test_partial_sqs_prepare(mocker, partial_sqs_processor): - success_messages_mock = mocker.patch.object(partial_sqs_processor, "success_messages", spec=list) - failed_messages_mock = mocker.patch.object(partial_sqs_processor, "fail_messages", spec=list) - - partial_sqs_processor._prepare() - - success_messages_mock.clear.assert_called_once() - failed_messages_mock.clear.assert_called_once() - - -def test_partial_sqs_clean(monkeypatch, mocker, partial_sqs_processor): - records = [mocker.sentinel.record] - - monkeypatch.setattr(partial_sqs_processor, "fail_messages", records) - monkeypatch.setattr(partial_sqs_processor, "success_messages", records) - - queue_url_mock = mocker.patch.object(PartialSQSProcessor, "_get_queue_url") - entries_to_clean_mock = mocker.patch.object(PartialSQSProcessor, "_get_entries_to_clean") - - queue_url_mock.return_value = mocker.sentinel.queue_url - entries_to_clean_mock.return_value = [mocker.sentinel.entries_to_clean] - - client_mock = mocker.patch.object(partial_sqs_processor, "client", autospec=True) - with pytest.raises(SQSBatchProcessingError): - partial_sqs_processor._clean() - - client_mock.delete_message_batch.assert_called_once_with( - QueueUrl=mocker.sentinel.queue_url, Entries=[mocker.sentinel.entries_to_clean] - )