Skip to content

Feature request: Handle SQS FIFO queues with the Bach Processing utility #1927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
rubenfonseca opened this issue Feb 14, 2023 Discussed in #1913 · 5 comments · Fixed by #1934
Closed

Feature request: Handle SQS FIFO queues with the Bach Processing utility #1927

rubenfonseca opened this issue Feb 14, 2023 Discussed in #1913 · 5 comments · Fixed by #1934
Assignees
Labels
batch Batch processing utility feature New feature or functionality

Comments

@rubenfonseca
Copy link
Contributor

Discussed in #1913

Originally posted by whoDoneItAgain February 8, 2023
I've been reading over the documentation and I cant find an answer.

Does Powertools stop processing items after a failed and marks the failed plus all subsequent as failed in its response back to SQS?

Ex. r1,r2,r3,r4,r5 is sent. function fails on r3. ReportBatchItemFailures includes r3 as well as r4 and r5.

@rubenfonseca rubenfonseca added triage Pending triage from maintainers feature New feature or functionality batch Batch processing utility labels Feb 14, 2023
@rubenfonseca rubenfonseca self-assigned this Feb 14, 2023
@rubenfonseca
Copy link
Contributor Author

rubenfonseca commented Feb 15, 2023

I've reduced the implementation this this

class SQSFifoCircuitBreakerError(Exception):
    pass

class SQSFifoPartialProcessor(BasePartialBatchProcessor):
    def _process_record(self, record: dict):
        if self.fail_messages:
            return self.failure_handler(
                record=record, exception=(SQSFifoCircuitBreakerError, SQSFifoCircuitBreakerError(), None)
            )

        return super()._process_record(record)

    async def _async_process_record(self, record: dict):
        raise NotImplementedError()

However, I'm unsure about two things:

  1. The implementation is so simple that I wonder if it makes sense to add a fifo flag to the PartialBatchProcessor instead, and tweak the implementation to support this
  2. Does it make sense to support this operation for other backends? (Kinesis, DynamoDB, etc), or should we keep the SQS name only?

@heitorlessa @leandrodamascena

@heitorlessa
Copy link
Contributor

heitorlessa commented Feb 15, 2023

Atm, only SQS offers FIFO guarantees while DynamoDB/Kinesis offers ordering guarantees on a per shard-level (this goes to the drain when handling batch processing and various producer patterns) - so I'd keep it focused for SQS only for now, keeping it reusable so we could eventually add to others if they improve (by they I mean Kinesis/DynamoDB Stream).

The only semantic I'm not sure is whether it'd be useful to have a flag (like you said) in the PartialBatchProcessor to indicate that we will stop at the first failure while marking subsequent unprocessed messages as failed too.

If the answer isn't clear, I'd vote for having a specialized class for SQS to avoid confusion in other fronts, then port it to others once we have a clearer semantic in place.

PS: Implementation will be slightly more elaborated than that (you'll need to collect all message IDs to fail not process subsequent ones). I know you know, editing just in case.

@heitorlessa heitorlessa removed the triage Pending triage from maintainers label Feb 16, 2023
@heitorlessa
Copy link
Contributor

removed triage label

@github-actions
Copy link
Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.

@github-actions github-actions bot added the pending-release Fix or implementation already in dev waiting to be released label Feb 20, 2023
@github-actions
Copy link
Contributor

This is now released under 2.9.0 version!

@github-actions github-actions bot removed the pending-release Fix or implementation already in dev waiting to be released label Feb 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
batch Batch processing utility feature New feature or functionality
Projects
None yet
2 participants