
Feature request: Async BatchProcessor (use case: slow processing of each item) #1708


Closed
1 of 2 tasks
BakasuraRCE opened this issue Nov 11, 2022 · 11 comments · Fixed by #1724
Assignees
Labels
batch Batch processing utility feature-request feature request

Comments

@BakasuraRCE
Contributor

BakasuraRCE commented Nov 11, 2022

Use case

I would like to process the messages that reach a BatchProcessor asynchronously. Processing messages from an SQS queue sometimes depends on HTTP calls or similar operations that can take some time; if they are all performed concurrently, there is no accumulated delay.
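To illustrate the use case, here is a minimal timing sketch (hypothetical handler name, plain asyncio, no Powertools) showing that concurrent awaits take roughly as long as the slowest record rather than the sum of all of them:

```python
import asyncio
import time


async def handle_record(record: str) -> str:
    # Stand-in for a slow HTTP call made per SQS record
    await asyncio.sleep(0.1)
    return record.upper()


async def main():
    records = ["a", "b", "c"]
    start = time.perf_counter()
    # Run all records concurrently: wall time ~= slowest record, not the sum
    results = await asyncio.gather(*(handle_record(r) for r in records))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['A', 'B', 'C'], after roughly 0.1s instead of 0.3s
```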

Solution/User Experience

I have made a slight hack to allow this, extending the BatchProcessor class:

import asyncio
import sys
from typing import List, Tuple, Union

from aws_lambda_powertools.utilities.batch import BatchProcessor, SuccessResponse, FailureResponse


class AsyncBatchProcessor(BatchProcessor):

    async def _aprocess_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = await self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = await self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    async def aprocess(self) -> List[Tuple]:
        return list(await asyncio.gather(*[self._aprocess_record(record) for record in self.records]))

and the main code to run:

import asyncio
import json

from aws_lambda_powertools.utilities.batch import EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from aws_lambda_powertools import Tracer, Logger
from .async_batch_preprocessor import AsyncBatchProcessor

tracer = Tracer()
logger = Logger()

aprocessor = AsyncBatchProcessor(event_type=EventType.SQS)

a = 1


async def record_handler(record: SQSRecord):
    """
    Process each record here
    """
    global a
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        print(item)
        a += 1
        print(f'sleeping for {a} s')
        await asyncio.sleep(a)
        print('awaited!')
        # code code code...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return asyncio.run(alambda_handler(event, context))


async def alambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with aprocessor(records=batch, handler=record_handler):
        await aprocessor.aprocess()  # kick off processing, return list[tuple]
    return aprocessor.response()
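For reference, a sketch of the partial-batch response shape that `response()` produces for SQS (the `ReportBatchItemFailures` contract; `failed_message_ids` is an illustrative variable, not Powertools code):

```python
# Suppose two records succeeded and one failed; only the failed
# messageId is reported back so SQS retries just that record.
failed_message_ids = ["msg-2"]

response = {
    "batchItemFailures": [
        {"itemIdentifier": message_id} for message_id in failed_message_ids
    ]
}
print(response)
```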

Alternative solutions

No response

Acknowledgment

@BakasuraRCE BakasuraRCE added feature-request feature request triage Pending triage from maintainers labels Nov 11, 2022
@boring-cyborg

boring-cyborg bot commented Nov 11, 2022

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #python channel on our AWS Lambda Powertools Discord: Invite link

@heitorlessa
Contributor

heitorlessa commented Nov 11, 2022

This is awesome, @BakasuraRCE. I was secretly waiting for a customer to ask for async.

Would you be up to contribute as a PR? I can help with docs and improve UX.

I wonder if there are any other operations that might block the event loop if we were to have the async part in the same BatchProcessor (iscoroutine...)

UPDATE: I take it back - this will cause trouble with strict typing later, and when using stacked decorators, like making a record handler idempotent -- a separate class it is.

I also like having a separate class due to coloured functions and room to grow any async processors.

I'd be interested to hear your thoughts
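For illustration, the single-class dispatch floated above (checking whether the handler is a coroutine function) could look roughly like this - a sketch with hypothetical names, not the approach that was ultimately adopted:

```python
import asyncio
import inspect


def process_record(handler, record: dict):
    # Dispatch on handler type: coroutine functions get an event loop,
    # plain functions are called directly.
    if inspect.iscoroutinefunction(handler):
        return asyncio.run(handler(record))
    return handler(record)


def sync_handler(record: dict) -> str:
    return record["body"]


async def async_handler(record: dict) -> str:
    await asyncio.sleep(0)  # stand-in for real async work
    return record["body"]


print(process_record(sync_handler, {"body": "hello"}))   # hello
print(process_record(async_handler, {"body": "world"}))  # world
```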

@heitorlessa heitorlessa removed the triage Pending triage from maintainers label Nov 11, 2022
@BakasuraRCE
Contributor Author

Great!

I also think it is best to separate sync from async to be clear; however, the idempotency logic in the BatchProcessor can be reused.

I also thought of something more elegant for using an asynchronous handler. Allow me to show you a sketch of the code - tell me what you think:


Async main handler decorator:

import asyncio

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator


@lambda_handler_decorator
def async_lambda_handler(handler, event, context):
    return asyncio.run(handler(event, context))
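Outside Lambda, the same sync-to-async bridging can be exercised with a plain decorator (hypothetical names, no Powertools dependency) to check the pattern works:

```python
import asyncio
import functools


def run_async(handler):
    # Wrap an async handler so a synchronous runtime can invoke it;
    # each call gets its own event loop via asyncio.run.
    @functools.wraps(handler)
    def wrapper(event, context):
        return asyncio.run(handler(event, context))
    return wrapper


@run_async
async def handler(event, context):
    await asyncio.sleep(0)  # stand-in for real async work
    return {"records": len(event.get("Records", []))}


print(handler({"Records": [{}, {}]}, None))  # {'records': 2}
```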

aws_lambda_powertools/utilities/batch/base.py

class BasePartialProcessor(ABC):
  # bla bla, same code WITHOUT process and _process_record methods

class SyncBasePartialProcessor(BasePartialProcessor):
    @abstractmethod
    def _process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return [self._process_record(record) for record in self.records]


class AsyncBasePartialProcessor(BasePartialProcessor):
    @abstractmethod
    async def _async_process_record(self, record: dict):
        """
        Process record with handler.
        """
        raise NotImplementedError()

    async def async_process(self) -> List[Tuple]:
        """
        Call instance's handler for each record.
        """
        return list(await asyncio.gather(*[self._async_process_record(record) for record in self.records]))

@lambda_handler_decorator
def batch_processor(
    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: SyncBasePartialProcessor
):
  # code code

class BaseBatchProcessorMixin(BasePartialProcessor):
  # same code from BatchProcessor WITHOUT _process_record method

class BatchProcessor(SyncBasePartialProcessor, BaseBatchProcessorMixin): # Keep old name for compatibility
    # only include sync _process_record
    def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

@lambda_handler_decorator
async def async_batch_processor(
        handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: AsyncBasePartialProcessor
):
    records = event["Records"]

    with processor(records, record_handler, lambda_context=context):
        await processor.async_process()

    return await handler(event, context)


class AsyncBatchProcessor(AsyncBasePartialProcessor, BaseBatchProcessorMixin):

    async def _async_process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = await self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = await self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

lambda_function.py

# imports...
async def async_record_handler(record: SQSRecord):
    """
    Process each record here
    """
    payload: str = record.body
    if not payload:
        raise ValueError
    # code code code

processor = AsyncBatchProcessor(event_type=EventType.SQS)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@async_lambda_handler
@async_batch_processor(record_handler=async_record_handler, processor=processor)
async def lambda_handler(event, context: LambdaContext):
    return processor.response()

@heitorlessa
Contributor

That's quite interesting! Back then, I had issues with an async lambda handler, as I noticed most customers struggled to remember the order of decorators -- some tests could help with that, as I don't yet have advanced knowledge of asyncio.

At first glance, the only challenge I see here is the use of mixins - multiple inheritance doesn't play nicely with compilation (we plan to use Mypyc). It also has a small performance overhead due to super() (MRO, to an extent), but that truly is microseconds.
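For illustration, the method resolution order behind those super() calls can be inspected directly - stubbed-out versions of the classes from the sketch above, not the final Powertools API:

```python
# Stub classes mirroring the proposed layout, just to see the
# C3 linearization that every super() call will walk.
class BasePartialProcessor: ...
class SyncBasePartialProcessor(BasePartialProcessor): ...
class BaseBatchProcessorMixin(BasePartialProcessor): ...
class BatchProcessor(SyncBasePartialProcessor, BaseBatchProcessorMixin): ...


mro = [cls.__name__ for cls in BatchProcessor.__mro__]
print(mro)
# ['BatchProcessor', 'SyncBasePartialProcessor',
#  'BaseBatchProcessorMixin', 'BasePartialProcessor', 'object']
```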

If you could create a PR with an example of how customers could use and/or test it, I'd love to include it in this week's release - no rush if you can't, though.

@heitorlessa
Contributor

Thank you so much for the PR @BakasuraRCE <3. I've changed the status to Next iteration to make sure we prioritize after this week's release - we'll only prioritize Feature Flags, this feature, and an upcoming S3 Streaming one.

I'll make some suggestions and contribute code as soon as I'm able!

@heitorlessa heitorlessa self-assigned this Nov 17, 2022
@heitorlessa
Contributor

Made comments in the PR. With your permission @BakasuraRCE, I can help expedite by pushing changes to your fork

@BakasuraRCE
Contributor Author

Please check my comment in the PR @heitorlessa :)

@heitorlessa
Contributor

hey @BakasuraRCE, following up on this, as Ruben has been on personal leave since last week. We'll defer this feature to January when all maintainers are back.

I'll make direct comments in the PR on what's pending for merge to accelerate review and amendments next month.

If we don't speak again, have a great holiday/Xmas

@BakasuraRCE
Contributor Author

hey @heitorlessa !

Hard weeks - we're in the midst of project close-outs and holidays. I made a few changes to the code for your review in January.

Merry Christmas!

@heitorlessa heitorlessa added the batch Batch processing utility label Jan 19, 2023
@heitorlessa heitorlessa linked a pull request Feb 7, 2023 that will close this issue
7 tasks
@heitorlessa
Contributor

Updating here too - we're looking to release this by EOW in 2.8.0

#1724 (comment)

@github-actions
Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.
