Skip to content

fix: batch processing util #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 3, 2020
8 changes: 2 additions & 6 deletions aws_lambda_powertools/utilities/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
"""

from .base import BasePartialProcessor
from .middlewares import batch_processor
from .middlewares import batch_processor, sqs_batch_processor
from .sqs import PartialSQSProcessor

__all__ = (
"BasePartialProcessor",
"PartialSQSProcessor",
"batch_processor",
)
__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class BasePartialProcessor(ABC):
def __init__(self):
    # Per-invocation accumulators. Concrete processors append to these via
    # success_handler/failure_handler while iterating a batch.
    self.success_messages: List = []  # records processed without error
    self.fail_messages: List = []  # records whose handler raised
    self.exceptions: List = []  # the exceptions raised, parallel to fail_messages

@abstractmethod
def _prepare(self):
Expand Down Expand Up @@ -89,5 +90,6 @@ def failure_handler(self, record: Any, exception: Exception):
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
self.exceptions.append(exception)
self.fail_messages.append(record)
return entry
7 changes: 7 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Batch processing exceptions
"""


class SQSBatchProcessingError(Exception):
    """Raised when at least one message within an SQS batch could not be processed.

    Carries the list of underlying exceptions collected during batch
    processing as its argument, so callers can inspect individual failures.
    """
59 changes: 58 additions & 1 deletion aws_lambda_powertools/utilities/batch/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
"""
Middlewares for batch utilities
"""
from typing import Callable, Dict, Optional

from typing import Callable, Dict
from botocore.config import Config

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

from .base import BasePartialProcessor
from .sqs import PartialSQSProcessor


