Skip to content

feat(batch): add async_batch_processor for concurrent processing #1724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions aws_lambda_powertools/utilities/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,26 @@

from aws_lambda_powertools.utilities.batch.base import (
BasePartialProcessor,
BasePartialBatchProcessor,
BatchProcessor,
AsyncBatchProcessor,
EventType,
FailureResponse,
SuccessResponse,
batch_processor,
async_batch_processor,
)
from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo

# Names exported as the public API of the batch utility package
# (this is what `from aws_lambda_powertools.utilities.batch import *` exposes).
__all__ = (
"BatchProcessor",
"AsyncBatchProcessor",
"BasePartialProcessor",
"BasePartialBatchProcessor",
"ExceptionInfo",
"EventType",
"FailureResponse",
"SuccessResponse",
"batch_processor",
"async_batch_processor",
)
234 changes: 165 additions & 69 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
"""
Batch processing utilities
"""
import asyncio
import copy
import inspect
import logging
import sys
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload, Awaitable

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.batch.exceptions import (
Expand Down Expand Up @@ -100,6 +101,26 @@ def process(self) -> List[Tuple]:
"""
return [self._process_record(record) for record in self.records]

@abstractmethod
async def _async_process_record(self, record: dict):
    """
    Coroutine hook that processes a single record with the handler.

    Subclasses provide the real implementation; this base version
    only raises ``NotImplementedError``.

    Parameters
    ----------
    record: dict
        A batch record to be processed.
    """
    raise NotImplementedError()

def async_process(self) -> List[Tuple]:
    """
    Run the instance's async record handler for every record, concurrently.

    This is a blocking (synchronous) entry point: it schedules
    ``self._async_process_record(record)`` for each item in ``self.records``
    with ``asyncio.gather`` and drives the event loop until all are done.

    Returns
    -------
    List[Tuple]
        One result per record, in the same order as ``self.records``.
    """

    async def _gather_records():
        return list(await asyncio.gather(*(self._async_process_record(record) for record in self.records)))

    # WARNING: Do not use "asyncio.run(...)" here. There are cases in which the
    # main loop ends up closed between calls, causing "Event loop is closed".
    # We reuse the current loop when it is usable and only create (and install)
    # a new one when there is no current loop or the current one is closed.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop
        loop = None

    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(_gather_records())

def __enter__(self):
self._prepare()
return self
Expand Down Expand Up @@ -191,53 +212,7 @@ def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
return entry


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BasePartialProcessor
):
    """
    Middleware that runs a batch processor over the event's records
    before calling the Lambda handler.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event; its "Records" entry is the batch to process
    context: LambdaContext
        Lambda's Context
    record_handler: Callable
        Callable invoked for each record of the batch
    processor: BasePartialProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with a BasePartialProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors

    """
    # The processor is used as a context manager around the processing step
    with processor(event["Records"], record_handler, lambda_context=context):
        processor.process()

    return handler(event, context)


class BatchProcessor(BasePartialProcessor):
class BasePartialBatchProcessor(BasePartialProcessor): # noqa
"""Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.


Expand Down Expand Up @@ -392,26 +367,6 @@ def _prepare(self):
self.exceptions.clear()
self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)

def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
    """
    Run the instance's handler against one record and report the outcome.

    The record is first converted with ``_to_batch_type``. The handler gets
    the converted record, plus ``lambda_context`` when the instance is
    flagged as accepting it. Any exception raised by the handler is routed
    to ``failure_handler`` instead of propagating.

    Parameters
    ----------
    record: dict
        A batch record to be processed.
    """
    data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
    try:
        handler_kwargs = {"record": data}
        if self._handler_accepts_lambda_context:
            handler_kwargs["lambda_context"] = self.lambda_context

        outcome = self.handler(**handler_kwargs)
        # Success is reported with the raw record, failure with the converted one
        return self.success_handler(record=record, result=outcome)
    except Exception:
        return self.failure_handler(record=data, exception=sys.exc_info())

