Skip to content

Commit 3be8d03

Browse files
authored
Merge pull request #155 from awslabs/docs/batch_processing_util
fix: batch processing util
2 parents 9c09099 + 285054d commit 3be8d03

File tree

8 files changed

+435
-126
lines changed

8 files changed

+435
-126
lines changed

aws_lambda_powertools/utilities/batch/__init__.py

+3-8
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@
44
Batch processing utility
55
"""
66

7-
from .base import BasePartialProcessor
8-
from .middlewares import batch_processor
9-
from .sqs import PartialSQSProcessor
7+
from .base import BasePartialProcessor, batch_processor
8+
from .sqs import PartialSQSProcessor, sqs_batch_processor
109

11-
__all__ = (
12-
"BasePartialProcessor",
13-
"PartialSQSProcessor",
14-
"batch_processor",
15-
)
10+
__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor")

aws_lambda_powertools/utilities/batch/base.py

+54-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44
Batch processing utilities
55
"""
66

7+
import logging
78
from abc import ABC, abstractmethod
8-
from typing import Any, Callable, Iterable, List, Tuple
9+
from typing import Any, Callable, Dict, Iterable, List, Tuple
10+
11+
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
12+
13+
logger = logging.getLogger(__name__)
914

1015

1116
class BasePartialProcessor(ABC):
@@ -16,6 +21,7 @@ class BasePartialProcessor(ABC):
1621
def __init__(self):
1722
self.success_messages: List = []
1823
self.fail_messages: List = []
24+
self.exceptions: List = []
1925

2026
@abstractmethod
2127
def _prepare(self):
@@ -89,5 +95,52 @@ def failure_handler(self, record: Any, exception: Exception):
8995
"fail", exceptions args, original record
9096
"""
9197
entry = ("fail", exception.args, record)
98+
logger.debug("Record processing exception: ", exception)
99+
self.exceptions.append(exception)
92100
self.fail_messages.append(record)
93101
return entry
102+
103+
104+
@lambda_handler_decorator
105+
def batch_processor(
106+
handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None
107+
):
108+
"""
109+
Middleware to handle batch event processing
110+
111+
Parameters
112+
----------
113+
handler: Callable
114+
Lambda's handler
115+
event: Dict
116+
Lambda's Event
117+
context: Dict
118+
Lambda's Context
119+
record_handler: Callable
120+
Callable to process each record from the batch
121+
processor: PartialSQSProcessor
122+
Batch Processor to handle partial failure cases
123+
124+
Examples
125+
--------
126+
**Processes Lambda's event with PartialSQSProcessor**
127+
>>> from aws_lambda_powertools.utilities.batch import batch_processor
128+
>>>
129+
>>> def record_handler(record):
130+
>>> return record["body"]
131+
>>>
132+
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor())
133+
>>> def handler(event, context):
134+
>>> return {"StatusCode": 200}
135+
136+
Limitations
137+
-----------
138+
* Async batch processors
139+
140+
"""
141+
records = event["Records"]
142+
143+
with processor(records, record_handler):
144+
processor.process()
145+
146+
return handler(event, context)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""
2+
Batch processing exceptions
3+
"""
4+
5+
6+
class SQSBatchProcessingError(Exception):
7+
"""When at least one message within a batch could not be processed"""

aws_lambda_powertools/utilities/batch/middlewares.py

-56
This file was deleted.

aws_lambda_powertools/utilities/batch/sqs.py

+82-7
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,34 @@
33
"""
44
Batch SQS utilities
55
"""
6-
from typing import List, Optional, Tuple
6+
import logging
7+
from typing import Callable, Dict, List, Optional, Tuple
78

89
import boto3
910
from botocore.config import Config
1011

12+
from ...middleware_factory import lambda_handler_decorator
1113
from .base import BasePartialProcessor
14+
from .exceptions import SQSBatchProcessingError
15+
16+
logger = logging.getLogger(__name__)
1217

1318

1419
class PartialSQSProcessor(BasePartialProcessor):
1520
"""
1621
Amazon SQS batch processor to delete successes from the Queue.
1722
18-
Only the **special** case of partial failure is handled, thus a batch in
19-
which all records failed is **not** going to be removed from the queue, and
20-
the same is valid for a full success.
23+
The whole batch will be processed, even if failures occur. After all records are processed,
24+
SQSBatchProcessingError will be raised if there were any failures, causing messages to
25+
be returned to the SQS queue. This behaviour can be disabled by passing suppress_exception.
2126
2227
Parameters
2328
----------
2429
config: Config
2530
botocore config object
31+
suppress_exception: bool, optional
32+
Supress exception raised if any messages fail processing, by default False
33+
2634
2735
Example
2836
-------
@@ -46,12 +54,13 @@ class PartialSQSProcessor(BasePartialProcessor):
4654
>>> return result
4755
"""
4856

49-
def __init__(self, config: Optional[Config] = None):
57+
def __init__(self, config: Optional[Config] = None, suppress_exception: bool = False):
5058
"""
5159
Initializes sqs client.
5260
"""
5361
config = config or Config()
5462
self.client = boto3.client("sqs", config=config)
63+
self.suppress_exception = suppress_exception
5564

5665
super().__init__()
5766

@@ -97,10 +106,76 @@ def _clean(self):
97106
"""
98107
Delete messages from Queue in case of partial failure.
99108
"""
100-
if not (self.fail_messages and self.success_messages):
109+
# If all messages were successful, fall back to the default SQS -
110+
# Lambda behaviour which deletes messages if Lambda responds successfully
111+
if not self.fail_messages:
112+
logger.debug(f"All {len(self.success_messages)} records successfully processed")
101113
return
102114

103115
queue_url = self._get_queue_url()
104116
entries_to_remove = self._get_entries_to_clean()
105117

106-
return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
118+
delete_message_response = self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove)
119+
120+
if self.suppress_exception:
121+
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
122+
else:
123+
logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
124+
raise SQSBatchProcessingError(list(self.exceptions))
125+
126+
return delete_message_response
127+
128+
129+
@lambda_handler_decorator
130+
def sqs_batch_processor(
131+
handler: Callable,
132+
event: Dict,
133+
context: Dict,
134+
record_handler: Callable,
135+
config: Optional[Config] = None,
136+
suppress_exception: bool = False,
137+
):
138+
"""
139+
Middleware to handle SQS batch event processing
140+
141+
Parameters
142+
----------
143+
handler: Callable
144+
Lambda's handler
145+
event: Dict
146+
Lambda's Event
147+
context: Dict
148+
Lambda's Context
149+
record_handler: Callable
150+
Callable to process each record from the batch
151+
config: Config
152+
botocore config object
153+
suppress_exception: bool, optional
154+
Supress exception raised if any messages fail processing, by default False
155+
156+
Examples
157+
--------
158+
**Processes Lambda's event with PartialSQSProcessor**
159+
>>> from aws_lambda_powertools.utilities.batch import sqs_batch_processor
160+
>>>
161+
>>> def record_handler(record):
162+
>>> return record["body"]
163+
>>>
164+
>>> @sqs_batch_processor(record_handler=record_handler)
165+
>>> def handler(event, context):
166+
>>> return {"StatusCode": 200}
167+
168+
Limitations
169+
-----------
170+
* Async batch processors
171+
172+
"""
173+
config = config or Config()
174+
processor = PartialSQSProcessor(config=config, suppress_exception=suppress_exception)
175+
176+
records = event["Records"]
177+
178+
with processor(records, record_handler):
179+
processor.process()
180+
181+
return handler(event, context)

0 commit comments

Comments
 (0)