Skip to content

Commit 270115e

Browse files
authored
fix(batch): clear message group references after request (#3674)
1 parent f55127b commit 270115e

File tree

4 files changed

+46
-0
lines changed

4 files changed

+46
-0
lines changed

Diff for: packages/batch/src/SqsFifoPartialProcessor.ts

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
9292
*/
9393
public processSync(): (SuccessResponse | FailureResponse)[] {
9494
this.prepare();
95+
this.#processor.prepare();
9596

9697
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
9798
let currentIndex = 0;

Diff for: packages/batch/src/SqsFifoPartialProcessorAsync.ts

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
9191
*/
9292
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
9393
this.prepare();
94+
this.#processor.prepare();
9495

9596
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
9697
let currentIndex = 0;

Diff for: packages/batch/src/SqsFifoProcessor.ts

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ class SqsFifoProcessor {
3232
this.#failedGroupIds.add(group);
3333
}
3434

35+
/**
36+
* Prepares the processor for a new batch of messages.
37+
*/
38+
public prepare(): void {
39+
this.#currentGroupId = undefined;
40+
this.#failedGroupIds.clear();
41+
}
42+
3543
/**
3644
* Sets the current group ID for the message being processed.
3745
*

Diff for: packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts

+36
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,40 @@ describe('SQS FIFO Processors', () => {
185185
});
186186
});
187187
}
188+
189+
it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => {
190+
// Prepare
191+
const firstRecord = sqsRecordFactory('fail', '1');
192+
const secondRecord = sqsRecordFactory('success', '2');
193+
const firstRecordAgain = sqsRecordFactory('success', '1');
194+
const event1 = {
195+
Records: [firstRecord, secondRecord],
196+
};
197+
const event2 = {
198+
Records: [firstRecordAgain],
199+
};
200+
const processor = new SqsFifoPartialProcessor();
201+
const fn = vi.fn((record) => {
202+
if (record.body.includes('fail')) {
203+
throw new Error('Processing failed');
204+
}
205+
206+
return record;
207+
});
208+
209+
// Act
210+
const result1 = processPartialResponseSync(event1, fn, processor, {
211+
skipGroupOnError: true,
212+
throwOnFullBatchFailure: false,
213+
});
214+
const result2 = processPartialResponseSync(event2, fn, processor, {
215+
skipGroupOnError: true,
216+
throwOnFullBatchFailure: false,
217+
});
218+
219+
// Assess
220+
expect(result1.batchItemFailures.length).toBe(1);
221+
expect(result2.batchItemFailures.length).toBe(0);
222+
expect(fn).toHaveBeenCalledTimes(3);
223+
});
188224
});

0 commit comments

Comments
 (0)