def _clean(self):
"""
Report messages to be deleted in case of partial failure.
Expand All @@ -423,7 +378,7 @@ def _clean(self):
if self._entire_batch_failed():
raise BatchProcessingError(
msg=f"All records failed processing. {len(self.exceptions)} individual errors logged "
f"separately below.",
f"separately below.",
child_exceptions=self.exceptions,
)

Expand Down Expand Up @@ -481,3 +436,144 @@ def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["B
if model is not None:
return model.parse_obj(record)
return self._DATA_CLASS_MAPPING[event_type](record)


class BatchProcessor(BasePartialBatchProcessor):  # Keep old name for compatibility
    """Synchronous batch processor: records are handled by a regular (non-async) handler."""

    async def _async_process_record(self, record: dict):
        # Async processing is not available on the synchronous processor
        raise NotImplementedError()

    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Run the instance's handler against one record and report the outcome.

        Exceptions raised by the handler are routed to ``failure_handler``
        rather than propagated.

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            handler_kwargs = {"record": data}
            if self._handler_accepts_lambda_context:
                handler_kwargs["lambda_context"] = self.lambda_context

            outcome = self.handler(**handler_kwargs)
            # Success is reported with the raw record, failure with the converted one
            return self.success_handler(record=record, result=outcome)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor
):
    """
    Middleware that runs a batch processor over the event's records
    before calling the Lambda handler.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event; its "Records" entry is the batch to process
    context: LambdaContext
        Lambda's Context
    record_handler: Callable
        Callable invoked for each record of the batch
    processor: BatchProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with a BatchProcessor**

        >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
        >>>
        >>> def record_handler(record):
        >>>     return record["body"]
        >>>
        >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}

    Limitations
    -----------
    * Async batch processors. Use `async_batch_processor` instead.
    """
    # The processor is used as a context manager around the processing step
    with processor(event["Records"], record_handler, lambda_context=context):
        processor.process()

    return handler(event, context)


class AsyncBatchProcessor(BasePartialBatchProcessor):
    """Batch processor whose record handler is awaited for each record."""

    def _process_record(self, record: dict):
        # Synchronous processing is not available on the async processor
        raise NotImplementedError()

    async def _async_process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Await the instance's handler for one record and report the outcome.

        Exceptions raised by the handler are routed to ``failure_handler``
        rather than propagated.

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            handler_kwargs = {"record": data}
            if self._handler_accepts_lambda_context:
                handler_kwargs["lambda_context"] = self.lambda_context

            outcome = await self.handler(**handler_kwargs)
            # Success is reported with the raw record, failure with the converted one
            return self.success_handler(record=record, result=outcome)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())


@lambda_handler_decorator
def async_batch_processor(
    handler: Callable,
    event: Dict,
    context: LambdaContext,
    record_handler: Callable[..., Awaitable[Any]],
    processor: AsyncBatchProcessor,
):
    """
    Middleware to handle batch event processing with an async record handler

    The records are processed through ``processor.async_process()`` and then the
    Lambda handler is called. The handler itself is called synchronously and its
    return value is returned as-is, so it must be a regular (non-async) function.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    record_handler: Callable[..., Awaitable[Any]]
        Coroutine function to process each record from the batch
    processor: AsyncBatchProcessor
        Batch Processor to handle partial failure cases

    Examples
    --------
    **Processes Lambda's event with an AsyncBatchProcessor**

        >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor, EventType
        >>>
        >>> async def async_record_handler(record):
        >>>     payload: str = record.body
        >>>     return payload
        >>>
        >>> processor = AsyncBatchProcessor(event_type=EventType.SQS)
        >>>
        >>> @async_batch_processor(record_handler=async_record_handler, processor=processor)
        >>> def lambda_handler(event, context: LambdaContext):
        >>>     return processor.response()

    Limitations
    -----------
    * Sync batch processors. Use `batch_processor` instead.
    """
    records = event["Records"]

    with processor(records, record_handler, lambda_context=context):
        processor.async_process()

    # Not awaited: an `async def` handler would hand back a coroutine object here
    return handler(event, context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from aws_lambda_powertools.utilities.batch import async_batch_processor, EventType, AsyncBatchProcessor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


async def async_record_handler(record: SQSRecord):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np: what would be a realistic example here? maybe an async crawler?

As part of the docs refactoring, we're trying to include more complete examples; we just haven't reached Batch yet.

Example: https://awslabs.github.io/aws-lambda-powertools-python/2.4.0/core/tracer/#ignoring-certain-http-endpoints

"""
Process here each record
"""
payload: str = record.body
if not payload:
raise ValueError
# code code code


processor = AsyncBatchProcessor(event_type=EventType.SQS)


# NOTE: only the record handler is async. The Lambda handler stays a regular
# function because the middleware calls it without awaiting and returns its
# result directly; an `async def` here would return a coroutine object.
@async_batch_processor(record_handler=async_record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    """Lambda entry point: returns the processor's response after the batch was processed."""
    return processor.response()
21 changes: 21 additions & 0 deletions examples/batch_processing/src/getting_started_batch_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from aws_lambda_powertools.utilities.batch import EventType, batch_processor, BatchProcessor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


def record_handler(record: SQSRecord):
    """
    Handle a single record from the batch.

    A record with an empty body is rejected with ``ValueError``.
    """
    body: str = record.body
    if body:
        # code code code
        return None
    raise ValueError


processor = BatchProcessor(event_type=EventType.SQS)


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    """Lambda entry point: returns the processor's response after the batch was processed."""
    batch_response = processor.response()
    return batch_response
Loading