feat: SQS Partial failure #100

Merged
merged 26 commits into from
Sep 1, 2020
2e00554
feat: add batch module
gmcrocetti Aug 13, 2020
9015d43
feat: include base processors
gmcrocetti Aug 13, 2020
48a7687
feat: add sqs failure processors
gmcrocetti Aug 13, 2020
9c7324b
fix: include proposed suggestions
gmcrocetti Aug 18, 2020
e7e36a9
feat(sqs): improve validation for queue_url
gmcrocetti Aug 25, 2020
8dbaf67
feat: add package level import for batch utility
gmcrocetti Aug 25, 2020
524d27f
refactor: change return for failure/success handlers
gmcrocetti Aug 25, 2020
7d32e8c
test: add unit tests for partial sqs processor
gmcrocetti Aug 25, 2020
f9c53e0
test: add unit tests for partial sqs processor
gmcrocetti Aug 25, 2020
680ee69
test: functional tests for partial sqs processor and its middleware
gmcrocetti Aug 25, 2020
6cddf76
feat(sqs): add optional config parameter
gmcrocetti Aug 25, 2020
13e3132
refactor(tests): processor using default config
gmcrocetti Aug 25, 2020
4275fb2
refactor(sqs): change methods to protected
gmcrocetti Aug 25, 2020
54a76df
fix(base-partial): append record instead of entry
gmcrocetti Aug 25, 2020
438ecd5
docs(sqs): docstrings for PartialSQS
gmcrocetti Aug 27, 2020
c7582f3
docs(sqs-base): docstring for base class
gmcrocetti Aug 27, 2020
f419ef6
refactor(sqs): add module middlewares
gmcrocetti Aug 27, 2020
c999f96
refactor: changes partial_sqs middleware in favor of a generic interf…
gmcrocetti Aug 27, 2020
5cb6b89
refactor(tests): update tests to new batch processor middleware
gmcrocetti Aug 27, 2020
be06149
refactor: batch middleware
gmcrocetti Aug 27, 2020
8ba81b4
docs(partial-processor): add simple docstrings to success/failure han…
gmcrocetti Aug 27, 2020
edcc14a
fix: typo in example
gmcrocetti Aug 27, 2020
ded3d75
docs: user specific documentation
gmcrocetti Aug 27, 2020
14cc383
docs: fix suggestions made by @heitorlessa
gmcrocetti Aug 29, 2020
25d67ca
refactor: remove references to BaseProcessor. Left BasePartialProcessor
gmcrocetti Aug 31, 2020
9caf3d1
docs: refactor example; improve docs about creating your own processor
gmcrocetti Aug 31, 2020
15 changes: 15 additions & 0 deletions aws_lambda_powertools/utilities/batch/__init__.py
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-

"""
Batch processing utility
"""

from .base import BasePartialProcessor
from .middlewares import batch_processor
from .sqs import PartialSQSProcessor

__all__ = (
    "BasePartialProcessor",
    "PartialSQSProcessor",
    "batch_processor",
)
93 changes: 93 additions & 0 deletions aws_lambda_powertools/utilities/batch/base.py
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-

"""
Batch processing utilities
"""

from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Tuple


class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List = []
        self.fail_messages: List = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: Iterable[Any], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: Iterable[Any]
            Iterable with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Success callback

        Returns
        -------
        tuple
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record: Any, exception: Exception):
        """
        Failure callback

        Returns
        -------
        tuple
            "fail", exception args, original record
        """
        entry = ("fail", exception.args, record)
        self.fail_messages.append(record)
        return entry
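To illustrate how the `__call__` and context-manager contract in `base.py` fits together, here is a minimal sketch. `MiniPartialProcessor`, `InMemoryProcessor` and `double` are hypothetical stand-ins written for this note (they are not part of the PR), so the snippet runs without the library installed; they mirror only the `_prepare` / `_process_record` / `_clean` hooks shown above.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Tuple


class MiniPartialProcessor(ABC):
    """Hypothetical stand-in mirroring the hooks of BasePartialProcessor."""

    def __init__(self):
        self.success_messages: List = []
        self.fail_messages: List = []

    @abstractmethod
    def _prepare(self): ...

    @abstractmethod
    def _clean(self): ...

    @abstractmethod
    def _process_record(self, record: Any) -> Tuple: ...

    def process(self) -> List[Tuple]:
        # One result tuple per record, in input order.
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self._clean()

    def __call__(self, records: Iterable[Any], handler: Callable):
        # Store the batch and the per-record handler, then return self
        # so the instance can be used directly in a `with` statement.
        self.records = records
        self.handler = handler
        return self


class InMemoryProcessor(MiniPartialProcessor):
    """Hypothetical processor that only tracks outcomes in memory."""

    def __init__(self):
        super().__init__()
        self.cleaned = False

    def _prepare(self):
        self.success_messages.clear()
        self.fail_messages.clear()
        self.cleaned = False

    def _clean(self):
        # A real processor would release or delete resources here.
        self.cleaned = True

    def _process_record(self, record):
        try:
            result = self.handler(record)
        except Exception as exc:
            self.fail_messages.append(record)
            return ("fail", exc.args, record)
        self.success_messages.append(record)
        return ("success", result, record)


def double(record: int) -> int:
    if record < 0:
        raise ValueError("negative record")
    return record * 2


processor = InMemoryProcessor()
with processor([1, -1, 3], double) as p:
    results = p.process()
```

In this sketch a failing record does not stop the batch: the exception is captured as a `"fail"` tuple and the remaining records are still processed, which is the behaviour the PR's SQS processor relies on.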
56 changes: 56 additions & 0 deletions aws_lambda_powertools/utilities/batch/middlewares.py
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-

"""
Middlewares for batch utilities
"""

from typing import Callable, Dict

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

from .base import BasePartialProcessor


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: BasePartialProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**
        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
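The order of operations in the middleware (process every record inside the processor's context, then call the Lambda handler) can be sketched with a plain decorator factory. `with_batch` and `ListProcessor` are hypothetical names used only for this illustration; the PR itself builds the decorator with `lambda_handler_decorator`, which this sketch does not reproduce.

```python
import functools
from typing import Any, Callable, Dict, List


class ListProcessor:
    """Hypothetical processor: same call/context-manager shape, no AWS calls."""

    def __init__(self):
        self.results: List[Any] = []

    def __call__(self, records, handler):
        self.records, self.handler = records, handler
        return self

    def __enter__(self):
        self.results.clear()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass  # a real processor would clean up here

    def process(self):
        self.results.extend(self.handler(r) for r in self.records)
        return self.results


def with_batch(record_handler: Callable, processor: ListProcessor):
    """Hypothetical plain-decorator equivalent of the middleware flow."""

    def decorator(handler: Callable):
        @functools.wraps(handler)
        def wrapper(event: Dict, context: Any):
            # 1. Run the record handler over the whole batch.
            with processor(event["Records"], record_handler):
                processor.process()
            # 2. Only then invoke the wrapped Lambda handler.
            return handler(event, context)

        return wrapper

    return decorator


processor = ListProcessor()


@with_batch(record_handler=lambda record: record["body"].upper(), processor=processor)
def handler(event, context):
    return {"StatusCode": 200}


response = handler({"Records": [{"body": "a"}, {"body": "b"}]}, None)
```

As in the middleware above, the list returned by `process()` is discarded, so per-record outcomes have to be read from the processor instance afterwards. Note also that with the default `processor=None` in the signature shown in the diff, the call `processor(records, record_handler)` would raise a `TypeError`, so callers need to pass a processor explicitly.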
106 changes: 106 additions & 0 deletions aws_lambda_powertools/utilities/batch/sqs.py
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-

"""
Batch SQS utilities
"""
from typing import List, Optional, Tuple

import boto3
from botocore.config import Config

from .base import BasePartialProcessor


class PartialSQSProcessor(BasePartialProcessor):
    """
    Amazon SQS batch processor to delete successes from the Queue.

    Only the **special** case of partial failure is handled, thus a batch in
    which all records failed is **not** going to be removed from the queue, and
    the same is valid for a full success.

    Parameters
    ----------
    config: Config
        botocore config object

    Example
    -------
    **Process batch triggered by SQS**

        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> def handler(event, context):
        >>>     records = event["Records"]
        >>>     processor = PartialSQSProcessor()
        >>>
        >>>     with processor(records=records, handler=record_handler):
        >>>         result = processor.process()
        >>>
        >>>     # If a partial failure occurred, all successful executions
        >>>     # have been deleted from the queue after context's exit.
        >>>
        >>>     return result
    """

    def __init__(self, config: Optional[Config] = None):
        """
        Initializes sqs client.
        """
        config = config or Config()
        self.client = boto3.client("sqs", config=config)

        super().__init__()

    def _get_queue_url(self) -> Optional[str]:
        """
        Format QueueUrl from first records entry
        """
        if not getattr(self, "records", None):
            return None

        *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
        return f"{self.client._endpoint.host}/{account_id}/{queue_name}"

    def _get_entries_to_clean(self) -> List:
        """
        Format messages to use in batch deletion
        """
        return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages]

    def _process_record(self, record) -> Tuple:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: Any
            An object to be processed.
        """
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()

    def _clean(self):
        """
        Delete messages from Queue in case of partial failure.
        """
        if not (self.fail_messages and self.success_messages):
            return

        queue_url = self._get_queue_url()
        entries_to_remove = self._get_entries_to_clean()

        return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
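The three small pieces of logic in `sqs.py` (deriving the queue URL from `eventSourceARN`, building the deletion entries, and deciding whether to delete at all) can be sketched as pure functions. The function names, the sample ARN, the sample records and the `endpoint_host` value are assumptions made for illustration; in the PR the host comes from `self.client._endpoint.host`, whose exact value depends on the boto3 client configuration.

```python
from typing import Dict, List


def queue_url_from_arn(endpoint_host: str, event_source_arn: str) -> str:
    # ARN layout: arn:aws:sqs:<region>:<account_id>:<queue_name>
    # Only the last two colon-separated fields are needed.
    *_, account_id, queue_name = event_source_arn.split(":")
    return f"{endpoint_host}/{account_id}/{queue_name}"


def delete_entries(success_records: List[Dict]) -> List[Dict]:
    # Shape expected by the SQS DeleteMessageBatch "Entries" parameter.
    return [{"Id": r["messageId"], "ReceiptHandle": r["receiptHandle"]} for r in success_records]


def should_delete(successes: List, failures: List) -> bool:
    # Partial failure only: at least one success AND at least one failure.
    return bool(successes and failures)


# Assumed sample values, not taken from the PR.
arn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
url = queue_url_from_arn("https://sqs.us-east-1.amazonaws.com", arn)

ok = [{"messageId": "m-1", "receiptHandle": "rh-1", "body": "hello"}]
failed = [{"messageId": "m-2", "receiptHandle": "rh-2", "body": "oops"}]
entries = delete_entries(ok)
```

With these helpers, `should_delete(ok, failed)` is true, while a batch with only successes or only failures yields false, matching the guard at the top of `_clean`.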
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/parameters/secrets.py
@@ -27,7 +27,7 @@ class SecretsProvider(BaseProvider):
>>> from aws_lambda_powertools.utilities.parameters import SecretsProvider
>>> secrets_provider = SecretsProvider()
>>>
->>> value secrets_provider.get("my-parameter")
+>>> value = secrets_provider.get("my-parameter")
>>>
>>> print(value)
My parameter value
6 changes: 6 additions & 0 deletions docs/content/index.mdx
@@ -24,6 +24,12 @@ Powertools is available in PyPi. You can use your favourite dependency managemen
```bash:title=hello_world.sh
sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python
```
* [Tracing](./core/tracer) - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions
* [Logging](./core/logger) - Structured logging made easier, and decorator to enrich structured logging with key Lambda context details
* [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
* [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation
* [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time
* [Batch utility](./utilities/batch) - Batch processing for Amazon SQS that handles partial failures

### Lambda Layer
