Skip to content

Commit 60d0363

Browse files
authored
feat(batch): reduce boilerplate with process_partial_response (#2090)
1 parent c3e25d6 commit 60d0363

14 files changed

+589
-201
lines changed

Diff for: aws_lambda_powertools/utilities/batch/__init__.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
EventType,
1313
FailureResponse,
1414
SuccessResponse,
15+
)
16+
from aws_lambda_powertools.utilities.batch.decorators import (
1517
async_batch_processor,
18+
async_process_partial_response,
1619
batch_processor,
20+
process_partial_response,
1721
)
1822
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
1923
from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
@@ -22,6 +26,10 @@
2226
from aws_lambda_powertools.utilities.batch.types import BatchTypeModels
2327

2428
__all__ = (
29+
"async_batch_processor",
30+
"async_process_partial_response",
31+
"batch_processor",
32+
"process_partial_response",
2533
"BatchProcessor",
2634
"AsyncBatchProcessor",
2735
"BasePartialProcessor",
@@ -32,6 +40,4 @@
3240
"FailureResponse",
3341
"SuccessResponse",
3442
"SqsFifoPartialProcessor",
35-
"batch_processor",
36-
"async_batch_processor",
3743
)

Diff for: aws_lambda_powertools/utilities/batch/base.py

+1-106
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,8 @@
1111
import sys
1212
from abc import ABC, abstractmethod
1313
from enum import Enum
14-
from typing import (
15-
Any,
16-
Awaitable,
17-
Callable,
18-
Dict,
19-
List,
20-
Optional,
21-
Tuple,
22-
Union,
23-
overload,
24-
)
14+
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload
2515

26-
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
2716
from aws_lambda_powertools.shared import constants
2817
from aws_lambda_powertools.utilities.batch.exceptions import (
2918
BatchProcessingError,
@@ -513,51 +502,6 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons
513502
return self.failure_handler(record=data, exception=sys.exc_info())
514503

515504

516-
@lambda_handler_decorator
517-
def batch_processor(
518-
handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor
519-
):
520-
"""
521-
Middleware to handle batch event processing
522-
523-
Parameters
524-
----------
525-
handler: Callable
526-
Lambda's handler
527-
event: Dict
528-
Lambda's Event
529-
context: LambdaContext
530-
Lambda's Context
531-
record_handler: Callable
532-
Callable or corutine to process each record from the batch
533-
processor: BatchProcessor
534-
Batch Processor to handle partial failure cases
535-
536-
Examples
537-
--------
538-
**Processes Lambda's event with a BasePartialProcessor**
539-
540-
>>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
541-
>>>
542-
>>> def record_handler(record):
543-
>>> return record["body"]
544-
>>>
545-
>>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
546-
>>> def handler(event, context):
547-
>>> return {"StatusCode": 200}
548-
549-
Limitations
550-
-----------
551-
* Async batch processors. Use `async_batch_processor` instead.
552-
"""
553-
records = event["Records"]
554-
555-
with processor(records, record_handler, lambda_context=context):
556-
processor.process()
557-
558-
return handler(event, context)
559-
560-
561505
class AsyncBatchProcessor(BasePartialBatchProcessor):
562506
"""Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously.
563507
@@ -694,52 +638,3 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa
694638
return self._register_model_validation_error_record(record)
695639
except Exception:
696640
return self.failure_handler(record=data, exception=sys.exc_info())
697-
698-
699-
@lambda_handler_decorator
700-
def async_batch_processor(
701-
handler: Callable,
702-
event: Dict,
703-
context: LambdaContext,
704-
record_handler: Callable[..., Awaitable[Any]],
705-
processor: AsyncBatchProcessor,
706-
):
707-
"""
708-
Middleware to handle batch event processing
709-
Parameters
710-
----------
711-
handler: Callable
712-
Lambda's handler
713-
event: Dict
714-
Lambda's Event
715-
context: LambdaContext
716-
Lambda's Context
717-
record_handler: Callable[..., Awaitable[Any]]
718-
Callable to process each record from the batch
719-
processor: AsyncBatchProcessor
720-
Batch Processor to handle partial failure cases
721-
Examples
722-
--------
723-
**Processes Lambda's event with a BasePartialProcessor**
724-
>>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor
725-
>>>
726-
>>> async def async_record_handler(record):
727-
>>> payload: str = record.body
728-
>>> return payload
729-
>>>
730-
>>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
731-
>>>
732-
>>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
733-
>>> async def lambda_handler(event, context: LambdaContext):
734-
>>> return processor.response()
735-
736-
Limitations
737-
-----------
738-
* Sync batch processors. Use `batch_processor` instead.
739-
"""
740-
records = event["Records"]
741-
742-
with processor(records, record_handler, lambda_context=context):
743-
processor.async_process()
744-
745-
return handler(event, context)

0 commit comments

Comments
 (0)