Skip to content

Commit 42cda1f

Browse files
committed
feat: add async_process_partial_response
1 parent ee05373 commit 42cda1f

File tree

3 files changed

+100
-1
lines changed

3 files changed

+100
-1
lines changed

aws_lambda_powertools/utilities/batch/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from aws_lambda_powertools.utilities.batch.decorators import (
1717
async_batch_processor,
18+
async_process_partial_response,
1819
batch_processor,
1920
process_partial_response,
2021
)
@@ -26,6 +27,7 @@
2627

2728
__all__ = (
2829
"async_batch_processor",
30+
"async_process_partial_response",
2931
"batch_processor",
3032
"process_partial_response",
3133
"BatchProcessor",

aws_lambda_powertools/utilities/batch/decorators.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def async_batch_processor(
5454
>>> return payload
5555
>>>
5656
>>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
57-
>>> async def lambda_handler(event, context: LambdaContext):
57+
>>> def lambda_handler(event, context):
5858
>>> return processor.response()
5959
6060
Limitations
@@ -183,3 +183,67 @@ def handler(event, context):
183183
processor.process()
184184

185185
return processor.response()
186+
187+
188+
def async_process_partial_response(
189+
event: Dict,
190+
record_handler: Callable,
191+
processor: AsyncBatchProcessor,
192+
context: LambdaContext | None = None,
193+
) -> PartialItemFailureResponse:
194+
"""
195+
Higher level function to handle batch event processing asynchronously.
196+
197+
Parameters
198+
----------
199+
event: Dict
200+
Lambda's original event
201+
record_handler: Callable
202+
Callable to process each record from the batch
203+
processor: AsyncBatchProcessor
204+
Batch Processor to handle partial failure cases
205+
context: LambdaContext
206+
Lambda's context, used to optionally inject in record handler
207+
208+
Returns
209+
-------
210+
result: PartialItemFailureResponse
211+
Lambda Partial Batch Response
212+
213+
Examples
214+
--------
215+
**Processes Lambda's SQS event**
216+
217+
```python
218+
from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, process_partial_response
219+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
220+
221+
processor = BatchProcessor(EventType.SQS)
222+
223+
async def record_handler(record: SQSRecord):
224+
return record.body
225+
226+
def handler(event, context):
227+
return async_process_partial_response(
228+
event=event, record_handler=record_handler, processor=processor, context=context
229+
)
230+
```
231+
232+
Limitations
233+
-----------
234+
* Sync batch processors. Use `process_partial_response` instead.
235+
"""
236+
try:
237+
records: List[Dict] = event.get("Records", [])
238+
except AttributeError:
239+
event_types = ", ".join(list(EventType.__members__))
240+
docs = "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
241+
raise ValueError(
242+
f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
243+
f"See sample events in our documentation for either {event_types}: \n {docs}"
244+
)
245+
246+
with processor(records, record_handler, context):
247+
processor.async_process()
248+
249+
return processor.response()

tests/functional/test_utilities_batch.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
EventType,
1313
SqsFifoPartialProcessor,
1414
async_batch_processor,
15+
async_process_partial_response,
1516
batch_processor,
1617
process_partial_response,
1718
)
@@ -808,3 +809,35 @@ def test_process_partial_response_invalid_input(record_handler: Callable, batch:
808809
# WHEN/THEN
809810
with pytest.raises(ValueError):
810811
process_partial_response(batch, record_handler, processor)
812+
813+
814+
def test_async_process_partial_response(sqs_event_factory, async_record_handler):
815+
# GIVEN
816+
records = [sqs_event_factory("success"), sqs_event_factory("success")]
817+
batch = {"Records": records}
818+
processor = AsyncBatchProcessor(event_type=EventType.SQS)
819+
820+
# WHEN
821+
ret = async_process_partial_response(batch, async_record_handler, processor)
822+
823+
# THEN
824+
assert ret == {"batchItemFailures": []}
825+
826+
827+
@pytest.mark.parametrize(
828+
"batch",
829+
[
830+
pytest.param(123456789, id="num"),
831+
pytest.param([], id="list"),
832+
pytest.param(False, id="bool"),
833+
pytest.param(object, id="object"),
834+
pytest.param(lambda x: x, id="callable"),
835+
],
836+
)
837+
def test_async_process_partial_response_invalid_input(async_record_handler: Callable, batch: Any):
838+
# GIVEN
839+
processor = AsyncBatchProcessor(event_type=EventType.SQS)
840+
841+
# WHEN/THEN
842+
with pytest.raises(ValueError):
843+
async_process_partial_response(batch, record_handler, processor)

0 commit comments

Comments
 (0)