
Commit b28c47e

Merge pull request #100 from gmcrocetti/partial-sqs-batch
feat: SQS Partial failure
2 parents c5e7085 + 9caf3d1 commit b28c47e

10 files changed: +712 −1 lines changed

aws_lambda_powertools/utilities/batch/__init__.py

+15 (new file)

# -*- coding: utf-8 -*-

"""
Batch processing utility
"""

from .base import BasePartialProcessor
from .middlewares import batch_processor
from .sqs import PartialSQSProcessor

__all__ = (
    "BasePartialProcessor",
    "PartialSQSProcessor",
    "batch_processor",
)
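
As a quick orientation, a minimal import sketch: the three names in `__all__` are the public surface of the new utility. The `aws_lambda_powertools.utilities.batch` import path is assumed here from how the docstrings in the files below import it.

```python
# Public API re-exported by the batch package's __init__ above.
from aws_lambda_powertools.utilities.batch import (
    BasePartialProcessor,  # abstract base for custom partial-failure processors
    PartialSQSProcessor,   # SQS implementation that deletes successes on partial failure
    batch_processor,       # middleware/decorator for Lambda handlers
)
```
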
aws_lambda_powertools/utilities/batch/base.py

+93 (new file)

# -*- coding: utf-8 -*-

"""
Batch processing utilities
"""

from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, List, Tuple


class BasePartialProcessor(ABC):
    """
    Abstract class for batch processors.
    """

    def __init__(self):
        self.success_messages: List = []
        self.fail_messages: List = []

    @abstractmethod
    def _prepare(self):
        """
        Prepare context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _clean(self):
        """
        Clear context manager.
        """
        raise NotImplementedError()

    @abstractmethod
    def _process_record(self, record: Any):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]

    def __enter__(self):
        self._prepare()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self._clean()

    def __call__(self, records: Iterable[Any], handler: Callable):
        """
        Set instance attributes before execution

        Parameters
        ----------
        records: Iterable[Any]
            Iterable with objects to be processed.
        handler: Callable
            Callable to process "records" entries.
        """
        self.records = records
        self.handler = handler
        return self

    def success_handler(self, record: Any, result: Any):
        """
        Success callback

        Returns
        -------
        tuple
            "success", result, original record
        """
        entry = ("success", result, record)
        self.success_messages.append(record)
        return entry

    def failure_handler(self, record: Any, exception: Exception):
        """
        Failure callback

        Returns
        -------
        tuple
            "fail", exception's args, original record
        """
        entry = ("fail", exception.args, record)
        self.fail_messages.append(record)
        return entry
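
To make the contract above concrete, here is a minimal sketch of a custom subclass. `InMemoryPartialProcessor`, its no-op `_clean`, and the sample records are hypothetical and not part of this PR; only the three abstract methods and the `success_handler`/`failure_handler` callbacks come from `BasePartialProcessor` itself.

```python
from typing import Any, Tuple

from aws_lambda_powertools.utilities.batch import BasePartialProcessor


class InMemoryPartialProcessor(BasePartialProcessor):
    """Hypothetical processor: tracks results in memory, no external cleanup."""

    def _prepare(self):
        # Reset state when the context manager is entered.
        self.success_messages.clear()
        self.fail_messages.clear()

    def _clean(self):
        # Nothing to acknowledge or delete for this in-memory example.
        pass

    def _process_record(self, record: Any) -> Tuple:
        # Route each record through the handler set by __call__.
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)


def record_handler(record):
    if record["body"] == "boom":
        raise ValueError("failed record")
    return record["body"]


processor = InMemoryPartialProcessor()
with processor([{"body": "ok"}, {"body": "boom"}], record_handler):
    results = processor.process()

# results -> [("success", "ok", {...}), ("fail", ("failed record",), {...})]
# processor.success_messages and processor.fail_messages hold the original records.
```
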
aws_lambda_powertools/utilities/batch/middlewares.py

+56 (new file)

# -*- coding: utf-8 -*-

"""
Middlewares for batch utilities
"""

from typing import Callable, Dict

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

from .base import BasePartialProcessor


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
):
    """
    Middleware to handle batch event processing

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: Dict
        Lambda's Context
    record_handler: Callable
        Callable to process each record from the batch
    processor: PartialSQSProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with PartialSQSProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    records = event["Records"]

    with processor(records, record_handler):
        processor.process()

    return handler(event, context)
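
Putting the middleware together with the SQS processor defined in the next file, a hedged usage sketch: the queue ARN, message ID, and receipt handle below are illustrative placeholders, and the record keys match what `PartialSQSProcessor` reads (`messageId`, `receiptHandle`, `body`, `eventSourceARN`).

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor


def record_handler(record):
    # Raising marks a record as failed; returning marks it as successful.
    return record["body"]


@batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
def lambda_handler(event, context):
    # Runs after the processor has walked the batch (and, on partial failure,
    # deleted the successful messages from the queue on context exit).
    return {"statusCode": 200}


# Illustrative shape of the SQS event the middleware expects in event["Records"].
example_event = {
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnK...",  # placeholder receipt handle
            "body": "first message",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
        }
    ]
}
```
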
aws_lambda_powertools/utilities/batch/sqs.py

+106 (new file)

# -*- coding: utf-8 -*-

"""
Batch SQS utilities
"""
from typing import List, Optional, Tuple

import boto3
from botocore.config import Config

from .base import BasePartialProcessor


class PartialSQSProcessor(BasePartialProcessor):
    """
    Amazon SQS batch processor to delete successes from the Queue.

    Only the **special** case of partial failure is handled, thus a batch in
    which all records failed is **not** going to be removed from the queue, and
    the same is valid for a full success.

    Parameters
    ----------
    config: Config
        botocore config object

    Example
    -------
    **Process batch triggered by SQS**

        >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> def handler(event, context):
        >>>     records = event["Records"]
        >>>     processor = PartialSQSProcessor()
        >>>
        >>>     with processor(records=records, handler=record_handler):
        >>>         result = processor.process()
        >>>
        >>>     # If a partial failure occurred, all successful executions
        >>>     # have been deleted from the queue after the context's exit.
        >>>
        >>>     return result
    """

    def __init__(self, config: Optional[Config] = None):
        """
        Initializes sqs client.
        """
        config = config or Config()
        self.client = boto3.client("sqs", config=config)

        super().__init__()

    def _get_queue_url(self) -> str:
        """
        Format QueueUrl from first records entry
        """
        if not getattr(self, "records", None):
            return

        *_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":")
        return f"{self.client._endpoint.host}/{account_id}/{queue_name}"

    def _get_entries_to_clean(self) -> List:
        """
        Format messages to use in batch deletion
        """
        return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages]

    def _process_record(self, record) -> Tuple:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: Any
            An object to be processed.
        """
        try:
            result = self.handler(record)
            return self.success_handler(record, result)
        except Exception as exc:
            return self.failure_handler(record, exc)

    def _prepare(self):
        """
        Remove results from previous execution.
        """
        self.success_messages.clear()
        self.fail_messages.clear()

    def _clean(self):
        """
        Delete messages from Queue in case of partial failure.
        """
        if not (self.fail_messages and self.success_messages):
            return

        queue_url = self._get_queue_url()
        entries_to_remove = self._get_entries_to_clean()

        return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
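
For a feel of the success/failure bookkeeping without touching AWS, here is a sketch that drives `PartialSQSProcessor` by hand. The region, ARN, and message fields are placeholders; calling `process()` directly makes no API calls, and the `delete_message_batch` cleanup only runs when the `with` block exits on a genuine partial failure (some successes and some failures).

```python
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

# Placeholder records shaped like an SQS-triggered Lambda event.
records = [
    {
        "messageId": "msg-1",
        "receiptHandle": "handle-1",
        "body": "ok",
        "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
    },
    {
        "messageId": "msg-2",
        "receiptHandle": "handle-2",
        "body": "boom",
        "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
    },
]


def record_handler(record):
    if record["body"] == "boom":
        raise ValueError("unprocessable record")
    return record["body"]


# The region only lets boto3 build the client; nothing is sent here.
processor = PartialSQSProcessor(config=Config(region_name="us-east-1"))

results = processor(records, record_handler).process()
# results -> [("success", "ok", {...}), ("fail", ("unprocessable record",), {...})]

# Inside a Lambda handler you would use the context manager instead:
#     with processor(records, record_handler):
#         processor.process()
# so that on exit msg-1 (the success) is removed via delete_message_batch
# while msg-2 stays on the queue for redelivery.
```
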

aws_lambda_powertools/utilities/parameters/secrets.py

+1 −1

@@ -27,7 +27,7 @@ class SecretsProvider(BaseProvider):
     >>> from aws_lambda_powertools.utilities.parameters import SecretsProvider
     >>> secrets_provider = SecretsProvider()
     >>>
-    >>> value secrets_provider.get("my-parameter")
+    >>> value = secrets_provider.get("my-parameter")
     >>>
     >>> print(value)
     My parameter value

docs/content/index.mdx

+6

@@ -24,6 +24,12 @@ Powertools is available in PyPi. You can use your favourite dependency managemen
 ```bash:title=hello_world.sh
 sam init --location https://github.com/aws-samples/cookiecutter-aws-sam-python
 ```
+* [Tracing](./core/tracer) - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions
+* [Logging](./core/logger) - Structured logging made easier, and decorator to enrich structured logging with key Lambda context details
+* [Metrics](./core/metrics) - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF)
+* [Bring your own middleware](./utilities/middleware_factory) - Decorator factory to create your own middleware to run logic before, and after each Lambda invocation
+* [Parameters utility](./utilities/parameters) - Retrieve parameter values from AWS Systems Manager Parameter Store, AWS Secrets Manager, or Amazon DynamoDB, and cache them for a specific amount of time
+* [Batch utility](./utilities/batch) - Batch processing for AWS SQS, handles partial failure.
 
 ### Lambda Layer
 

0 commit comments

Comments
 (0)