-
Notifications
You must be signed in to change notification settings - Fork 421
Feature request: Handle SQS FIFO queues with the Bach Processing utility #1927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I've reduced the implementation this this class SQSFifoCircuitBreakerError(Exception):
pass
class SQSFifoPartialProcessor(BasePartialBatchProcessor):
def _process_record(self, record: dict):
if self.fail_messages:
return self.failure_handler(
record=record, exception=(SQSFifoCircuitBreakerError, SQSFifoCircuitBreakerError(), None)
)
return super()._process_record(record)
async def _async_process_record(self, record: dict):
raise NotImplementedError() However, I'm unsure about two things:
|
Atm, only SQS offers FIFO guarantees while DynamoDB/Kinesis offers ordering guarantees on a per shard-level (this goes to the drain when handling batch processing and various producer patterns) - so I'd keep it focused for SQS only for now, keeping it reusable so we could eventually add to others if they improve (by they I mean Kinesis/DynamoDB Stream). The only semantic I'm not sure is whether it'd be useful to have a flag (like you said) in the If the answer isn't clear, I'd vote for having a specialized class for SQS to avoid confusion in other fronts, then port it to others once we have a clearer semantic in place. PS: Implementation will be slightly more elaborated than that (you'll need to collect all message IDs to fail not process subsequent ones). I know you know, editing just in case. |
removed |
|
This is now released under 2.9.0 version! |
Discussed in #1913
Originally posted by whoDoneItAgain February 8, 2023
I've been reading over the documentation and I cant find an answer.
Does Powertools stop processing items after a failed and marks the failed plus all subsequent as failed in its response back to SQS?
Ex. r1,r2,r3,r4,r5 is sent. function fails on r3. ReportBatchItemFailures includes r3 as well as r4 and r5.
The text was updated successfully, but these errors were encountered: