feat(batch): add async_batch_processor for concurrent processing #1724
Conversation
feat: implement async_lambda_handler decorator
Thanks a lot for your first contribution! Please check out our contributing guidelines and don't hesitate to ask whatever you need.
I wasn't sure where to put the new asynchronous decorator, so I created some tests for the asynchronous version, plus examples. Let me know your comments.
Thank you once again for the heavy lifting - I like the overall direction! It even brought up that we should (in a separate PR) have a way to use a ThreadPoolExecutor as an opt-in feature, like concurrent=True, for the synchronous processor.

Major comments summarized:

- Breaking change. Removing process() from BasePartialProcessor breaks custom processors. Turns out (TIL) we don't have a test for it, hence why it wasn't caught earlier.
- Let's keep the Lambda handler synchronous. We can prevent a call stack slowdown, make async_lambda_handler redundant, and prevent a class of unforeseen issues for customers stacking decorators of different colours (sync/async), since ordering matters.
- Single inheritance only. As I mentioned earlier, we can't use multiple inheritance, as that will impact our ability to use Mypyc later.
Codecov Report
Base: 97.52% // Head: 97.44% // Decreases project coverage by 0.08%.

Additional details and impacted files:

@@ Coverage Diff @@
## develop #1724 +/- ##
===========================================
- Coverage 97.52% 97.44% -0.08%
===========================================
Files 143 143
Lines 6573 6606 +33
Branches 468 471 +3
===========================================
+ Hits 6410 6437 +27
- Misses 128 132 +4
- Partials 35 37 +2
☔ View full report at Codecov.
fix: remove async_lambda_handler
Hey!
I'm glad you called it out - thank you for keeping us honest ;) Answering your points:

I couldn't agree more. Because we optimize to run in a constrained environment (almost like embedded), we have to make these decisions, which would be sub-optimal in different contexts. We're trading a more beautiful implementation for future speed gains that could save $ for customers.

Mind sharing what the big problems were so we can learn from them? I'm new to asyncio in Python from a library-authorship point of view - if there's a balance we can strike that avoids decorator-stacking confusion and an increased call stack, while also avoiding these scars you've earned, it'd be lovely!
I worded that poorly. What I meant for ... Since ...
It is something that I still can't explain well. I have worked for a long time with this function in Docker environments without memory or disk limitations, always following the documentation. However, in the AWS Lambda environment, even with high limits, it simply fails when, for some reason, an unexpected error occurs (prematurely closing the main loop without allowing the error to propagate up). Imagine an error ... However, Python's recommendation is to use ...
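One way to keep a single failing record from tearing down the whole loop is to collect exceptions as results rather than letting them escape. This is a minimal sketch of that pattern, not the Powertools implementation; the handler and its "bad" sentinel are made up for illustration:

```python
import asyncio

async def handler(record: str) -> str:
    # Hypothetical record handler: "bad" simulates an unexpected error.
    if record == "bad":
        raise ValueError("boom")
    await asyncio.sleep(0)
    return record

def process(records):
    # Run everything inside one loop and collect exceptions as results,
    # so one failing record cannot close the loop prematurely.
    async def _closure():
        return await asyncio.gather(
            *(handler(r) for r in records), return_exceptions=True
        )
    return asyncio.run(_closure())
```

With return_exceptions=True, the caller gets the exception objects back in order and can decide how to report the partial failure.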
Exactly - it is a matter of being explicit and not giving rise to confusion. It is also about maintainability of the code: imagine, in the future, for some reason, processing messages using the class ... The ... I'm sorry for not having more technical detail about asyncio; I await your thoughts :)
Aaah, that's highly likely due to the nature of container/process freezing in Lambda. There are similar side effects in NodeJS too. Agreed on the unforeseeable nature of ... I'm going to re:Invent today and will be mostly away for a week; feel free to reach out to @rubenfonseca for anything. We can help write the docs, do a thorough review, and contribute any possible gaps to make this quicker. Thank you so much again for helping out with this (Gracias!!)
Last comments before winter holidays.
In summary:
- We need a summary of the changes in the PR body. This helps future maintainers/contributors look up the reason for certain changes, and helps the reviewer's understanding.
- Made comments on docstrings. The async closure will likely be foreign to other maintainers, so a more complete docstring helps remove ambiguity.
- We need a more realistic async batch example. This will ease writing docs and better emphasize the usefulness of this feature for other customers ;)
- [Optional] We need a test to prevent custom-processor regression. Earlier we manually caught that custom processors use methods that were removed as part of the initial PR. I'm pretty sure we will get sidetracked and not add it after this PR is merged, so let's add it - whoever has the bandwidth in January.
Thank you so much again
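The custom-processor regression test mentioned above could look something like this. It is a self-contained sketch with simplified stand-in classes, not the real Powertools ones: the point is only to pin the methods a custom processor relies on.

```python
import abc

class BasePartialProcessor(abc.ABC):
    # Stand-in for the real base class: custom processors rely on
    # process() staying part of the public interface.
    def process(self):
        return [self._process_record(r) for r in self.records]

    @abc.abstractmethod
    def _process_record(self, record): ...

class MyCustomProcessor(BasePartialProcessor):
    def _process_record(self, record):
        return ("success", record)

def test_custom_processor_keeps_sync_interface():
    processor = MyCustomProcessor()
    processor.records = [1, 2]
    assert processor.process() == [("success", 1), ("success", 2)]
```

A test like this would have caught the process() removal before it shipped.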
from aws_lambda_powertools.utilities.typing import LambdaContext

async def async_record_handler(record: SQSRecord):
np: what would be a realistic example here? maybe an async crawler?
As part of docs refactoring, we haven't reached Batch yet, we're trying to include more complete examples.
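An async crawler along these lines could make the example concrete. This is a sketch: the record shape is simplified to a plain dict, and the network call is simulated with asyncio.sleep rather than a real HTTP client.

```python
import asyncio

async def async_record_handler(record: dict) -> str:
    # Each SQS record carries a URL to crawl; in a real handler this
    # would be an awaited aiohttp/httpx GET instead of a sleep.
    url = record["body"]
    await asyncio.sleep(0.01)  # simulated network latency
    return f"fetched {url}"

def crawl(records):
    # Fan out all records concurrently on one event loop.
    async def _all():
        return await asyncio.gather(*(async_record_handler(r) for r in records))
    return asyncio.run(_all())
```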
Co-authored-by: Heitor Lessa <[email protected]> Signed-off-by: Bakasura <[email protected]>
Back from holidays, and our team offsite ends this week. I'll resume this PR next week so we can get it merged for our next release ;-) Hope you had a great end of the year, and I look forward to getting this to the finish line! Thank you for the prolonged patience
* develop: (24 commits)
  - chore(deps): bump docker/setup-buildx-action from 2.4.0 to 2.4.1 (aws-powertools#1903)
  - chore(deps-dev): bump aws-cdk-lib from 2.63.0 to 2.63.2 (aws-powertools#1904)
  - update changelog with latest changes
  - docs(idempotency): add IAM permissions section (aws-powertools#1902)
  - chore(deps-dev): bump mkdocs-material from 9.0.10 to 9.0.11 (aws-powertools#1896)
  - chore(deps-dev): bump mypy-boto3-appconfig from 1.26.0.post1 to 1.26.63 (aws-powertools#1895)
  - chore(maintainers): fix release workflow rename
  - update changelog with latest changes
  - docs(homepage): set url for end-of-support in announce block (aws-powertools#1893)
  - chore(deps-dev): bump mkdocs-material from 9.0.9 to 9.0.10 (aws-powertools#1888)
  - chore(deps-dev): bump mypy-boto3-s3 from 1.26.58 to 1.26.62 (aws-powertools#1889)
  - chore(deps-dev): bump black from 22.12.0 to 23.1.0 (aws-powertools#1886)
  - chore(deps-dev): bump aws-cdk-lib from 2.62.2 to 2.63.0 (aws-powertools#1887)
  - update changelog with latest changes
  - feat(metrics): add default_dimensions to single_metric (aws-powertools#1880)
  - chore: update v2 layer ARN on documentation
  - bump version to 2.7.1
  - update changelog with latest changes
  - docs(homepage): add banner for end-of-support v1 (aws-powertools#1879)
  - fix(license): correction to MIT + MIT-0 (no proprietary anymore) (aws-powertools#1883)
  - ...
Pushed some changes to improve maintainability. Next: I'll create a summary of the changes in the PR before refactoring the doc examples, and create docs for this. If nothing else drags me into meetings and such, this should be in the next release, 2.8.0 ;)
Pull request description changed to reflect all changes, and to make it easier for maintainers to reason about the what, why, and effect of these changes. Added two Mermaid diagrams to more easily grasp it as a summary.

Before:

classDiagram
class BasePartialProcessor {
<<interface>>
+success_handler(record, result) SuccessResponse
+failure_handler(record, exception) FailureResponse
+process()
+_prepare()
+_clean()
+_process_record_(record)
+_async_process() List~Tuple~
+__enter__()
+__exit__()
+__call__(records, handler, lambda_context)
}
class BatchProcessor {
+Dict batch_response
+response() Dict
+_process_record()
+_prepare()
+_clean()
}
BasePartialProcessor <|-- BatchProcessor : implement
note for BatchProcessor "implements entire interface and logic for batch processing"
New:

classDiagram
class BasePartialProcessor {
<<interface>>
+success_handler(record, result) SuccessResponse
+failure_handler(record, exception) FailureResponse
+process()
+_prepare()
+_clean()
+_process_record_(record)
+_async_process_record_(record)
+_async_process() List~Tuple~
+__enter__()
+__exit__()
+__call__(records, handler, lambda_context)
}
class BasePartialBatchProcessor {
+Dict batch_response
+response() Dict
+_prepare()
+_clean()
}
class BatchProcessor {
+_process_record()
+_async_process_record() NotImplementedError
}
class AsyncBatchProcessor {
+_process_record() NotImplementedError
+_async_process_record()
}
BasePartialProcessor <|-- BasePartialBatchProcessor : implement
BasePartialBatchProcessor <|-- BatchProcessor : inherit
BasePartialBatchProcessor <|-- AsyncBatchProcessor : inherit
note for BasePartialBatchProcessor "implements shared logic for batch processing used for sync and async processing"
note for BatchProcessor "only implements logic to call customers' record handler synchronously"
note for AsyncBatchProcessor "only implements logic to call customers' record handler asynchronously"
ALL DONE 🎉 🚀 @BakasuraRCE -- I'll wait until Thursday for any last comments before we merge and push a new release ;) I wanted to thank you one more time for this big effort. Since you last touched it, I've made the following changes:
@BakasuraRCE Hey, I'm looking into writing a load test for this today/tomorrow to check perf :)
I created a simple load test: an SQS queue containing 5,500 messages. When all of the messages were in the queue, I enabled the event source of my Lambda. My event source had the following config:
I ran two versions of the function (sync and async, using the code in this PR). The sync version took 1.2 seconds on average; the async version took around 300 ms. It's a vast improvement. The function setup:
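The sync-vs-async gap is easy to reproduce locally with simulated I/O. This is a toy benchmark, not the actual load test: io_task stands in for a record handler's network wait.

```python
import asyncio
import time

async def io_task():
    await asyncio.sleep(0.05)  # simulated I/O-bound record handler

def run_sequential(n: int) -> float:
    # One record at a time, like the sync processor: ~n * 0.05s.
    start = time.perf_counter()
    async def _all():
        for _ in range(n):
            await io_task()
    asyncio.run(_all())
    return time.perf_counter() - start

def run_concurrent(n: int) -> float:
    # All records awaited together, like the async processor: ~0.05s total.
    start = time.perf_counter()
    async def _all():
        await asyncio.gather(*(io_task() for _ in range(n)))
    asyncio.run(_all())
    return time.perf_counter() - start
```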
Awesome work, congrats on your first merged pull request, and thank you for helping improve everyone's experience!
Issue number: #1708
Summary
Add a new AsyncBatchProcessor and async_batch_processor decorator to support async batch processing for a customer's record handler.

Before

New
Changes
New AsyncBatchProcessor class
New class to encapsulate async processing handlers by implementing the _async_process_record interface. Same logic as BatchProcessor, but async. It now expects record handlers to be async functions.

async_batch_processor decorator

New decorator that instantiates AsyncBatchProcessor. It is also sync, to avoid forcing customers to make their handlers async and causing side effects in existing middlewares. AsyncBatchProcessor handles the sync->async operation transparently via closures.

BasePartialBatchProcessor class
Contains shared logic for all processors, whether synchronous or asynchronous. By shared logic, this means the success handler, failure handler, message ID collection from SQS, Kinesis, DynamoDB Streams, etc.

For a future major version, we can further decompose this by creating more specialized classes, for example KinesisStreamsBatchProcessor. This will become more relevant when we support runtimes beyond Lambda, where their logic and debugging will benefit from a clear separation.

Async methods and ABCs
The BasePartialProcessor base class adds a new method, async_process, and a new async ABC, async_process_record().

- def async_process(): a synchronous function with an async closure to process records asynchronously. Despite the name, the outer function is sync to avoid forcing customers to make their handlers async, which would otherwise impact existing middlewares we're unaware of.
- async def _async_process_record(): the async version of the _process_record
abstract method.

Major changes
BatchProcessor
We moved all shared logic to BasePartialBatchProcessor to make it easier to split sync vs. async processors.

Both processors
BatchProcessor and AsyncBatchProcessor have the same logic, except that one handles asyncio when calling an async record handler.

For a future where we want to support a threaded version, we can create a specialized
ThreadedBatchProcessor class that inherits from BasePartialBatchProcessor and implements _process_record.

User experience
Checklist
If your change doesn't seem to apply, please leave them unchecked.
Is this a breaking change?
RFC issue number:
Checklist:
Acknowledgment
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.