From 40d775dbe2accf92d6882ce171b465da7b55b2f6 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Thu, 27 Feb 2025 16:26:57 +0100 Subject: [PATCH] fix(batch): clear message group references after request --- packages/batch/src/SqsFifoPartialProcessor.ts | 1 + .../batch/src/SqsFifoPartialProcessorAsync.ts | 1 + packages/batch/src/SqsFifoProcessor.ts | 8 +++++ .../unit/SqsFifoPartialProcessor.test.ts | 36 +++++++++++++++++++ 4 files changed, 46 insertions(+) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 5854e75e42..fded442669 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -92,6 +92,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { */ public processSync(): (SuccessResponse | FailureResponse)[] { this.prepare(); + this.#processor.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 98d47a3bb1..59001b6b4b 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -91,6 +91,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { */ public async process(): Promise<(SuccessResponse | FailureResponse)[]> { this.prepare(); + this.#processor.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; diff --git a/packages/batch/src/SqsFifoProcessor.ts b/packages/batch/src/SqsFifoProcessor.ts index d24510eba1..f98eed0ec1 100644 --- a/packages/batch/src/SqsFifoProcessor.ts +++ b/packages/batch/src/SqsFifoProcessor.ts @@ -32,6 +32,14 @@ class SqsFifoProcessor { this.#failedGroupIds.add(group); } + /** + * Prepares the processor for a new batch of messages. + */ + public prepare(): void { + this.#currentGroupId = undefined; + this.#failedGroupIds.clear(); + } + /** * Sets the current group ID for the message being processed. * diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index fc871ecf39..3c6cc35e76 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -185,4 +185,40 @@ describe('SQS FIFO Processors', () => { }); }); } + + it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { + // Prepare + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '2'); + const firstRecordAgain = sqsRecordFactory('success', '1'); + const event1 = { + Records: [firstRecord, secondRecord], + }; + const event2 = { + Records: [firstRecordAgain], + }; + const processor = new SqsFifoPartialProcessor(); + const fn = vi.fn((record) => { + if (record.body.includes('fail')) { + throw new Error('Processing failed'); + } + + return record; + }); + + // Act + const result1 = processPartialResponseSync(event1, fn, processor, { + skipGroupOnError: true, + throwOnFullBatchFailure: false, + }); + const result2 = processPartialResponseSync(event2, fn, processor, { + skipGroupOnError: true, + throwOnFullBatchFailure: false, + }); + + // Assess + expect(result1.batchItemFailures.length).toBe(1); + expect(result2.batchItemFailures.length).toBe(0); + expect(fn).toHaveBeenCalledTimes(3); + }); });