From e0c1a1f97d25d9f8afe21b0998ca7f8a14c350be Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:05:09 +0600 Subject: [PATCH 01/17] feat: `SqsFifo` mixin class --- packages/batch/src/SqsFifo.ts | 119 ++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 packages/batch/src/SqsFifo.ts diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts new file mode 100644 index 0000000000..dd1a0b0c4e --- /dev/null +++ b/packages/batch/src/SqsFifo.ts @@ -0,0 +1,119 @@ +import type { BatchProcessor } from './BatchProcessor.js'; +import type { BatchProcessorSync } from './BatchProcessorSync.js'; +import { EventType } from './constants.js'; +import { + type BatchProcessingError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; + +/** + * A type alias for a generic constructor function. + * @template T - The type of the instance that the constructor creates. + */ +// biome-ignore lint/suspicious/noExplicitAny: This is a generic type that is intentionally open +type GenericConstructor = new (...args: any[]) => T; + +export function SqsFifo< + TBase extends GenericConstructor, +>(Base: TBase) { + return class extends Base { + /** + * The ID of the current message group being processed. + */ + _currentGroupId?: string; + /** + * A set of group IDs that have already encountered failures. + */ + _failedGroupIds: Set; + + // biome-ignore lint/suspicious/noExplicitAny: + public constructor(...args: any[]) { + super(EventType.SQS, ...args); + this._failedGroupIds = new Set(); + } + + /** + * Handles a failure for a given record. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + if (this.options?.skipGroupOnError && this._currentGroupId) { + this._addToFailedGroup(this._currentGroupId); + } + + return super.failureHandler(record, exception); + } + + /** + * Starting from the first failure index, fail all remaining messages regardless + * of their group ID. + * + * This short circuit mechanism is used when we detect a failed message in the batch. + * + * Since messages in a FIFO queue are processed in order, we must stop processing any + * remaining messages in the batch to prevent out-of-order processing. + * + * @param firstFailureIndex Index of first message that failed + * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully + */ + _shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + this._processFailRecord(record, new SqsFifoShortCircuitError()); + } + + this.clean(); + + return processedRecords; + } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + _addToFailedGroup(group: string): void { + this._failedGroupIds.add(group); + } + + /** + * Processes a fail record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + _processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + _setCurrentGroup(group?: string): void { + this._currentGroupId = group; + } + }; +} From c4c7acf63d980d2d98820320c524eb9bc5f688ba Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:07:03 +0600 Subject: [PATCH 02/17] refactor: use `SqsFifo` mixin inside `SqsFifoPartialProcessor` --- packages/batch/src/SqsFifoPartialProcessor.ts | 118 ++---------------- 1 file changed, 9 insertions(+), 109 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 6a9546fcb6..99fdcd2f17 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,17 +1,9 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; +import { SqsFifo } from './SqsFifo.js'; import { EventType } from './constants.js'; -import { - type BatchProcessingError, - SqsFifoMessageGroupShortCircuitError, - SqsFifoShortCircuitError, -} from './errors.js'; -import type { - BaseRecord, - EventSourceDataClassTypes, - FailureResponse, - SuccessResponse, -} from './types.js'; +import { SqsFifoMessageGroupShortCircuitError } from './errors.js'; +import type { FailureResponse, SuccessResponse } from './types.js'; /** * Batch processor for SQS FIFO queues @@ -44,39 +36,7 @@ import type { * }); * ``` */ -class SqsFifoPartialProcessor extends BatchProcessorSync { - /** - * The ID of the current message group being processed. - */ - #currentGroupId?: string; - /** - * A set of group IDs that have already encountered failures. - */ - #failedGroupIds: Set; - - public constructor() { - super(EventType.SQS); - this.#failedGroupIds = new Set(); - } - - /** - * Handles a failure for a given record. - * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. - * @param record - The record that failed. - * @param exception - The error that occurred. - * @returns The failure response. - */ - public failureHandler( - record: EventSourceDataClassTypes, - exception: Error - ): FailureResponse { - if (this.options?.skipGroupOnError && this.#currentGroupId) { - this.#addToFailedGroup(this.#currentGroupId); - } - - return super.failureHandler(record, exception); - } - +class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { /** * Process a record with a synchronous handler * @@ -101,25 +61,25 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); // If we have any failed messages, we should then short circuit the process and // fail remaining messages unless `skipGroupOnError` is true const shouldShortCircuit = !this.options?.skipGroupOnError && this.failureMessages.length !== 0; if (shouldShortCircuit) { - return this.shortCircuitProcessing(currentIndex, processedRecords); + return this._shortCircuitProcessing(currentIndex, processedRecords); } // If `skipGroupOnError` is true and the current group has previously failed, // then we should skip processing the current group. const shouldSkipCurrentGroup = this.options?.skipGroupOnError && - this.#currentGroupId && - this.#failedGroupIds.has(this.#currentGroupId); + this._currentGroupId && + this._failedGroupIds.has(this._currentGroupId); const result = shouldSkipCurrentGroup - ? this.#processFailRecord( + ? this._processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() ) @@ -133,66 +93,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { return processedRecords; } - - /** - * Starting from the first failure index, fail all remaining messages regardless - * of their group ID. - * - * This short circuit mechanism is used when we detect a failed message in the batch. - * - * Since messages in a FIFO queue are processed in order, we must stop processing any - * remaining messages in the batch to prevent out-of-order processing. - * - * @param firstFailureIndex Index of first message that failed - * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully - */ - protected shortCircuitProcessing( - firstFailureIndex: number, - processedRecords: (SuccessResponse | FailureResponse)[] - ): (SuccessResponse | FailureResponse)[] { - const remainingRecords = this.records.slice(firstFailureIndex); - - for (const record of remainingRecords) { - this.#processFailRecord(record, new SqsFifoShortCircuitError()); - } - - this.clean(); - - return processedRecords; - } - - /** - * Adds the specified group ID to the set of failed group IDs. - * - * @param group - The group ID to be added to the set of failed group IDs. - */ - #addToFailedGroup(group: string): void { - this.#failedGroupIds.add(group); - } - - /** - * Processes a fail record. - * - * @param record - The record that failed. - * @param exception - The error that occurred. - */ - #processFailRecord( - record: BaseRecord, - exception: BatchProcessingError - ): FailureResponse { - const data = this.toBatchType(record, this.eventType); - - return this.failureHandler(data, exception); - } - - /** - * Sets the current group ID for the message being processed. - * - * @param group - The group ID of the current message being processed. - */ - #setCurrentGroup(group?: string): void { - this.#currentGroupId = group; - } } export { SqsFifoPartialProcessor }; From c3b2bb46e929412264e78c22483eb5bd3085ef17 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:27:59 +0600 Subject: [PATCH 03/17] fix: put back `SqsFifoPartialProcessor` constructor --- packages/batch/src/SqsFifo.ts | 2 +- packages/batch/src/SqsFifoPartialProcessor.ts | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts index dd1a0b0c4e..20926cc17d 100644 --- a/packages/batch/src/SqsFifo.ts +++ b/packages/batch/src/SqsFifo.ts @@ -34,7 +34,7 @@ export function SqsFifo< // biome-ignore lint/suspicious/noExplicitAny: public constructor(...args: any[]) { - super(EventType.SQS, ...args); + super(...args); this._failedGroupIds = new Set(); } diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 99fdcd2f17..94a7383d68 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -37,6 +37,9 @@ import type { FailureResponse, SuccessResponse } from './types.js'; * ``` */ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { + public constructor() { + super(EventType.SQS); + } /** * Process a record with a synchronous handler * From 85b7b00ac55141c9b2457d34b627811d2d61aa8f Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:30:47 +0600 Subject: [PATCH 04/17] feat: `SqsFifoPartialProcessorAsync` for asynchronous FIFO record processing --- .../batch/src/SqsFifoPartialProcessorAsync.ts | 101 ++++++++++++++++++ packages/batch/src/index.ts | 1 + 2 files changed, 102 insertions(+) create mode 100644 packages/batch/src/SqsFifoPartialProcessorAsync.ts diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts new file mode 100644 index 0000000000..645794296e --- /dev/null +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -0,0 +1,101 @@ +import type { SQSRecord } from 'aws-lambda'; +import { BatchProcessor } from './BatchProcessor.js'; +import { SqsFifo } from './SqsFifo.js'; +import { EventType } from './constants.js'; +import { SqsFifoMessageGroupShortCircuitError } from './errors.js'; +import type { FailureResponse, SuccessResponse } from './types.js'; + +/** + * Batch processor for SQS FIFO queues + * + * This class extends the {@link BatchProcessor} class and provides + * a mechanism to process records from SQS FIFO queues asynchronously. + * + * By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. + * + * However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. + * + * @example + * ```typescript + * import { + * BatchProcessor, + * SqsFifoPartialProcessorAsync, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessorAsync(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponse(event, recordHandler, processor, { + * context, + * }); + * ``` + */ +class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) { + public constructor() { + super(EventType.SQS); + } + /** + * Process a record with a asynchronous handler + * + * This method orchestrates the processing of a batch of records asynchronously + * for SQS FIFO queues. + * + * The method calls the prepare hook to initialize the processor and then + * iterates over each record in the batch, processing them one by one. + * + * If one of them fails and `skipGroupOnError` is not true, the method short circuits + * the processing and fails the remaining records in the batch. + * + * If one of them fails and `skipGroupOnError` is true, then the method fails the current record + * if the message group has any previous failure, otherwise keeps processing. + * + * Then, it calls the clean hook to clean up the processor and returns the + * processed records. + */ + public async process(): Promise<(SuccessResponse | FailureResponse)[]> { + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + let currentIndex = 0; + for (const record of this.records) { + this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + + // If we have any failed messages, we should then short circuit the process and + // fail remaining messages unless `skipGroupOnError` is true + const shouldShortCircuit = + !this.options?.skipGroupOnError && this.failureMessages.length !== 0; + if (shouldShortCircuit) { + return this._shortCircuitProcessing(currentIndex, processedRecords); + } + + // If `skipGroupOnError` is true and the current group has previously failed, + // then we should skip processing the current group. + const shouldSkipCurrentGroup = + this.options?.skipGroupOnError && + this._currentGroupId && + this._failedGroupIds.has(this._currentGroupId); + + const result = shouldSkipCurrentGroup + ? this._processFailRecord( + record, + new SqsFifoMessageGroupShortCircuitError() + ) + : await this.processRecord(record); + + processedRecords.push(result); + currentIndex++; + } + + this.clean(); + + return processedRecords; + } +} + +export { SqsFifoPartialProcessorAsync }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 499202f722..ceb2b114fb 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -12,3 +12,4 @@ export { BatchProcessor } from './BatchProcessor.js'; export { processPartialResponseSync } from './processPartialResponseSync.js'; export { processPartialResponse } from './processPartialResponse.js'; export { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +export { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; From 632b8f9abaf257293f2b3236abc66364d9be383d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:39:32 +0600 Subject: [PATCH 05/17] refactor: `BatchProcessingOptions` & `processPartialResponse` for `SqsFifoPartialProcessorAsync` --- packages/batch/src/SqsFifo.ts | 1 - packages/batch/src/processPartialResponse.ts | 8 ++++---- packages/batch/src/types.ts | 15 ++++++++++++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts index 20926cc17d..9fea970d3c 100644 --- a/packages/batch/src/SqsFifo.ts +++ b/packages/batch/src/SqsFifo.ts @@ -1,6 +1,5 @@ import type { BatchProcessor } from './BatchProcessor.js'; import type { BatchProcessorSync } from './BatchProcessorSync.js'; -import { EventType } from './constants.js'; import { type BatchProcessingError, SqsFifoShortCircuitError, diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index daba2e862a..99331b9fff 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -70,13 +70,13 @@ import type { * @param event The event object containing the batch of records * @param recordHandler Async function to process each record from the batch * @param processor Batch processor instance to handle the batch processing - * @param options Batch processing options + * @param options Batch processing options, which can vary with chosen batch processor implementation */ -const processPartialResponse = async ( +const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, - options?: BatchProcessingOptions + processor: T, + options?: BatchProcessingOptions ): Promise => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 3748c03e95..e94b811f6c 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -6,6 +6,7 @@ import type { } from 'aws-lambda'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; /** * Options for batch processing @@ -23,10 +24,14 @@ type BatchProcessingOptions = { */ context?: Context; /** - * This option is only available for SqsFifoPartialProcessor. + * This option is only available for SqsFifoPartialProcessor & SqsFifoPartialProcessorAsync. * If true skip the group on error during processing. */ - skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; + skipGroupOnError?: T extends + | SqsFifoPartialProcessor + | SqsFifoPartialProcessorAsync + ? boolean + : never; /** * Set this to false to prevent throwing an error if the entire batch fails. */ @@ -36,7 +41,11 @@ type BatchProcessingOptions = { * When set to `true`, the records will be processed in parallel using `Promise.all`. * When set to `false`, the records will be processed sequentially. */ - processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean; + processInParallel?: T extends + | SqsFifoPartialProcessor + | SqsFifoPartialProcessorAsync + ? never + : boolean; }; /** From c298f8b36c2714a351d125f3100bb62e57e1f745 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 15:59:27 +0600 Subject: [PATCH 06/17] tests: tests for `SqsFifoPartialProcessorAsync`, similar to `SqsFifoPartialProcessor` --- .../unit/SqsFifoPartialProcessorAsync.test.ts | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts new file mode 100644 index 0000000000..dfd262d045 --- /dev/null +++ b/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts @@ -0,0 +1,155 @@ +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + SqsFifoMessageGroupShortCircuitError, + SqsFifoPartialProcessorAsync, + SqsFifoShortCircuitError, + processPartialResponse, +} from '../../src/index.js'; +import { sqsRecordFactory } from '../helpers/factories.js'; +import { sqsRecordHandler } from '../helpers/handlers.js'; + +describe('Class: SqsFifoPartialProcessorAsync', () => { + const ENVIRONMENT_VARIABLES = process.env; + + beforeEach(() => { + vi.clearAllMocks(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Asynchronous SQS FIFO batch processing', () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new SqsFifoPartialProcessorAsync(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor + ); + + // Assess + expect(result.batchItemFailures).toStrictEqual([]); + }); + + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new SqsFifoPartialProcessorAsync(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor + ); + + // Assess + expect(result.batchItemFailures.length).toBe(2); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); + + it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { + // Prepare + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new SqsFifoPartialProcessorAsync(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: true, + } + ); + + // Assess + expect(result.batchItemFailures.length).toBe(4); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + firstRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[3].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(4); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + expect(processor.errors[3]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + it('short circuits on the first failure when `skipGroupOnError` is false', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('fail', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessorAsync(); + + // Act + const result = await processPartialResponse( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: false, + } + ); + + // Assess + expect(result.batchItemFailures.length).toBe(3); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(3); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); + }); +}); From 9cf9cd37f5a54246fbce7bd2b42bf2159a8692e7 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 16:40:28 +0600 Subject: [PATCH 07/17] refactor: extract `shouldShortCircuit` & `shouldSkipCurrentGroup` calculation inside mixin --- packages/batch/src/SqsFifo.ts | 24 +++++++++++++++++++ packages/batch/src/SqsFifoPartialProcessor.ts | 15 ++---------- .../batch/src/SqsFifoPartialProcessorAsync.ts | 15 ++---------- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts index 9fea970d3c..da5695c316 100644 --- a/packages/batch/src/SqsFifo.ts +++ b/packages/batch/src/SqsFifo.ts @@ -114,5 +114,29 @@ export function SqsFifo< _setCurrentGroup(group?: string): void { this._currentGroupId = group; } + + /** + * Determines whether the current group should be short-circuited. + * If we have any failed messages, we should then short circuit the process and + * fail remaining messages unless `skipGroupOnError` is true + */ + _shouldShortCircuit(): boolean { + return ( + !this.options?.skipGroupOnError && this.failureMessages.length !== 0 + ); + } + + /** + * Determines whether the current group should be skipped. + * If `skipGroupOnError` is true and the current group has previously failed, + * then we should skip processing the current group. + */ + _shouldSkipCurrentGroup(): boolean { + return ( + (this.options?.skipGroupOnError ?? false) && + this._currentGroupId && + this._failedGroupIds.has(this._currentGroupId) + ); + } }; } diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 94a7383d68..e37bd384c9 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -66,22 +66,11 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { for (const record of this.records) { this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); - // If we have any failed messages, we should then short circuit the process and - // fail remaining messages unless `skipGroupOnError` is true - const shouldShortCircuit = - !this.options?.skipGroupOnError && this.failureMessages.length !== 0; - if (shouldShortCircuit) { + if (this._shouldShortCircuit()) { return this._shortCircuitProcessing(currentIndex, processedRecords); } - // If `skipGroupOnError` is true and the current group has previously failed, - // then we should skip processing the current group. - const shouldSkipCurrentGroup = - this.options?.skipGroupOnError && - this._currentGroupId && - this._failedGroupIds.has(this._currentGroupId); - - const result = shouldSkipCurrentGroup + const result = this._shouldSkipCurrentGroup() ? this._processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 645794296e..1ad3016c2f 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -66,22 +66,11 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) { for (const record of this.records) { this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); - // If we have any failed messages, we should then short circuit the process and - // fail remaining messages unless `skipGroupOnError` is true - const shouldShortCircuit = - !this.options?.skipGroupOnError && this.failureMessages.length !== 0; - if (shouldShortCircuit) { + if (this._shouldShortCircuit()) { return this._shortCircuitProcessing(currentIndex, processedRecords); } - // If `skipGroupOnError` is true and the current group has previously failed, - // then we should skip processing the current group. - const shouldSkipCurrentGroup = - this.options?.skipGroupOnError && - this._currentGroupId && - this._failedGroupIds.has(this._currentGroupId); - - const result = shouldSkipCurrentGroup + const result = this._shouldSkipCurrentGroup() ? this._processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() From 1453b0d12f45caf6ce6c5fb06728c2b5835e4b0c Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 6 Oct 2024 16:41:09 +0600 Subject: [PATCH 08/17] style: spacing in the doc --- packages/batch/src/SqsFifo.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts index da5695c316..ed120fc918 100644 --- a/packages/batch/src/SqsFifo.ts +++ b/packages/batch/src/SqsFifo.ts @@ -117,6 +117,7 @@ export function SqsFifo< /** * Determines whether the current group should be short-circuited. + * * If we have any failed messages, we should then short circuit the process and * fail remaining messages unless `skipGroupOnError` is true */ @@ -128,6 +129,7 @@ export function SqsFifo< /** * Determines whether the current group should be skipped. + * * If `skipGroupOnError` is true and the current group has previously failed, * then we should skip processing the current group. */ From 97a0682c90aae8ad7f42ac122b6ea49d50ebbcaf Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 3 Nov 2024 12:01:35 +0600 Subject: [PATCH 09/17] feat: revert to original implementation for `SqsFifoPartialProcessor` --- packages/batch/src/SqsFifoPartialProcessor.ts | 126 ++++++++++++++++-- 1 file changed, 117 insertions(+), 9 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index e37bd384c9..6a9546fcb6 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,9 +1,17 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; -import { SqsFifo } from './SqsFifo.js'; import { EventType } from './constants.js'; -import { SqsFifoMessageGroupShortCircuitError } from './errors.js'; -import type { FailureResponse, SuccessResponse } from './types.js'; +import { + type BatchProcessingError, + SqsFifoMessageGroupShortCircuitError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; /** * Batch processor for SQS FIFO queues @@ -36,10 +44,39 @@ import type { FailureResponse, SuccessResponse } from './types.js'; * }); * ``` */ -class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { +class SqsFifoPartialProcessor extends BatchProcessorSync { + /** + * The ID of the current message group being processed. + */ + #currentGroupId?: string; + /** + * A set of group IDs that have already encountered failures. + */ + #failedGroupIds: Set; + public constructor() { super(EventType.SQS); + this.#failedGroupIds = new Set(); } + + /** + * Handles a failure for a given record. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + if (this.options?.skipGroupOnError && this.#currentGroupId) { + this.#addToFailedGroup(this.#currentGroupId); + } + + return super.failureHandler(record, exception); + } + /** * Process a record with a synchronous handler * @@ -64,14 +101,25 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); - if (this._shouldShortCircuit()) { - return this._shortCircuitProcessing(currentIndex, processedRecords); + // If we have any failed messages, we should then short circuit the process and + // fail remaining messages unless `skipGroupOnError` is true + const shouldShortCircuit = + !this.options?.skipGroupOnError && this.failureMessages.length !== 0; + if (shouldShortCircuit) { + return this.shortCircuitProcessing(currentIndex, processedRecords); } - const result = this._shouldSkipCurrentGroup() - ? this._processFailRecord( + // If `skipGroupOnError` is true and the current group has previously failed, + // then we should skip processing the current group. + const shouldSkipCurrentGroup = + this.options?.skipGroupOnError && + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId); + + const result = shouldSkipCurrentGroup + ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() ) @@ -85,6 +133,66 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) { return processedRecords; } + + /** + * Starting from the first failure index, fail all remaining messages regardless + * of their group ID. + * + * This short circuit mechanism is used when we detect a failed message in the batch. + * + * Since messages in a FIFO queue are processed in order, we must stop processing any + * remaining messages in the batch to prevent out-of-order processing. + * + * @param firstFailureIndex Index of first message that failed + * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully + */ + protected shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + this.#processFailRecord(record, new SqsFifoShortCircuitError()); + } + + this.clean(); + + return processedRecords; + } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + #addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); + } + + /** + * Processes a fail record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + #processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + #setCurrentGroup(group?: string): void { + this.#currentGroupId = group; + } } export { SqsFifoPartialProcessor }; From bf95cd29be51d2bdf7198e4d5fb4e818858b4650 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 3 Nov 2024 12:04:15 +0600 Subject: [PATCH 10/17] refactor: remove `Mixin` from `SqsFifoPartialProcessorAsync` --- packages/batch/src/SqsFifo.ts | 144 ------------------ .../batch/src/SqsFifoPartialProcessorAsync.ts | 139 +++++++++++++++-- 2 files changed, 130 insertions(+), 153 deletions(-) delete mode 100644 packages/batch/src/SqsFifo.ts diff --git a/packages/batch/src/SqsFifo.ts b/packages/batch/src/SqsFifo.ts deleted file mode 100644 index ed120fc918..0000000000 --- a/packages/batch/src/SqsFifo.ts +++ /dev/null @@ -1,144 +0,0 @@ -import type { BatchProcessor } from './BatchProcessor.js'; -import type { BatchProcessorSync } from './BatchProcessorSync.js'; -import { - type BatchProcessingError, - SqsFifoShortCircuitError, -} from './errors.js'; -import type { - BaseRecord, - EventSourceDataClassTypes, - FailureResponse, - SuccessResponse, -} from './types.js'; - -/** - * A type alias for a generic constructor function. - * @template T - The type of the instance that the constructor creates. - */ -// biome-ignore lint/suspicious/noExplicitAny: This is a generic type that is intentionally open -type GenericConstructor = new (...args: any[]) => T; - -export function SqsFifo< - TBase extends GenericConstructor, ->(Base: TBase) { - return class extends Base { - /** - * The ID of the current message group being processed. - */ - _currentGroupId?: string; - /** - * A set of group IDs that have already encountered failures. - */ - _failedGroupIds: Set; - - // biome-ignore lint/suspicious/noExplicitAny: - public constructor(...args: any[]) { - super(...args); - this._failedGroupIds = new Set(); - } - - /** - * Handles a failure for a given record. - * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. - * @param record - The record that failed. - * @param exception - The error that occurred. - * @returns The failure response. - */ - public failureHandler( - record: EventSourceDataClassTypes, - exception: Error - ): FailureResponse { - if (this.options?.skipGroupOnError && this._currentGroupId) { - this._addToFailedGroup(this._currentGroupId); - } - - return super.failureHandler(record, exception); - } - - /** - * Starting from the first failure index, fail all remaining messages regardless - * of their group ID. - * - * This short circuit mechanism is used when we detect a failed message in the batch. - * - * Since messages in a FIFO queue are processed in order, we must stop processing any - * remaining messages in the batch to prevent out-of-order processing. - * - * @param firstFailureIndex Index of first message that failed - * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully - */ - _shortCircuitProcessing( - firstFailureIndex: number, - processedRecords: (SuccessResponse | FailureResponse)[] - ): (SuccessResponse | FailureResponse)[] { - const remainingRecords = this.records.slice(firstFailureIndex); - - for (const record of remainingRecords) { - this._processFailRecord(record, new SqsFifoShortCircuitError()); - } - - this.clean(); - - return processedRecords; - } - - /** - * Adds the specified group ID to the set of failed group IDs. - * - * @param group - The group ID to be added to the set of failed group IDs. - */ - _addToFailedGroup(group: string): void { - this._failedGroupIds.add(group); - } - - /** - * Processes a fail record. - * - * @param record - The record that failed. - * @param exception - The error that occurred. - */ - _processFailRecord( - record: BaseRecord, - exception: BatchProcessingError - ): FailureResponse { - const data = this.toBatchType(record, this.eventType); - - return this.failureHandler(data, exception); - } - - /** - * Sets the current group ID for the message being processed. - * - * @param group - The group ID of the current message being processed. - */ - _setCurrentGroup(group?: string): void { - this._currentGroupId = group; - } - - /** - * Determines whether the current group should be short-circuited. - * - * If we have any failed messages, we should then short circuit the process and - * fail remaining messages unless `skipGroupOnError` is true - */ - _shouldShortCircuit(): boolean { - return ( - !this.options?.skipGroupOnError && this.failureMessages.length !== 0 - ); - } - - /** - * Determines whether the current group should be skipped. - * - * If `skipGroupOnError` is true and the current group has previously failed, - * then we should skip processing the current group. - */ - _shouldSkipCurrentGroup(): boolean { - return ( - (this.options?.skipGroupOnError ?? false) && - this._currentGroupId && - this._failedGroupIds.has(this._currentGroupId) - ); - } - }; -} diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 1ad3016c2f..037bb68251 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -1,9 +1,17 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessor } from './BatchProcessor.js'; -import { SqsFifo } from './SqsFifo.js'; import { EventType } from './constants.js'; -import { SqsFifoMessageGroupShortCircuitError } from './errors.js'; -import type { FailureResponse, SuccessResponse } from './types.js'; +import { + type BatchProcessingError, + SqsFifoMessageGroupShortCircuitError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; /** * Batch processor for SQS FIFO queues @@ -36,10 +44,39 @@ import type { FailureResponse, SuccessResponse } from './types.js'; * }); * ``` */ -class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) { +class SqsFifoPartialProcessorAsync extends BatchProcessor { + /** + * The ID of the current message group being processed. + */ + #currentGroupId?: string; + /** + * A set of group IDs that have already encountered failures. + */ + #failedGroupIds: Set; + public constructor() { super(EventType.SQS); + this.#failedGroupIds = new Set(); } + + /** + * Handles a failure for a given record. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + if (this.options?.skipGroupOnError && this.#currentGroupId) { + this.#addToFailedGroup(this.#currentGroupId); + } + + return super.failureHandler(record, exception); + } + /** * Process a record with a asynchronous handler * @@ -64,14 +101,14 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); - if (this._shouldShortCircuit()) { - return this._shortCircuitProcessing(currentIndex, processedRecords); + if (this.#shouldShortCircuit()) { + return this.shortCircuitProcessing(currentIndex, processedRecords); } - const result = this._shouldSkipCurrentGroup() - ? this._processFailRecord( + const result = this.#shouldSkipCurrentGroup() + ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() ) @@ -85,6 +122,90 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) { return processedRecords; } + + /** + * Starting from the first failure index, fail all remaining messages regardless + * of their group ID. + * + * This short circuit mechanism is used when we detect a failed message in the batch. + * + * Since messages in a FIFO queue are processed in order, we must stop processing any + * remaining messages in the batch to prevent out-of-order processing. + * + * @param firstFailureIndex Index of first message that failed + * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully + */ + protected shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + this.#processFailRecord(record, new SqsFifoShortCircuitError()); + } + + this.clean(); + + return processedRecords; + } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + #addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); + } + + /** + * Processes a fail record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + #processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + #setCurrentGroup(group?: string): void { + this.#currentGroupId = group; + } + + /** + * Determines whether the current group should be short-circuited. + * + * If we have any failed messages, we should then short circuit the process and + * fail remaining messages unless `skipGroupOnError` is true + */ + #shouldShortCircuit(): boolean { + return !this.options?.skipGroupOnError && this.failureMessages.length !== 0; + } + + /** + * Determines whether the current group should be skipped. + * + * If `skipGroupOnError` is true and the current group has previously failed, + * then we should skip processing the current group. + */ + #shouldSkipCurrentGroup(): boolean { + return ( + (this.options?.skipGroupOnError ?? false) && + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId) + ); + } } export { SqsFifoPartialProcessorAsync }; From a425e81f43a66145fed9b0efbac450fd25a16a94 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 3 Nov 2024 12:43:29 +0600 Subject: [PATCH 11/17] fix: sonarlint issue for `failedGroupIds` --- packages/batch/src/SqsFifoPartialProcessorAsync.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 037bb68251..3fe0d9185f 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -52,7 +52,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { /** * A set of group IDs that have already encountered failures. */ - #failedGroupIds: Set; + readonly #failedGroupIds: Set; public constructor() { super(EventType.SQS); From f561c1c78bd99a61f20721f9dd0ab3142a6cf64b Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 5 Nov 2024 20:10:06 +0600 Subject: [PATCH 12/17] doc: async sqs fifo message processing --- docs/utilities/batch.md | 12 +++++++--- .../batch/gettingStartedSQSFifoAsync.ts | 22 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 examples/snippets/batch/gettingStartedSQSFifoAsync.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ec0e4ba108..dd019c2bf8 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -149,12 +149,18 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va === "Recommended" - ```typescript hl_lines="1-4 8" + ```typescript hl_lines="1-4 8 20" --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" ``` 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) +=== "Async processing" + + ```typescript hl_lines="1-4 8 20" + --8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts" + ``` + === "Enabling skipGroupOnError flag" ```typescript hl_lines="1-4 13 30" @@ -162,8 +168,8 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va ``` !!! Note - Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. - This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details. + Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`. + If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`. ### Processing messages from Kinesis diff --git a/examples/snippets/batch/gettingStartedSQSFifoAsync.ts b/examples/snippets/batch/gettingStartedSQSFifoAsync.ts new file mode 100644 index 0000000000..5920480cf6 --- /dev/null +++ b/examples/snippets/batch/gettingStartedSQSFifoAsync.ts @@ -0,0 +1,22 @@ +import { + SqsFifoPartialProcessorAsync, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { SQSHandler, SQSRecord } from 'aws-lambda'; + +const processor = new SqsFifoPartialProcessorAsync(); +const logger = new Logger(); + +const recordHandler = async (record: SQSRecord): Promise => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler: SQSHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + }); From ae5c3ce861863cf3b334b24b43a0575964a31a48 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 5 Nov 2024 20:45:18 +0600 Subject: [PATCH 13/17] test: refactor `SqsFifoPartialProcessor` & `SqsFifoPartialProcessorAsync` tests --- .../unit/SqsFifoPartialProcessor.test.ts | 291 ++++++++++-------- .../unit/SqsFifoPartialProcessorAsync.test.ts | 155 ---------- 2 files changed, 162 insertions(+), 284 deletions(-) delete mode 100644 packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 7eec6164de..fc871ecf39 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -2,13 +2,42 @@ import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { SqsFifoMessageGroupShortCircuitError, SqsFifoPartialProcessor, + SqsFifoPartialProcessorAsync, SqsFifoShortCircuitError, + processPartialResponse, processPartialResponseSync, } from '../../src/index.js'; +import type { PartialItemFailureResponse } from '../../src/types.js'; import { sqsRecordFactory } from '../helpers/factories.js'; import { sqsRecordHandler } from '../helpers/handlers.js'; -describe('Class: SqsFifoBatchProcessor', () => { +type ProcessorConfig = { + name: string; + processorClass: + | typeof SqsFifoPartialProcessor + | typeof SqsFifoPartialProcessorAsync; + processFunction: + | typeof processPartialResponse + | typeof processPartialResponseSync; + isAsync: boolean; +}; + +const processors: ProcessorConfig[] = [ + { + name: 'Synchronous', + processorClass: SqsFifoPartialProcessor, + processFunction: processPartialResponseSync, + isAsync: false, + }, + { + name: 'Asynchronous', + processorClass: SqsFifoPartialProcessorAsync, + processFunction: processPartialResponse, + isAsync: true, + }, +]; + +describe('SQS FIFO Processors', () => { const ENVIRONMENT_VARIABLES = process.env; beforeEach(() => { @@ -20,136 +49,140 @@ describe('Class: SqsFifoBatchProcessor', () => { process.env = ENVIRONMENT_VARIABLES; }); - describe('Synchronous SQS FIFO batch processing', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures).toStrictEqual([]); - }); + for (const { name, processorClass, processFunction, isAsync } of processors) { + describe(`${name} SQS FIFO batch processing`, () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new processorClass(); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail'); - const thirdRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord, thirdRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures.length).toBe(2); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); - }); + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor) + : (processFunction( + event, + sqsRecordHandler, + processor + ) as PartialItemFailureResponse); - it('continues processing and moves to the next group when `skipGroupOnError` is true', () => { - // Prepare - const firstRecord = sqsRecordFactory('fail', '1'); - const secondRecord = sqsRecordFactory('success', '1'); - const thirdRecord = sqsRecordFactory('fail', '2'); - const fourthRecord = sqsRecordFactory('success', '2'); - const fifthRecord = sqsRecordFactory('success', '3'); - const event = { - Records: [ - firstRecord, - secondRecord, - thirdRecord, - fourthRecord, - fifthRecord, - ], - }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: true, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(4); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - firstRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[3].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(4); - expect(processor.errors[1]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - expect(processor.errors[3]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - }); + // Assess + expect(result.batchItemFailures).toStrictEqual([]); + }); + + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor) + : (processFunction( + event, + sqsRecordHandler, + processor + ) as PartialItemFailureResponse); - it('short circuits on the first failure when `skipGroupOnError` is false', () => { - // Prepare - const firstRecord = sqsRecordFactory('success', '1'); - const secondRecord = sqsRecordFactory('fail', '2'); - const thirdRecord = sqsRecordFactory('success', '3'); - const fourthRecord = sqsRecordFactory('success', '4'); - const event = { - Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], - }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: false, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(3); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(3); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + // Assess + expect(result.batchItemFailures.length).toBe(2); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); + + it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { + // Prepare + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: true, + }) + : (processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: true, + }) as PartialItemFailureResponse); + + // Assess + expect(result.batchItemFailures.length).toBe(4); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + firstRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[3].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(4); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + expect(processor.errors[3]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + it('short circuits on the first failure when `skipGroupOnError` is false', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('fail', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: false, + }) + : (processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: false, + }) as PartialItemFailureResponse); + + // Assess + expect(result.batchItemFailures.length).toBe(3); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(3); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); }); - }); + } }); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts deleted file mode 100644 index dfd262d045..0000000000 --- a/packages/batch/tests/unit/SqsFifoPartialProcessorAsync.test.ts +++ /dev/null @@ -1,155 +0,0 @@ -import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - SqsFifoMessageGroupShortCircuitError, - SqsFifoPartialProcessorAsync, - SqsFifoShortCircuitError, - processPartialResponse, -} from '../../src/index.js'; -import { sqsRecordFactory } from '../helpers/factories.js'; -import { sqsRecordHandler } from '../helpers/handlers.js'; - -describe('Class: SqsFifoPartialProcessorAsync', () => { - const ENVIRONMENT_VARIABLES = process.env; - - beforeEach(() => { - vi.clearAllMocks(); - process.env = { ...ENVIRONMENT_VARIABLES }; - }); - - afterAll(() => { - process.env = ENVIRONMENT_VARIABLES; - }); - - describe('Asynchronous SQS FIFO batch processing', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord] }; - const processor = new SqsFifoPartialProcessorAsync(); - - // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures).toStrictEqual([]); - }); - - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail'); - const thirdRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord, thirdRecord] }; - const processor = new SqsFifoPartialProcessorAsync(); - - // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures.length).toBe(2); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); - }); - - it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { - // Prepare - const firstRecord = sqsRecordFactory('fail', '1'); - const secondRecord = sqsRecordFactory('success', '1'); - const thirdRecord = sqsRecordFactory('fail', '2'); - const fourthRecord = sqsRecordFactory('success', '2'); - const fifthRecord = sqsRecordFactory('success', '3'); - const event = { - Records: [ - firstRecord, - secondRecord, - thirdRecord, - fourthRecord, - fifthRecord, - ], - }; - const processor = new SqsFifoPartialProcessorAsync(); - - // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: true, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(4); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - firstRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[3].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(4); - expect(processor.errors[1]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - expect(processor.errors[3]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - }); - - it('short circuits on the first failure when `skipGroupOnError` is false', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success', '1'); - const secondRecord = sqsRecordFactory('fail', '2'); - const thirdRecord = sqsRecordFactory('success', '3'); - const fourthRecord = sqsRecordFactory('success', '4'); - const event = { - Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], - }; - const processor = new SqsFifoPartialProcessorAsync(); - - // Act - const result = await processPartialResponse( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: false, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(3); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(3); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); - }); - }); -}); From f9beef9e24e259aead26a45df5176655a979c57c Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Thu, 7 Nov 2024 10:45:14 +0600 Subject: [PATCH 14/17] refactor: use `SqsFifoProcessingUtils` inside `SqsFifoPartialProcessorAsync` --- .../batch/src/SqsFifoPartialProcessorAsync.ts | 65 +++----------- packages/batch/src/SqsFifoProcessingUtils.ts | 86 +++++++++++++++++++ 2 files changed, 96 insertions(+), 55 deletions(-) create mode 100644 packages/batch/src/SqsFifoProcessingUtils.ts diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 3fe0d9185f..e566fc01e6 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -1,5 +1,6 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessor } from './BatchProcessor.js'; +import { SqsFifoProcessingUtils } from './SqsFifoProcessingUtils.js'; import { EventType } from './constants.js'; import { type BatchProcessingError, @@ -46,17 +47,13 @@ import type { */ class SqsFifoPartialProcessorAsync extends BatchProcessor { /** - * The ID of the current message group being processed. + * Utility class for processing SQS FIFO queues */ - #currentGroupId?: string; - /** - * A set of group IDs that have already encountered failures. - */ - readonly #failedGroupIds: Set; + readonly #utils: SqsFifoProcessingUtils; public constructor() { super(EventType.SQS); - this.#failedGroupIds = new Set(); + this.#utils = new SqsFifoProcessingUtils(); } /** @@ -70,9 +67,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - if (this.options?.skipGroupOnError && this.#currentGroupId) { - this.#addToFailedGroup(this.#currentGroupId); - } + this.#utils.processFailureForCurrentGroup(this.options); return super.failureHandler(record, exception); } @@ -101,13 +96,15 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#utils.setCurrentGroup( + (record as SQSRecord).attributes?.MessageGroupId + ); - if (this.#shouldShortCircuit()) { + if (this.#utils.shouldShortCircuit(this.failureMessages, this.options)) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - const result = this.#shouldSkipCurrentGroup() + const result = this.#utils.shouldSkipCurrentGroup(this.options) ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() @@ -150,15 +147,6 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { return processedRecords; } - /** - * Adds the specified group ID to the set of failed group IDs. - * - * @param group - The group ID to be added to the set of failed group IDs. - */ - #addToFailedGroup(group: string): void { - this.#failedGroupIds.add(group); - } - /** * Processes a fail record. * @@ -173,39 +161,6 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { return this.failureHandler(data, exception); } - - /** - * Sets the current group ID for the message being processed. - * - * @param group - The group ID of the current message being processed. - */ - #setCurrentGroup(group?: string): void { - this.#currentGroupId = group; - } - - /** - * Determines whether the current group should be short-circuited. - * - * If we have any failed messages, we should then short circuit the process and - * fail remaining messages unless `skipGroupOnError` is true - */ - #shouldShortCircuit(): boolean { - return !this.options?.skipGroupOnError && this.failureMessages.length !== 0; - } - - /** - * Determines whether the current group should be skipped. - * - * If `skipGroupOnError` is true and the current group has previously failed, - * then we should skip processing the current group. - */ - #shouldSkipCurrentGroup(): boolean { - return ( - (this.options?.skipGroupOnError ?? false) && - this.#currentGroupId && - this.#failedGroupIds.has(this.#currentGroupId) - ); - } } export { SqsFifoPartialProcessorAsync }; diff --git a/packages/batch/src/SqsFifoProcessingUtils.ts b/packages/batch/src/SqsFifoProcessingUtils.ts new file mode 100644 index 0000000000..cb0e33c723 --- /dev/null +++ b/packages/batch/src/SqsFifoProcessingUtils.ts @@ -0,0 +1,86 @@ +import type { + BatchProcessingOptions, + EventSourceDataClassTypes, +} from './types.js'; + +/** + * Utility class to handle processing of SQS FIFO messages. + */ +class SqsFifoProcessingUtils { + /** + * The ID of the current message group being processed. + */ + #currentGroupId?: string; + + /** + * A set of group IDs that have already encountered failures. + */ + readonly #failedGroupIds: Set; + + public constructor() { + this.#failedGroupIds = new Set(); + } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + public addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + public setCurrentGroup(group?: string): void { + this.#currentGroupId = group; + } + + /** + * Determines whether the current group should be short-circuited. + * + * If we have any failed messages, we should then short circuit the process and + * fail remaining messages unless `skipGroupOnError` is true + * + * @param failureMessages - The list of failure messages. + * @param options - The options for the batch processing. + */ + public shouldShortCircuit( + failureMessages: EventSourceDataClassTypes[], + options?: BatchProcessingOptions + ): boolean { + return !options?.skipGroupOnError && failureMessages.length !== 0; + } + + /** + * Determines whether the current group should be skipped. + * + * If `skipGroupOnError` is true and the current group has previously failed, + * then we should skip processing the current group. + * + * @param options - The options for the batch processing. + */ + public shouldSkipCurrentGroup(options?: BatchProcessingOptions): boolean { + return ( + (options?.skipGroupOnError ?? false) && + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId) + ); + } + + /** + * Handles failure for current group + * + * @param options - The options for the batch processing. + */ + public processFailureForCurrentGroup(options?: BatchProcessingOptions) { + if (options?.skipGroupOnError && this.#currentGroupId) { + this.addToFailedGroup(this.#currentGroupId); + } + } +} + +export { SqsFifoProcessingUtils }; From b27f651e32635f747a0af96597bfba3eb67d2d07 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Thu, 7 Nov 2024 10:53:08 +0600 Subject: [PATCH 15/17] doc: update doc comments for `failureHandler` --- packages/batch/src/SqsFifoPartialProcessorAsync.ts | 3 +-- packages/batch/src/SqsFifoProcessingUtils.ts | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index e566fc01e6..492e21b609 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -58,10 +58,9 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { /** * Handles a failure for a given record. - * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * * @param record - The record that failed. * @param exception - The error that occurred. - * @returns The failure response. */ public failureHandler( record: EventSourceDataClassTypes, diff --git a/packages/batch/src/SqsFifoProcessingUtils.ts b/packages/batch/src/SqsFifoProcessingUtils.ts index cb0e33c723..c60959a3d0 100644 --- a/packages/batch/src/SqsFifoProcessingUtils.ts +++ b/packages/batch/src/SqsFifoProcessingUtils.ts @@ -73,6 +73,7 @@ class SqsFifoProcessingUtils { /** * Handles failure for current group + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. * * @param options - The options for the batch processing. */ From 5d9d203d00253a6fa91e887e85fe7b7a4d7abfa4 Mon Sep 17 00:00:00 2001 From: Asifur Rahman Date: Thu, 7 Nov 2024 19:37:44 +0600 Subject: [PATCH 16/17] Update packages/batch/src/processPartialResponse.ts Co-authored-by: Alexander Schueren --- packages/batch/src/processPartialResponse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index 99331b9fff..d50fc478df 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -70,7 +70,7 @@ import type { * @param event The event object containing the batch of records * @param recordHandler Async function to process each record from the batch * @param processor Batch processor instance to handle the batch processing - * @param options Batch processing options, which can vary with chosen batch processor implementation + * @param options Batch processing options, see {{@link BatchProcessingOptions}} */ const processPartialResponse = async ( event: { Records: BaseRecord[] }, From fe2547779b06c1b28c37980819f0a26c266db0be Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Fri, 8 Nov 2024 14:23:36 +0600 Subject: [PATCH 17/17] refactor: `SqsFifoProcessor` for functionalities used in fifo processor classes --- packages/batch/src/SqsFifoPartialProcessor.ts | 54 +++++-------------- .../batch/src/SqsFifoPartialProcessorAsync.ts | 18 ++++--- ...ProcessingUtils.ts => SqsFifoProcessor.ts} | 8 +-- 3 files changed, 27 insertions(+), 53 deletions(-) rename packages/batch/src/{SqsFifoProcessingUtils.ts => SqsFifoProcessor.ts} (88%) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 6a9546fcb6..5854e75e42 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,5 +1,6 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; +import { SqsFifoProcessor } from './SqsFifoProcessor.js'; import { EventType } from './constants.js'; import { type BatchProcessingError, @@ -46,17 +47,13 @@ import type { */ class SqsFifoPartialProcessor extends BatchProcessorSync { /** - * The ID of the current message group being processed. + * Processor for handling SQS FIFO message */ - #currentGroupId?: string; - /** - * A set of group IDs that have already encountered failures. - */ - #failedGroupIds: Set; + readonly #processor: SqsFifoProcessor; public constructor() { super(EventType.SQS); - this.#failedGroupIds = new Set(); + this.#processor = new SqsFifoProcessor(); } /** @@ -70,9 +67,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - if (this.options?.skipGroupOnError && this.#currentGroupId) { - this.#addToFailedGroup(this.#currentGroupId); - } + this.#processor.processFailureForCurrentGroup(this.options); return super.failureHandler(record, exception); } @@ -101,24 +96,17 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#processor.setCurrentGroup( + (record as SQSRecord).attributes?.MessageGroupId + ); - // If we have any failed messages, we should then short circuit the process and - // fail remaining messages unless `skipGroupOnError` is true - const shouldShortCircuit = - !this.options?.skipGroupOnError && this.failureMessages.length !== 0; - if (shouldShortCircuit) { + if ( + this.#processor.shouldShortCircuit(this.failureMessages, this.options) + ) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - // If `skipGroupOnError` is true and the current group has previously failed, - // then we should skip processing the current group. - const shouldSkipCurrentGroup = - this.options?.skipGroupOnError && - this.#currentGroupId && - this.#failedGroupIds.has(this.#currentGroupId); - - const result = shouldSkipCurrentGroup + const result = this.#processor.shouldSkipCurrentGroup(this.options) ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() @@ -161,15 +149,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { return processedRecords; } - /** - * Adds the specified group ID to the set of failed group IDs. - * - * @param group - The group ID to be added to the set of failed group IDs. - */ - #addToFailedGroup(group: string): void { - this.#failedGroupIds.add(group); - } - /** * Processes a fail record. * @@ -184,15 +163,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { return this.failureHandler(data, exception); } - - /** - * Sets the current group ID for the message being processed. - * - * @param group - The group ID of the current message being processed. - */ - #setCurrentGroup(group?: string): void { - this.#currentGroupId = group; - } } export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 492e21b609..98d47a3bb1 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -1,6 +1,6 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessor } from './BatchProcessor.js'; -import { SqsFifoProcessingUtils } from './SqsFifoProcessingUtils.js'; +import { SqsFifoProcessor } from './SqsFifoProcessor.js'; import { EventType } from './constants.js'; import { type BatchProcessingError, @@ -47,13 +47,13 @@ import type { */ class SqsFifoPartialProcessorAsync extends BatchProcessor { /** - * Utility class for processing SQS FIFO queues + * Processor for handling SQS FIFO message */ - readonly #utils: SqsFifoProcessingUtils; + readonly #processor: SqsFifoProcessor; public constructor() { super(EventType.SQS); - this.#utils = new SqsFifoProcessingUtils(); + this.#processor = new SqsFifoProcessor(); } /** @@ -66,7 +66,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - this.#utils.processFailureForCurrentGroup(this.options); + this.#processor.processFailureForCurrentGroup(this.options); return super.failureHandler(record, exception); } @@ -95,15 +95,17 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.#utils.setCurrentGroup( + this.#processor.setCurrentGroup( (record as SQSRecord).attributes?.MessageGroupId ); - if (this.#utils.shouldShortCircuit(this.failureMessages, this.options)) { + if ( + this.#processor.shouldShortCircuit(this.failureMessages, this.options) + ) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - const result = this.#utils.shouldSkipCurrentGroup(this.options) + const result = this.#processor.shouldSkipCurrentGroup(this.options) ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() diff --git a/packages/batch/src/SqsFifoProcessingUtils.ts b/packages/batch/src/SqsFifoProcessor.ts similarity index 88% rename from packages/batch/src/SqsFifoProcessingUtils.ts rename to packages/batch/src/SqsFifoProcessor.ts index c60959a3d0..d24510eba1 100644 --- a/packages/batch/src/SqsFifoProcessingUtils.ts +++ b/packages/batch/src/SqsFifoProcessor.ts @@ -4,9 +4,11 @@ import type { } from './types.js'; /** - * Utility class to handle processing of SQS FIFO messages. + * Class representing a processor for SQS FIFO messages. + * This class provides utilities for handling message groups, including tracking failed groups, + * determining whether to short-circuit processing, and skipping groups based on processing options. */ -class SqsFifoProcessingUtils { +class SqsFifoProcessor { /** * The ID of the current message group being processed. */ @@ -84,4 +86,4 @@ class SqsFifoProcessingUtils { } } -export { SqsFifoProcessingUtils }; +export { SqsFifoProcessor };