Feature request: Async BatchProcessor (use case: slow processing of each item) #1708
Comments
Thanks for opening your first issue here! We'll come back to you as soon as we can.
This is awesome @BakasuraRCE - I was secretly waiting for a customer to ask for async. Would you be up to contribute a PR? I can help with docs and improving the UX.

UPDATE: I take it back - this will cause trouble later with strict typing, and when using stacked decorators like making a record handler use Idempotency -- a separate class it is. I also like having a separate class because of coloured functions and the room it gives to grow any async processors. I'd be interested to hear your thoughts.
Great! I also think it is best to separate Sync from Async to keep things clear; however, the idempotency logic in the BatchProcessor can be reused. I also thought of something nicer for using an asynchronous handler. Allow me to show you a sketch of the code and tell me what you think:

Async main handler decorator:

```python
import asyncio

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator


@lambda_handler_decorator
def async_lambda_handler(handler, event, context):
    return asyncio.run(handler(event, context))
```

aws_lambda_powertools/utilities/batch/base.py:

```python
class BasePartialProcessor(ABC):
    # bla bla, same code WITHOUT process and _process_record methods
    ...


class SyncBasePartialProcessor(BasePartialProcessor):
    @abstractmethod
    def _process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]


class AsyncBasePartialProcessor(BasePartialProcessor):
    @abstractmethod
    async def _async_process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    async def async_process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return list(await asyncio.gather(*[self._async_process_record(record) for record in self.records]))


@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: SyncBasePartialProcessor
):
    # code code
    ...


class BaseBatchProcessorMixin(BasePartialProcessor):
    # same code from BatchProcessor WITHOUT _process_record method
    ...


class BatchProcessor(SyncBasePartialProcessor, BaseBatchProcessorMixin):  # Keep old name for compatibility
    # only include sync _process_record
    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())


@lambda_handler_decorator
async def async_batch_processor(
    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: AsyncBasePartialProcessor
):
    records = event["Records"]

    with processor(records, record_handler, lambda_context=context):
        await processor.async_process()

    return await handler(event, context)


class AsyncBatchProcessor(AsyncBasePartialProcessor, BaseBatchProcessorMixin):
    async def _async_process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = await self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = await self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())
```

lambda_function.py:

```python
# imports...


async def async_record_handler(record: SQSRecord):
    """
    Process each record here
    """
    payload: str = record.body
    if not payload:
        raise ValueError
    # code code code


processor = AsyncBatchProcessor(event_type=EventType.SQS)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_lambda_handler
@async_batch_processor(record_handler=async_record_handler, processor=processor)
async def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
That's quite interesting! Back then, I had issues with an async lambda handler, as I noticed most customers struggled to remember the order of decorators -- some tests could help here, as I don't yet have advanced knowledge of asyncio.

At first glance, the only challenge I see is the use of Mixins - multiple inheritance doesn't play nicely with compilation (we plan to use Mypyc). It also has a small performance overhead due to super() (the MRO, to an extent)... but it's truly microseconds.

If you could create a PR with an example of how customers could use and/or test this, I'd love to include it in this week's release - no rush if you can't, though.
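A minimal, hypothetical illustration of the MRO point above, using stand-in classes named after the sketch in the earlier comment (bodies stubbed out; this is not code from the library):

```python
from abc import ABC


# Stand-ins named after the sketched hierarchy, only to show how Python
# linearizes the proposed multiple inheritance.
class BasePartialProcessor(ABC): ...
class AsyncBasePartialProcessor(BasePartialProcessor): ...
class BaseBatchProcessorMixin(BasePartialProcessor): ...
class AsyncBatchProcessor(AsyncBasePartialProcessor, BaseBatchProcessorMixin): ...


# super() walks this order left to right; the overhead mentioned above is per lookup.
print([cls.__name__ for cls in AsyncBatchProcessor.__mro__])
# ['AsyncBatchProcessor', 'AsyncBasePartialProcessor', 'BaseBatchProcessorMixin',
#  'BasePartialProcessor', 'ABC', 'object']
```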
Thank you so much for the PR @BakasuraRCE <3. I've changed the status to
I'll make some suggestions and contribute code as soon as I'm able!
Made comments in the PR. With your permission @BakasuraRCE, I can help expedite by pushing changes to your fork |
Please check my comment in the PR @heitorlessa :) |
hey @BakasuraRCE, following up on this as Ruben has been on personal leave since last week. We'll defer this feature to January, when all maintainers are back. I'll make direct comments in the PR on what's pending for merge, to accelerate review and amendments next month. If we don't speak again, have a great holiday/Xmas!
hey @heitorlessa! Hard weeks here, we're in the middle of closing out projects and the holidays, but I made a few changes to the code for your review in January. Merry Christmas!
Updating here too - we're looking to release this by EOW in 2.8.0 |
|
Use case
I would like to process the messages that reach a BatchProcessor asynchronously. Sometimes processing each message from an SQS queue depends on HTTP calls or similar operations that can take some time; if they are all issued at the same time, there is no accumulated delay.
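A minimal, self-contained sketch (not from the original issue) of the timing difference described above; `fake_http_call` is a hypothetical stand-in for the slow HTTP dependency:

```python
import asyncio
import time


async def fake_http_call(record: str) -> str:
    # Hypothetical stand-in for a slow downstream HTTP call (~1s per record).
    await asyncio.sleep(1)
    return f"processed {record}"


async def main() -> None:
    records = ["msg-1", "msg-2", "msg-3"]

    start = time.perf_counter()
    # Sequential: total time is roughly the sum of all calls (~3s here).
    for record in records:
        await fake_http_call(record)
    print(f"sequential: {time.perf_counter() - start:.1f}s")

    start = time.perf_counter()
    # Concurrent: total time is roughly the slowest single call (~1s here).
    await asyncio.gather(*(fake_http_call(record) for record in records))
    print(f"concurrent: {time.perf_counter() - start:.1f}s")


asyncio.run(main())
```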
Solution/User Experience
I have made a slight hack to allow this, extending the BatchProcessor class:
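The original snippet is not reproduced in this thread; below is a hypothetical sketch of what such an extension might look like, assuming the hack subclasses the released `BatchProcessor`, awaits an async record handler, and drives the records with `asyncio.gather`. The class name `HackyAsyncBatchProcessor` and the `asyncio.run` call inside `process()` are illustrative, not the author's actual code:

```python
import asyncio
import sys
from typing import List, Tuple

from aws_lambda_powertools.utilities.batch import BatchProcessor


class HackyAsyncBatchProcessor(BatchProcessor):
    """Hypothetical sketch: run an async record handler concurrently over the batch."""

    async def _async_process_record(self, record: dict):
        # Assumes the record handler passed to the processor is an async callable.
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            result = await self.handler(record=data)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    def process(self) -> List[Tuple]:
        # Gather all records concurrently instead of looping sequentially.
        async def _run():
            return await asyncio.gather(*(self._async_process_record(r) for r in self.records))

        return list(asyncio.run(_run()))
```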
and the main code to run:
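Likewise, the original handler snippet is missing here; a hedged usage sketch, assuming the hypothetical `HackyAsyncBatchProcessor` from the previous block and the manual `with processor(...)` pattern of the sync API:

```python
from aws_lambda_powertools.utilities.batch import EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = HackyAsyncBatchProcessor(event_type=EventType.SQS)


async def record_handler(record: SQSRecord):
    # Slow per-record work, e.g. an HTTP call, awaited concurrently by the processor.
    payload: str = record.body
    if not payload:
        raise ValueError("empty body")
    ...


def lambda_handler(event: dict, context: LambdaContext):
    batch = event["Records"]
    with processor(batch, record_handler, lambda_context=context):
        processor.process()  # the overridden, asyncio-driven loop
    return processor.response()
```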
Alternative solutions
No response
Acknowledgment