@lambda_handler_decorator
Expand Down Expand Up @@ -54,3 +56,58 @@ def batch_processor(
processor.process()

return handler(event, context)


@lambda_handler_decorator
def sqs_batch_processor(
    handler: Callable,
    event: Dict,
    context: Dict,
    record_handler: Callable,
    config: Optional[Config] = None,
    suppress_exception: bool = False,
):
    """
    Middleware to handle SQS batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    config: Config, optional
        botocore config object
    suppress_exception: bool, optional
        Suppress exception raised if any messages fail processing, by default False

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @sqs_batch_processor(record_handler=record_handler)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    config = config or Config()
    processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception)

    records = event["Records"]

    # Process the whole batch before invoking the handler: the context manager's
    # exit hook (_clean) deletes successfully processed messages from the queue
    # and raises SQSBatchProcessingError if any record failed (unless suppressed).
    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
24 changes: 18 additions & 6 deletions aws_lambda_powertools/utilities/batch/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
from botocore.config import Config

from .base import BasePartialProcessor
from .exceptions import SQSBatchProcessingError


class PartialSQSProcessor(BasePartialProcessor):
"""
Amazon SQS batch processor to delete successes from the Queue.

Only the **special** case of partial failure is handled, thus a batch in
which all records failed is **not** going to be removed from the queue, and
the same is valid for a full success.
The whole batch will be processed, even if failures occur. After all records are processed,
SQSBatchProcessingError will be raised if there were any failures, causing messages to
be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.

Parameters
----------
config: Config
botocore config object
suppress_exception: bool, optional
Suppress exception raised if any messages fail processing, by default False


Example
-------
Expand All @@ -46,12 +50,13 @@ class PartialSQSProcessor(BasePartialProcessor):
>>> return result
"""

def __init__(self, config: Optional[Config] = None):
def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
    """
    Initialize the processor and its SQS client.

    Parameters
    ----------
    config: Config, optional
        Botocore config object forwarded to the SQS client; a default
        Config is used when omitted.
    suppress_exception: bool, optional
        When True, do not raise SQSBatchProcessingError after processing
        a batch containing failures, by default False.
    """
    super().__init__()
    if config is None:
        config = Config()
    self.suppress_exception = suppress_exception
    self.client = boto3.client("sqs", config=config)

def _clean(self):
    """
    Delete successfully processed messages from the queue, then surface failures.

    Returns the delete_message_batch response when a deletion was issued,
    otherwise None. Raises SQSBatchProcessingError (with the collected
    exceptions) if any record failed and suppress_exception is False.
    """
    # If all messages were successful, fall back to the default SQS -
    # Lambda behaviour which deletes messages if Lambda responds successfully
    if not self.fail_messages:
        return

    queue_url = self._get_queue_url()
    entries_to_remove = self._get_entries_to_clean()

    # Guard against an all-failed batch: delete_message_batch rejects an
    # empty Entries list, which would mask the real processing failures
    # with a client-side parameter validation error.
    delete_message_response = None
    if entries_to_remove:
        delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)

    # fail_messages is guaranteed non-empty here (early return above).
    if not self.suppress_exception:
        raise SQSBatchProcessingError(list(self.exceptions))

    return delete_message_response
148 changes: 117 additions & 31 deletions docs/content/utilities/batch.mdx
Original file line number Diff line number Diff line change
@@ -1,73 +1,163 @@
---
title: Batch
title: SQS Batch Processing
description: Utility
---

import Note from "../../src/components/Note"

One very attractive feature of Lambda functions is the ability to integrate them with a plethora of internal and external [event sources][1]. Some of these event providers allows a feature called "Batch processing" in which [predefined number][2] of events is sent to lambda function at once.

The proposed batch utility aims to provide an abstraction to handle a partial failure during a batch execution from a SQS event source, providing a base class (`BasePartialProcessor`) allowing you to create your **own** batch processor.
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.

**Key Features**

* Removal of successful messages for [AWS SQS](https://aws.amazon.com/sqs/) batch - in case of partial failure;
* Build your own batch processor using the base classes.
* Prevent successfully processed messages from being returned to SQS
* Simple interface for individually processing messages from a batch
* Build your own batch processor using the base classes

**Background**

When using SQS as a Lambda event source mapping, functions can be triggered with a batch of messages from SQS. If the Lambda function fails when processing the batch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some line breaks to make it easier to read. The docs theme kinda forces you to do that or else it cramps the lines

all messages in the batch will be returned to the queue. With this utility, messages within a batch are handled individually - only messages that were not successfully processed
are returned to the queue. More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html).

<Note type="warning">
While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.
</Note><br/>


**IAM Permissions**

This utility requires additional permissions to work as expected. See the following table:
This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.

## Processing messages from SQS

Processor | Function/Method | IAM Permission
|---------|-----------------|---------------|
PartialSQSProcessor | `_clean` | `sqs:DeleteMessageBatch`
There are 2 ways to use this utility for processing SQS messages:

### PartialSQSProcessor
**With a decorator:**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a sub-heading #### as opposed to bold text - It allows customers and us to easily reference it in conversations


SQS integration with Lambda is one of the most well established ones and pretty useful when building asynchronous applications. One common approach to maximize the performance of this integration is to enable the batch processing feature, resulting in higher throughput with less invocations.
Using the `sqs_batch_processor` decorator with your lambda handler function, you provide a `record_handler` which is responsible for processing individual messages. It should raise an exception if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add line breaks here to make it easier to read in our docs theme

it is unable to process the record. All records in the batch will be passed to this handler for processing, even if exceptions are thrown. After all messages are processed, any successfully processed
ones will be deleted from the queue. If there were any messages the `record_handler` couldn't process, `SQSBatchProcessingError` will be raised. You will not have access to the _processed_ messages
within the lambda handler - all processing logic should be performed by the `record_handler` function.

As any function call, you may face errors during execution, in one or more records belonging to a batch. SQS's native behavior is to redrive the **whole** batch to the queue again, reprocessing all of them again, including successful ones. This cycle can happen multiple times depending on your [configuration][3], until the whole batch succeeds or the maximum number of attempts is reached. Your application may face some problems with such behavior, especially if there's no idempotency.
```python:title=app.py
from aws_lambda_powertools.utilities.batch import sqs_batch_processor

def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value

@sqs_batch_processor(record_handler=record_handler)
def lambda_handler(event, context):
return {"statusCode": 200}
```

A *naive* approach to solving this problem is to delete successful records from the queue before redriving's phase. The `PartialSQSProcessor` class offers this solution both as context manager and middleware, removing all successful messages from the queue case one or more failures occurred during lambda's execution. Two examples are provided below, displaying the behavior of this class.
**With a context manager:**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a sub-heading #### as opposed to bold text - It allows customers and us to easily reference it in conversations


**Examples:**
If you require access to the result of processed messages, you can use the context manager. The result from calling `process()` on the context manager will be a list of
all the return values from your `record_handler` function.

```python:title=context_manager.py
from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
```python:title=app.py
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
return record["body"]
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


def lambda_handler(event, context):
records = event["Records"]

# highlight-start
processor = PartialSQSProcessor()

with processor(records, record_handler):
result = processor.process()
# highlight-end
result = processor.process() # Returns a list of all results from record_handler

return result
```

```python:title=middleware.py
from aws_lambda_powertools.utilities.batch import batch_processor, PartialSQSProcessor
## Passing custom boto3 config

If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to
the `sqs_batch_processor` decorator:

```python:title=app.py
from aws_lambda_powertools.utilities.batch import sqs_batch_processor
from botocore.config import Config

config = Config(region_name="us-east-1") # highlight-line

def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value

@sqs_batch_processor(record_handler=record_handler, config=config) # highlight-line
def lambda_handler(event, context):
return {"statusCode": 200}
```

Or to the `PartialSQSProcessor` class:
```python:title=app.py
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

from botocore.config import Config

config = Config(region_name="us-east-1") # highlight-line

def record_handler(record):
return record["body"]
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


# highlight-start
@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
# highlight-end
def lambda_handler(event, context):
records = event["Records"]

processor = PartialSQSProcessor(config=config) # highlight-line

with processor(records, record_handler):
result = processor.process()

return result
```


## Suppressing exceptions

If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` argument.

<Note type="warning">
If your Lambda function executes successfully and returns a response, all messages in the batch will be deleted from the queue.
</Note><br/>

```python:title=app.py
...
@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) # highlight-line
def lambda_handler(event, context):
return {"statusCode": 200}
```
or
```python:title=app.py
processor = PartialSQSProcessor(config=config, suppress_exception=True) # highlight-line

with processor(records, record_handler):
result = processor.process()
```

## Create your own partial processor

You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.

All processing logic is handled by `_process_record()` whilst `_prepare()` and `clean()` take care of doing a setup/teardown of the processor, being called at start/end of processor's execution, respectively.

You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.

**Example:**

```python:title=custom_processor.py
Expand Down Expand Up @@ -131,7 +221,3 @@ class MyPartialProcessor(BasePartialProcessor):
def lambda_handler(event, context):
return {"statusCode": 200}
```

[1]: https://aws.amazon.com/eventbridge/integrations/
[2]: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html
[3]: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
Loading