Feature request: Concurrency control (avoidance of duplicated runs) #2023
Comments
hey @pmarko1711 - thank you for a great feature request (details). I have an initial question: have you considered Step Functions instead? I've seen this a number of times where a Semaphore pattern would work out great, but done in an upstream component instead - a workflow of sorts. I can guess a few reasons why you'd want to do it in Lambda, so I'd love to hear more from you on why inside the Lambda runtime vs an orchestration service like Step Functions. Hope that makes sense.
Hi @heitorlessa, many thanks for replying. To be honest, I did not know this pattern, so many thanks for sharing. Indeed, it seems doable this way. Nevertheless, I still feel that in my case, where I wanted to limit concurrency to 1 concurrent run (of the same payload of the same Lambda, while allowing concurrent runs for different payloads), it's just much easier with a decorator. Once I have it defined and share it across my projects, activating such a concurrency control is just a matter of adding one or two lines of code. And I don't need to understand state machines (although those are arguably easier than a custom decorator :) )

Btw, my current logic looks this way:

```python
from collections import namedtuple

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

MutexSettings = namedtuple(
    "MutexSettings",
    (
        "dyndb_resource", "dyndb_table",
        "attribute_partition_key", "attribute_sort_key", "attribute_ttl", "ttl",
        "attribute_mutex",       # (optional) name of the mutex attribute
        "mutex_locked",          # (optional) value to indicate a locked mutex
        "mutex_unlocked",        # (optional) value to indicate an unlocked mutex
        "suffix_partition_key",  # (optional) suffix to add to the partition key values for mutex items
        "suffix_sort_key",       # (optional) suffix to add to the sort key values for mutex items
    ),
    defaults=(
        "mutex",
        "LOCKED",
        "UNLOCKED",
        "",
        "",
    ),  # defaults are applied to the rightmost parameters
)


@lambda_handler_decorator()
def add_mutex(handler, event, context):
    # MUTEX_SETTINGS - a MutexSettings namedtuple instance
    # here I also parse the event to get my partition_key and sort_key for which I want
    # to set the lock; this could be replaced with something like an idempotency_key

    # try locking (this raises an exception if already locked)
    # my own function that does a conditional update_item, setting a ttl item property
    acquire_mutex(partition_key, sort_key, MUTEX_SETTINGS)

    # run the handler and release the lock when the handler
    # succeeds, but also when it fails
    try:
        return_val = handler(event, context)
        # my own function that releases the lock
        release_mutex(partition_key, sort_key, MUTEX_SETTINGS)
        return return_val
    except Exception as err:
        # my own function that releases the lock
        release_mutex(partition_key, sort_key, MUTEX_SETTINGS)
        raise err


@add_mutex
def lambda_handler(event, context):
    pass
```

I can also provide my
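For reference, a minimal sketch of what `acquire_mutex` and `release_mutex` could look like, assuming a conditional DynamoDB `update_item` against the table described by `MutexSettings` (with `dyndb_resource` being a boto3 DynamoDB resource and `ttl` a duration in seconds). The bodies below are illustrative, not the author's actual implementation:

```python
import time

from botocore.exceptions import ClientError


def acquire_mutex(partition_key: str, sort_key: str, settings) -> None:
    """Lock the mutex item; raise if it is already locked and not yet expired."""
    table = settings.dyndb_resource.Table(settings.dyndb_table)
    now = int(time.time())
    try:
        table.update_item(
            Key={
                settings.attribute_partition_key: partition_key + settings.suffix_partition_key,
                settings.attribute_sort_key: sort_key + settings.suffix_sort_key,
            },
            UpdateExpression="SET #mutex = :locked, #expires = :expires",
            # only succeed if no lock exists yet, it was released, or it has expired
            ConditionExpression="attribute_not_exists(#mutex) OR #mutex = :unlocked OR #expires < :now",
            ExpressionAttributeNames={
                "#mutex": settings.attribute_mutex,
                "#expires": settings.attribute_ttl,
            },
            ExpressionAttributeValues={
                ":locked": settings.mutex_locked,
                ":unlocked": settings.mutex_unlocked,
                ":expires": now + settings.ttl,
                ":now": now,
            },
        )
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
            raise RuntimeError("Mutex already locked for this payload") from exc
        raise


def release_mutex(partition_key: str, sort_key: str, settings) -> None:
    """Mark the mutex item as unlocked so the next invocation can acquire it."""
    table = settings.dyndb_resource.Table(settings.dyndb_table)
    table.update_item(
        Key={
            settings.attribute_partition_key: partition_key + settings.suffix_partition_key,
            settings.attribute_sort_key: sort_key + settings.suffix_sort_key,
        },
        UpdateExpression="SET #mutex = :unlocked",
        ExpressionAttributeNames={"#mutex": settings.attribute_mutex},
        ExpressionAttributeValues={":unlocked": settings.mutex_unlocked},
    )
```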
yeah I figured that was the reason. It's about trade-offs in the end. In simplistic terms: on one hand, this makes Lambda responsible for many things and may go out of sync if that spans other functions, while delegating to an orchestrator requires domain knowledge and additional failure modes to be aware of (but solves semantics, concurrent invocations, etc.). I'd love to hear whether this is useful for other customers too (I'll ping on Discord), so we can prioritize it. If anything, I'd love to find a better umbrella for this feature, so we could incrementally improve operational posture for customers - think retry mechanisms (a thin wrapper for tenacity), a circuit breaker, and whatnot. If we work on a ... I don't have any answers for now, just thinking out loud.
@pmarko1711 I'd love to have a call with you to learn a bit more about your mutex use - I think there's a pattern across all the new feature requests, RFCs, and customer interviews we've done recently... a call will be quicker than writing a narrative. If you're interested and free next week onwards, please ping us an email and we'll reply with a link to schedule a meeting at your earliest convenience: [email protected]. Thank you!!
🤔 hmmmm this seems to be a FIFO-based requirement. Could FIFO SQS be used as a buffer here, with the message group id customised to group like messages? That way you can also get batch execution safely within the Lambda concurrency for efficiency. Although it also depends on other constraints and requirements regarding ordering of event processing; once you start using virtual FIFO queues, ordering outside of groups goes away. FIFO SQS message group id docs. I think we always need to look for the infrastructure to drive our code rather than add more actual code.
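To make that idea concrete, here is a minimal producer-side sketch: identical payloads share a message group id (so they are processed in order within the group), while different payloads can be consumed concurrently. The queue URL and the way the group id is derived from the payload are assumptions, not part of the request:

```python
import hashlib
import json

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/orders.fifo"  # assumed FIFO queue


def enqueue(payload: dict) -> None:
    # messages with the same group id are delivered in order, one batch at a time;
    # messages with different group ids can be processed in parallel
    group_id = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(payload),
        MessageGroupId=group_id,
        MessageDeduplicationId=group_id,  # or rely on content-based deduplication
    )
```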
Well, I'm not sure one can rely on SQS FIFO queues alone:
https://ably.com/blog/sqs-fifo-queues-message-ordering-and-exactly-once-processing-guaranteed

It gets complex and I don't fully understand it all, but what I take from it is that if I want exactly-once execution of my Lambda function (for a given payload), I need to implement it in my Lambda function myself. (The group ids would help me put the same payloads together, but I'd still be exposed to these problems.) Please let me know if I'm wrong.
Peter, quickly sharing the example we just discussed on the call to satisfy processing the same payload again after X time (e.g., a pre-aggregation use case) while limiting concurrency: https://github.com/awslabs/aws-lambda-powertools-python/blob/develop/tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py#L12

Contrary to DynamoDB TTL, which is primarily used to remove DynamoDB items to save on cost (plus other use cases like archiving), Idempotency ...

As for the output, whatever output your function returns, we'll use it for later. But in your case, you can simply return an empty string to save on DynamoDB costs, since you're not going to use it either way -- https://awslabs.github.io/aws-lambda-powertools-python/2.10.0/utilities/idempotency/#idempotent_function-decorator
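For readers landing here later, a minimal sketch of that approach (not the linked handler verbatim) - the table name, the 20-second window, and the `event_key_jmespath` value are assumptions for illustration:

```python
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")  # assumed table name
config = IdempotencyConfig(
    event_key_jmespath="order_id",  # lock on a payload subset, or omit to use the whole event
    expires_after_seconds=20,       # the same payload is allowed to run again after 20 seconds
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    # duplicate payloads within the window receive the stored result instead of running again
    return ""  # returning an empty string keeps the stored item (and DynamoDB cost) small
```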
Took me longer than expected, but I've just completed a documentation revamp of Idempotency to prevent this confusion from happening again - pending @leandrodamascena's review.

What did I change?

- I've added new terminology that was missing - Idempotency Record. While it's internal to us, it's key information for later addressing the confusion between our expiration validation mechanism and DynamoDB TTL (which deletes the actual record within 48 hours of expiration).
- I've added new and revamped existing sequence diagrams to explain how this feature works in different scenarios. For this issue, the key one is the new ...
- I've created an explicit documentation banner to call out the differences between Idempotency Record expiration and DynamoDB TTL.

In tandem, this could've saved Peter a few days. I'll close this issue once the PR is merged, as this feature will no longer be needed (e.g., ...).

Thank you @pmarko1711 for your patience and, more importantly, for taking the time to meet with me earlier this week to clarify this - hope you like this improvement.

- New and revamped sequence diagrams depicting Idempotency Record Status too
- New Idempotency Record terminology
- Address confusion between Idempotency Record expiration vs DynamoDB time-to-live (TTL)
Latest docs are now rebuilt with these changes - closing; please feel free to reopen if that didn't work, Peter! Once again, thank you for jumping on a call to clarify the need and helping us improve the documentation for everyone.
This is now released in version 2.12.0!
Use case
Sometimes I need to limit the concurrency of a Lambda function at the level of its payload: concurrent invocations are fine, but only if they have different payloads.
Background:
I already faced this need in two of my projects:
Solution/User Experience
Implement a decorator similar to `@idempotent` that would lock on a given payload and release the lock once the handler finishes. Unlike `@idempotent`, it would not cache the result and, again, it'd release the lock on success.

Similarly to `@idempotent`, the payload should be either the entire event or its subset.

Optionally ...
Alternative solutions
Modify the `@idempotent` decorator to allow for switching off caching and releasing the lock on success. However, then the decorator can hardly be called `idempotent`...

Acknowledgment