Skip to content

fix: batch processing util #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 3, 2020
Merged
11 changes: 3 additions & 8 deletions aws_lambda_powertools/utilities/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
Batch processing utility
"""

from .base import BasePartialProcessor
from .middlewares import batch_processor
from .sqs import PartialSQSProcessor
from .base import BasePartialProcessor, batch_processor
from .sqs import PartialSQSProcessor, sqs_batch_processor

__all__ = (
"BasePartialProcessor",
"PartialSQSProcessor",
"batch_processor",
)
__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")
55 changes: 54 additions & 1 deletion aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
Batch processing utilities
"""

import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Tuple
from typing import Any, Callable, Dict, Iterable, List, Tuple

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

logger = logging.getLogger(__name__)


class BasePartialProcessor(ABC):
Expand All @@ -16,6 +21,7 @@ class BasePartialProcessor(ABC):
def __init__(self):
self.success_messages: List = []
self.fail_messages: List = []
self.exceptions: List = []

@abstractmethod
def _prepare(self):
Expand Down Expand Up @@ -89,5 +95,52 @@ def failure_handler(self, record: Any, exception: Exception):
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
logger.debug("Record processing exception: ", exception)
self.exceptions.append(exception)
self.fail_messages.append(record)
return entry


@lambda_handler_decorator
def batch_processor(
handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
):
"""
Middleware to handle batch event processing

Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: Dict
Lambda's Context
record_handler: Callable
Callable to process each record from the batch
processor: PartialSQSProcessor
Batch Processor to handle partial failure cases

Examples
--------
**Processes Lambda's event with PartialSQSProcessor**
>>> from aws_lambda_powertools.utilities.batch import batch_processor
>>>
>>> def record_handler(record):
>>> return record["body"]
>>>
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
>>> def handler(event, context):
>>> return {"StatusCode": 200}

Limitations
-----------
* Async batch processors

"""
records = event["Records"]

with processor(records, record_handler):
processor.process()

return handler(event, context)
7 changes: 7 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Batch processing exceptions
"""


class SQSBatchProcessingError(Exception):
"""When at least one message within a batch could not be processed"""
56 changes: 0 additions & 56 deletions aws_lambda_powertools/utilities/batch/middlewares.py

This file was deleted.

89 changes: 82 additions & 7 deletions aws_lambda_powertools/utilities/batch/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@
"""
Batch SQS utilities
"""
from typing import List, Optional, Tuple
import logging
from typing import Callable, Dict, List, Optional, Tuple

import boto3
from botocore.config import Config

from ...middleware_factory import lambda_handler_decorator
from .base import BasePartialProcessor
from .exceptions import SQSBatchProcessingError

logger = logging.getLogger(__name__)


class PartialSQSProcessor(BasePartialProcessor):
"""
Amazon SQS batch processor to delete successes from the Queue.

Only the **special** case of partial failure is handled, thus a batch in
which all records failed is **not** going to be removed from the queue, and
the same is valid for a full success.
The whole batch will be processed, even if failures occur. After all records are processed,
SQSBatchProcessingError will be raised if there were any failures, causing messages to
be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.

Parameters
----------
config: Config
botocore config object
suppress_exception: bool, optional
Supress exception raised if any messages fail processing, by default False


Example
-------
Expand All @@ -46,12 +54,13 @@ class PartialSQSProcessor(BasePartialProcessor):
>>> return result
"""

def __init__(self, config: Optional[Config] = None):
def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
"""
Initializes sqs client.
"""
config = config or Config()
self.client = boto3.client("sqs", config=config)
self.suppress_exception = suppress_exception

super().__init__()

Expand Down Expand Up @@ -97,10 +106,76 @@ def _clean(self):
"""
Delete messages from Queue in case of partial failure.
"""
if not (self.fail_messages and self.success_messages):
# If all messages were successful, fall back to the default SQS -
# Lambda behaviour which deletes messages if Lambda responds successfully
if not self.fail_messages:
logger.debug(f"All {len(self.success_messages)} records successfully processed")
return

queue_url = self._get_queue_url()
entries_to_remove = self._get_entries_to_clean()

return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)

if self.suppress_exception:
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
else:
logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
raise SQSBatchProcessingError(list(self.exceptions))

return delete_message_response


@lambda_handler_decorator
def sqs_batch_processor(
handler: Callable,
event: Dict,
context: Dict,
record_handler: Callable,
config: Optional[Config] = None,
suppress_exception: bool = False,
):
"""
Middleware to handle SQS batch event processing

Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: Dict
Lambda's Context
record_handler: Callable
Callable to process each record from the batch
config: Config
botocore config object
suppress_exception: bool, optional
Supress exception raised if any messages fail processing, by default False

Examples
--------
**Processes Lambda's event with PartialSQSProcessor**
>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
>>>
>>> def record_handler(record):
>>> return record["body"]
>>>
>>> @sqs_batch_processor(record_handler=record_handler)
>>> def handler(event, context):
>>> return {"StatusCode": 200}

Limitations
-----------
* Async batch processors

"""
config = config or Config()
processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception)

records = event["Records"]

with processor(records, record_handler):
processor.process()

return handler(event, context)
Loading