From c2339edaf645ff159b08644cd7c986e74c18bf0d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 26 May 2024 17:20:06 +0600 Subject: [PATCH 01/24] feat: generic type for BatchProcessingOptions skipGroupOnError can be set only for SqsFifoPartialProcessor --- packages/batch/src/types.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 5033ce6f5a..e1f0155db1 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -4,9 +4,12 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor'; +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -type BatchProcessingOptions = { +type BatchProcessingOptions = { context: Context; + skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; }; type EventSourceDataClassTypes = From c080b8d2bfc2cab67b0d4a03bea80509d3f385db Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 26 May 2024 17:20:51 +0600 Subject: [PATCH 02/24] refactor: update processPartialResponseSync function for new generic BatchProcessingOptions type --- packages/batch/src/processPartialResponseSync.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 7fb7056385..de2ce7798a 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -13,11 +13,11 @@ import type { * @param processor Batch processor to handle partial failure cases * @returns Lambda Partial Batch Response */ -const processPartialResponseSync = ( +const processPartialResponseSync = ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, - options?: BatchProcessingOptions + processor: T, + options?: BatchProcessingOptions ): PartialItemFailureResponse => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); From b9197ce31caacee9ca1bfa62336b6a3077b5dad1 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Mon, 27 May 2024 21:18:05 +0600 Subject: [PATCH 03/24] feat: SqsFifoMessageGroupShortCircuitError exception --- packages/batch/src/errors.ts | 12 ++++++++++++ packages/batch/src/index.ts | 1 + 2 files changed, 13 insertions(+) diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index 4ce6b27de9..547d8414ec 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -37,6 +37,17 @@ class SqsFifoShortCircuitError extends BatchProcessingError { } } +/** + * Error thrown by the Batch Processing utility when a previous record from + * SQS FIFO queue message group fails processing. + */ +class SqsFifoMessageGroupShortCircuitError extends BatchProcessingError { + public constructor() { + super('A previous record from this message group failed processing'); + this.name = 'SqsFifoMessageGroupShortCircuitError'; + } +} + /** * Error thrown by the Batch Processing utility when a partial processor receives an unexpected * batch type. @@ -55,5 +66,6 @@ export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 6613712b7e..499202f722 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -3,6 +3,7 @@ export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, } from './errors.js'; export { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; From f9a5ab0a58040bfae112341a38acc98995a5c911 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 28 May 2024 16:22:51 +0600 Subject: [PATCH 04/24] feat: stick to the default behavior of short-circuit If skipGroupOnError is not true & have failed messages, we will short circuit the process. This is similar to old implementation --- packages/batch/src/SqsFifoPartialProcessor.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 3189c00fd5..4259dd35bf 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -24,9 +24,12 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - // If we have any failed messages, it means the last message failed - // We should then short circuit the process and fail remaining messages - if (this.failureMessages.length != 0) { + // If we have any failed messages, it means the last message failed. + // We should then short circuit the process and + // fail remaining messages(unless skipGroupOnError is set to true) + const shouldShortCircuit = + !this.options?.skipGroupOnError && this.failureMessages.length !== 0; + if (shouldShortCircuit) { return this.shortCircuitProcessing(currentIndex, processedRecords); } From 2405629962e0c2e43365ac95b1818366ba376d1e Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 28 May 2024 18:47:22 +0600 Subject: [PATCH 05/24] feat: skip processing record of group if it has already failed before --- packages/batch/src/SqsFifoPartialProcessor.ts | 104 ++++++++++++++++-- packages/batch/src/types.ts | 4 +- 2 files changed, 95 insertions(+), 13 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 4259dd35bf..23766ad714 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,22 +1,62 @@ +import { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; import { EventType } from './constants.js'; -import { SqsFifoShortCircuitError } from './errors.js'; -import type { FailureResponse, SuccessResponse } from './types.js'; +import { + BatchProcessingError, + SqsFifoMessageGroupShortCircuitError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; /** * Process native partial responses from SQS FIFO queues - * Stops processing records when the first record fails - * The remaining records are reported as failed items + * If `skipGroupOnError` is not enabled, stop processing records + * when the first record fails and the remaining records are reported as failed items. + * If `skipGroupOnError` is enabled, skip processing of subsequent records + * in the same message group after the first failure in that group. */ class SqsFifoPartialProcessor extends BatchProcessorSync { + /** + * The ID of the current message group being processed. + */ + private currentGroupId?: string; + /** + * A set of group IDs that have already encountered failures. + */ + private failedGroupIds: Set; + public constructor() { super(EventType.SQS); + this.failedGroupIds = new Set(); + } + + /** + * Handles a failure for a given record. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is enabled. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + if (this.options?.skipGroupOnError && this.currentGroupId) + this.addToFailedGroup(this.currentGroupId); + + return super.failureHandler(record, exception); } /** * Call instance's handler for each record. * When the first failed message is detected, the process is short-circuited - * And the remaining messages are reported as failed items + * And the remaining messages are reported as failed items, + * unless the `skipGroupOnError` option is enabled. */ public processSync(): (SuccessResponse | FailureResponse)[] { this.prepare(); @@ -24,16 +64,30 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { + this.setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + // If we have any failed messages, it means the last message failed. // We should then short circuit the process and - // fail remaining messages(unless skipGroupOnError is set to true) + // fail remaining messages unless `skipGroupOnError` is enabled const shouldShortCircuit = !this.options?.skipGroupOnError && this.failureMessages.length !== 0; if (shouldShortCircuit) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - processedRecords.push(this.processRecordSync(record)); + const shouldSkipCurrentGroup = + this.options?.skipGroupOnError && + this.currentGroupId && + this.failedGroupIds.has(this.currentGroupId); + + const result = shouldSkipCurrentGroup + ? this.processFailRecord( + record, + new SqsFifoMessageGroupShortCircuitError() + ) + : this.processRecordSync(record); + + processedRecords.push(result); currentIndex++; } @@ -54,16 +108,44 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - const data = this.toBatchType(record, this.eventType); - processedRecords.push( - this.failureHandler(data, new SqsFifoShortCircuitError()) - ); + this.processFailRecord(record, new SqsFifoShortCircuitError()); } this.clean(); return processedRecords; } + + /** + * Adds the specified group ID to the set of failed group IDs. + * @param group - The group ID to be added to the set of failed group IDs. + */ + private addToFailedGroup(group: string): void { + this.failedGroupIds.add(group); + } + + /** + * Processes a fail record. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + private processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } + + /** + * Sets the current group ID for the message being processed. + * @param group - The group ID of the current message being processed. + */ + private setCurrentGroup(group?: string): void { + this.currentGroupId = group; + } } export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index e1f0155db1..eb8141ab61 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -4,8 +4,8 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; -import { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor'; -import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; type BatchProcessingOptions = { context: Context; From c5b504109278e88dce58c1f9eaa1363491a649b0 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 28 May 2024 19:31:21 +0600 Subject: [PATCH 06/24] test: skipGroupOnError flag for SqsFifoPartialProcessor --- packages/batch/tests/helpers/factories.ts | 3 +- .../unit/SqsFifoPartialProcessor.test.ts | 77 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index 7df6110742..e8af6e696a 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -5,7 +5,7 @@ import type { } from 'aws-lambda'; import { randomInt, randomUUID } from 'node:crypto'; -const sqsRecordFactory = (body: string): SQSRecord => { +const sqsRecordFactory = (body: string, messageGroupId?: string): SQSRecord => { return { messageId: randomUUID(), receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', @@ -15,6 +15,7 @@ const sqsRecordFactory = (body: string): SQSRecord => { SentTimestamp: '1545082649183', SenderId: 'AIDAIENQZJOLO23YVJ4VO', ApproximateFirstReceiveTimestamp: '1545082649185', + MessageGroupId: messageGroupId, }, messageAttributes: {}, md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3', diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 61d12183fc..cc94fab0e6 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -7,9 +7,11 @@ import { SqsFifoPartialProcessor, processPartialResponseSync, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, } from '../../src/index.js'; import { sqsRecordFactory } from '../helpers/factories.js'; import { sqsRecordHandler } from '../helpers/handlers.js'; +import context from '@aws-lambda-powertools/testing-utils/context'; describe('Class: SqsFifoBatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -68,5 +70,80 @@ describe('Class: SqsFifoBatchProcessor', () => { ); expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); }); + + test('When `skipGroupOnError` is true, SQS FIFO Batch processor keeps on processing record after failure and only skips a record if its previously failed', () => { + // Prepare + const messageGroupId = 'same_group'; + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail', messageGroupId); + const thirdRecord = sqsRecordFactory('success', 'another_group'); + const fourthRecord = sqsRecordFactory('fail', messageGroupId); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + context, + skipGroupOnError: true, + } + ); + + //Assess + expect(result['batchItemFailures'].length).toBe(2); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(2); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + test('When `skipGroupOnError` is false, SQS FIFO Batch processor short circuits the process on first failure', () => { + // Prepare + const messageGroupId = 'same_group'; + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail', messageGroupId); + const thirdRecord = sqsRecordFactory('success'); + const fourthRecord = sqsRecordFactory('fail', messageGroupId); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + context, + skipGroupOnError: false, + } + ); + + //Assess + expect(result['batchItemFailures'].length).toBe(3); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(3); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); }); }); From 6a5477b709c678a8f06202a4355e28c3d3022b1d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 28 May 2024 19:55:28 +0600 Subject: [PATCH 07/24] doc: describe skipGroupOnError with example/diagram --- docs/utilities/batch.md | 48 ++++++++++++++++--- .../gettingStartedSQSFifoSkipGroupOnError.ts | 32 +++++++++++++ 2 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index c8e253492a..4d7e67a483 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,12 +141,23 @@ Processing batches from SQS works in three stages: #### FIFO queues -When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. -This helps preserve the ordering of messages in your queue. +When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, a batch may include messages from different group IDs. -```typescript hl_lines="1-4 13 28-30" ---8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" -``` +By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. + +Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID. + +=== "Recommended" + + ```typescript hl_lines="1-4 13 28-30" + --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" + ``` + +=== "Enabling skipGroupOnError flag" + + ```typescript hl_lines="1-4 13 28-30" + --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" + ``` 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) @@ -283,7 +294,7 @@ sequenceDiagram > Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. -Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues. +Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skipGroupOnError` flag.
```mermaid @@ -307,6 +318,31 @@ sequenceDiagram SQS FIFO mechanism with Batch Item Failures
+Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skipGroupOnError` flag. + +
+```mermaid +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function-->Lambda function: Process messages from another MessageGroupID + Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure + deactivate Lambda function + activate SQS queue + Lambda service->>SQS queue: Delete successful messages processed + SQS queue-->>SQS queue: Failed messages return + deactivate SQS queue +``` +SQS FIFO mechanism with Batch Item Failures +
+ #### Kinesis and DynamoDB Streams > Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. diff --git a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts new file mode 100644 index 0000000000..1fc6616db8 --- /dev/null +++ b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts @@ -0,0 +1,32 @@ +import { + SqsFifoPartialProcessor, + processPartialResponseSync, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new SqsFifoPartialProcessor(); // (1)! +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponseSync(event, recordHandler, processor, { + context, + skipGroupOnError: true, + }); +}; From ca18ea7650a0146fb1969ac0de1960c40a2b4b2c Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 29 May 2024 20:19:14 +0600 Subject: [PATCH 08/24] test: SQS Fifo for different scenarios by `skipGroupOnError` flag --- .../unit/SqsFifoPartialProcessor.test.ts | 92 +++++++++++++++---- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index cc94fab0e6..548de645e9 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -71,15 +71,21 @@ describe('Class: SqsFifoBatchProcessor', () => { expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); }); - test('When `skipGroupOnError` is true, SQS FIFO Batch processor keeps on processing record after failure and only skips a record if its previously failed', () => { + test('When `skipGroupOnError` is true, SQS FIFO processor is set to continue processing even after first failure', () => { // Prepare - const messageGroupId = 'same_group'; - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail', messageGroupId); - const thirdRecord = sqsRecordFactory('success', 'another_group'); - const fourthRecord = sqsRecordFactory('fail', messageGroupId); + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); const event = { - Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], }; const processor = new SqsFifoPartialProcessor(); @@ -94,15 +100,70 @@ describe('Class: SqsFifoBatchProcessor', () => { } ); - //Assess - expect(result['batchItemFailures'].length).toBe(2); + // Assess + expect(result['batchItemFailures'].length).toBe(4); expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + firstRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( secondRecord.messageId ); + expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + expect(result['batchItemFailures'][3]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(4); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + expect(processor.errors[3]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + test('When `skipGroupOnError` is true, SQS FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('fail', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + context, + skipGroupOnError: true, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(3); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + thirdRecord.messageId + ); expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( fourthRecord.messageId ); - expect(processor.errors.length).toBe(2); + expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( + fifthRecord.messageId + ); + expect(processor.errors.length).toBe(3); expect(processor.errors[1]).toBeInstanceOf( SqsFifoMessageGroupShortCircuitError ); @@ -110,11 +171,10 @@ describe('Class: SqsFifoBatchProcessor', () => { test('When `skipGroupOnError` is false, SQS FIFO Batch processor short circuits the process on first failure', () => { // Prepare - const messageGroupId = 'same_group'; - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail', messageGroupId); - const thirdRecord = sqsRecordFactory('success'); - const fourthRecord = sqsRecordFactory('fail', messageGroupId); + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('fail', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); const event = { Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], }; @@ -131,7 +191,7 @@ describe('Class: SqsFifoBatchProcessor', () => { } ); - //Assess + // Assess expect(result['batchItemFailures'].length).toBe(3); expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( secondRecord.messageId From 249ca8bf77bead9adaeda1d41ca9e1deacc30e0a Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 29 May 2024 21:04:27 +0600 Subject: [PATCH 09/24] doc: fix SQS FIFO queues aws documentation link --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 4d7e67a483..e60338f4bf 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,7 +141,7 @@ Processing batches from SQS works in three stages: #### FIFO queues -When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, a batch may include messages from different group IDs. +When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html){target="_blank"}, a batch may include messages from different group IDs. By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. From a5203e29cbbdcf8291f809023b85b34b052ed3ad Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 29 May 2024 22:58:34 +0600 Subject: [PATCH 10/24] doc: update comments of SqsFifoPartialProcessor --- packages/batch/src/SqsFifoPartialProcessor.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 23766ad714..e5cae5406e 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -17,7 +17,7 @@ import type { * Process native partial responses from SQS FIFO queues * If `skipGroupOnError` is not enabled, stop processing records * when the first record fails and the remaining records are reported as failed items. - * If `skipGroupOnError` is enabled, skip processing of subsequent records + * If `skipGroupOnError` is true, skip processing of subsequent records * in the same message group after the first failure in that group. */ class SqsFifoPartialProcessor extends BatchProcessorSync { @@ -37,7 +37,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { /** * Handles a failure for a given record. - * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is enabled. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. * @param record - The record that failed. * @param exception - The error that occurred. * @returns The failure response. @@ -46,8 +46,9 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - if (this.options?.skipGroupOnError && this.currentGroupId) + if (this.options?.skipGroupOnError && this.currentGroupId) { this.addToFailedGroup(this.currentGroupId); + } return super.failureHandler(record, exception); } @@ -56,7 +57,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { * Call instance's handler for each record. * When the first failed message is detected, the process is short-circuited * And the remaining messages are reported as failed items, - * unless the `skipGroupOnError` option is enabled. + * unless the `skipGroupOnError` option is true. */ public processSync(): (SuccessResponse | FailureResponse)[] { this.prepare(); @@ -66,15 +67,16 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { for (const record of this.records) { this.setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); - // If we have any failed messages, it means the last message failed. - // We should then short circuit the process and - // fail remaining messages unless `skipGroupOnError` is enabled + // If we have any failed messages, we should then short circuit the process and + // fail remaining messages unless `skipGroupOnError` is true const shouldShortCircuit = !this.options?.skipGroupOnError && this.failureMessages.length !== 0; if (shouldShortCircuit) { return this.shortCircuitProcessing(currentIndex, processedRecords); } + // If `skipGroupOnError` is true and the current group has previously failed, + // then we should skip processing the current group. const shouldSkipCurrentGroup = this.options?.skipGroupOnError && this.currentGroupId && From 8713413c6d89faf80ccf99fc0b7a06ec007c21c6 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 29 May 2024 22:59:43 +0600 Subject: [PATCH 11/24] test: change the last record to success Because this will be more accurate with the test description --- .../batch/tests/unit/SqsFifoPartialProcessor.test.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 548de645e9..70cba5a168 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -129,7 +129,7 @@ describe('Class: SqsFifoBatchProcessor', () => { const secondRecord = sqsRecordFactory('success', '1'); const thirdRecord = sqsRecordFactory('fail', '2'); const fourthRecord = sqsRecordFactory('success', '2'); - const fifthRecord = sqsRecordFactory('fail', '3'); + const fifthRecord = sqsRecordFactory('success', '3'); const event = { Records: [ firstRecord, @@ -153,17 +153,14 @@ describe('Class: SqsFifoBatchProcessor', () => { ); // Assess - expect(result['batchItemFailures'].length).toBe(3); + expect(result['batchItemFailures'].length).toBe(2); expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( thirdRecord.messageId ); expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( fourthRecord.messageId ); - expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( - fifthRecord.messageId - ); - expect(processor.errors.length).toBe(3); + expect(processor.errors.length).toBe(2); expect(processor.errors[1]).toBeInstanceOf( SqsFifoMessageGroupShortCircuitError ); From ffe9366766e2dc467a6c0db57099e73b21100d13 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Thu, 30 May 2024 09:54:31 +0600 Subject: [PATCH 12/24] doc: update comments for `skipGroupOnError` flag --- .../batch/src/processPartialResponseSync.ts | 21 ++++++++++++++++++- packages/batch/src/types.ts | 6 ++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 1421ec96be..97df61348f 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -41,11 +41,30 @@ import type { * context, * }); * ``` + * @example + * ```typescript + * import { + * SqsFifoPartialProcessor, + * processPartialResponseSync, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessor(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponseSync(event, recordHandler, processor, { + * context, + * skipGroupOnError: true + * }); + * ``` * @param event The event object containing the batch of records * @param recordHandler Sync function to process each record from the batch * @param processor Batch processor instance to handle the batch processing - * @param options Batch processing options + * @param options Batch processing options, which can vary with chosen batch processor implementation */ const processPartialResponseSync = ( event: { Records: BaseRecord[] }, diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 1af03b8b26..ebe71db950 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -10,7 +10,9 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; /** * Options for batch processing * + * @template T The type of the batch processor, defaults to BasePartialBatchProcessor * @property context The context object provided by the AWS Lambda runtime + * @property skipGroupOnError The option to group on error during processing */ type BatchProcessingOptions = { /** @@ -18,6 +20,10 @@ type BatchProcessingOptions = { * it's made available to the handler function you specify */ context: Context; + /** + * This option is only available for SqsFifoPartialProcessor. + * If true skip the group on error during processing. + */ skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; }; From adf5d3ee32e00c31c660503f8e3d7933ce8ed0b9 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Fri, 31 May 2024 18:03:25 +0600 Subject: [PATCH 13/24] test: FIFO Batch processor processes everything on success despite the`skipGroupOnError` option --- .../unit/SqsFifoPartialProcessor.test.ts | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 70cba5a168..28649b328b 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -166,6 +166,60 @@ describe('Class: SqsFifoBatchProcessor', () => { ); }); + test('When `skipGroupOnError` is true, SQS FIFO Batch processor processes everything with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + context, + skipGroupOnError: true, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(0); + expect(processor.errors.length).toBe(0); + }); + + test('When `skipGroupOnError` is false, SQS FIFO Batch processor processes everything with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + context, + skipGroupOnError: false, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(0); + expect(processor.errors.length).toBe(0); + }); + test('When `skipGroupOnError` is false, SQS FIFO Batch processor short circuits the process on first failure', () => { // Prepare const firstRecord = sqsRecordFactory('success', '1'); From 4cf98394cd44f009eba71e47cf71e7f73c639e5a Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Fri, 31 May 2024 18:28:24 +0600 Subject: [PATCH 14/24] style: space after example --- packages/batch/src/processPartialResponseSync.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 97df61348f..7fb2cf6b89 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -61,6 +61,7 @@ import type { * skipGroupOnError: true * }); * ``` + * * @param event The event object containing the batch of records * @param recordHandler Sync function to process each record from the batch * @param processor Batch processor instance to handle the batch processing From 082f8b7777b5ba257e97ba98c8e9b297f4a3d42a Mon Sep 17 00:00:00 2001 From: Asifur Rahman Arnab Date: Tue, 4 Jun 2024 09:47:18 +0600 Subject: [PATCH 15/24] fix: check if messageGroupId exists Co-authored-by: Andrea Amorosi --- packages/batch/tests/helpers/factories.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index e8af6e696a..ce51736788 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -15,7 +15,7 @@ const sqsRecordFactory = (body: string, messageGroupId?: string): SQSRecord => { SentTimestamp: '1545082649183', SenderId: 'AIDAIENQZJOLO23YVJ4VO', ApproximateFirstReceiveTimestamp: '1545082649185', - MessageGroupId: messageGroupId, + ...(messageGroupId ? { MessageGroupId: messageGroupId } : {}), }, messageAttributes: {}, md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3', From 9a0ac3b609da83debe2499585af8f686631915b4 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 09:49:51 +0600 Subject: [PATCH 16/24] doc: styling for processFailRecord docString --- packages/batch/src/SqsFifoPartialProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 3182a1d45a..cb05ae8eed 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -171,9 +171,9 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { /** * Processes a fail record. + * * @param record - The record that failed. * @param exception - The error that occurred. - * @returns The failure response. */ private processFailRecord( record: BaseRecord, From c6d8c744253a99a823ff629ef514febe54a50ef1 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 10:08:24 +0600 Subject: [PATCH 17/24] doc: enhance processPartialResponseSync docstring --- packages/batch/src/processPartialResponseSync.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 7fb2cf6b89..cdf349dd84 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -41,6 +41,12 @@ import type { * context, * }); * ``` + * + * When working with SQS FIFO queues, we will stop processing at the first failure + * and mark unprocessed messages as failed to preserve ordering. However, if you want to + * continue processing messages from different group IDs, you can enable the `skipGroupOnError` + * option for seamless processing of messages from various group IDs. + * * @example * ```typescript * import { From ff76eadc2b95c45ce0c75054325bb2bf654cd01e Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 10:17:56 +0600 Subject: [PATCH 18/24] fix: code highlight numbers --- docs/utilities/batch.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 87940d7284..bba466a832 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -149,13 +149,13 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va === "Recommended" - ```typescript hl_lines="1-4 13 28-30" + ```typescript hl_lines="1-4 8" --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" ``` === "Enabling skipGroupOnError flag" - ```typescript hl_lines="1-4 13 28-30" + ```typescript hl_lines="1-4 13 30" --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" ``` From c7dfdd359cc61e4b8064d00e3e3cfbe9829b6345 Mon Sep 17 00:00:00 2001 From: Asifur Rahman Arnab Date: Tue, 4 Jun 2024 10:27:57 +0600 Subject: [PATCH 19/24] fix: indentation of batch readme Co-authored-by: Andrea Amorosi --- docs/utilities/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index bba466a832..6c82305660 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -159,7 +159,7 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" ``` -1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) + 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) !!! Note Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. From a2d4e5b401d2c7435027d66f2ec6a98f751d07f3 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 18:05:54 +0600 Subject: [PATCH 20/24] doc: move annotation to fix rendering Moving to ~L155 didn't work, had to put a new line. So, now it's at L156. For better readability, also added a new line after annotation. --- docs/utilities/batch.md | 4 ++-- .../snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 6c82305660..9f8d8852c5 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -152,6 +152,8 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va ```typescript hl_lines="1-4 8" --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" ``` + + 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) === "Enabling skipGroupOnError flag" @@ -159,8 +161,6 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" ``` - 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) - !!! Note Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details. diff --git a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts index 1fc6616db8..3124e7105c 100644 --- a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts +++ b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts @@ -10,7 +10,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new SqsFifoPartialProcessor(); // (1)! +const processor = new SqsFifoPartialProcessor(); const logger = new Logger(); const recordHandler = (record: SQSRecord): void => { From 03b471c0dbfca79a10ae0286f332c92e876e9ad5 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 18:09:48 +0600 Subject: [PATCH 21/24] refactor: make `context' optional --- packages/batch/src/types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index ebe71db950..51f9b78e34 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -19,7 +19,7 @@ type BatchProcessingOptions = { * The context object provided by the AWS Lambda runtime. When provided, * it's made available to the handler function you specify */ - context: Context; + context?: Context; /** * This option is only available for SqsFifoPartialProcessor. * If true skip the group on error during processing. From a1ab4200df3bfeac33a1022263b813a777ea45f6 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 18:11:00 +0600 Subject: [PATCH 22/24] fix: remove `context` from `skipGroupOnError` tests --- packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 28649b328b..54f463caa8 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -11,7 +11,6 @@ import { } from '../../src/index.js'; import { sqsRecordFactory } from '../helpers/factories.js'; import { sqsRecordHandler } from '../helpers/handlers.js'; -import context from '@aws-lambda-powertools/testing-utils/context'; describe('Class: SqsFifoBatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -95,7 +94,6 @@ describe('Class: SqsFifoBatchProcessor', () => { sqsRecordHandler, processor, { - context, skipGroupOnError: true, } ); @@ -147,7 +145,6 @@ describe('Class: SqsFifoBatchProcessor', () => { sqsRecordHandler, processor, { - context, skipGroupOnError: true, } ); @@ -183,7 +180,6 @@ describe('Class: SqsFifoBatchProcessor', () => { sqsRecordHandler, processor, { - context, skipGroupOnError: true, } ); @@ -210,7 +206,6 @@ describe('Class: SqsFifoBatchProcessor', () => { sqsRecordHandler, processor, { - context, skipGroupOnError: false, } ); @@ -237,7 +232,6 @@ describe('Class: SqsFifoBatchProcessor', () => { sqsRecordHandler, processor, { - context, skipGroupOnError: false, } ); From f442956a71e05c97676f85545165f510f24a5153 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 18:39:18 +0600 Subject: [PATCH 23/24] refactor: change TS privates to JS private fields --- packages/batch/src/SqsFifoPartialProcessor.ts | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index cb05ae8eed..7910eac830 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -48,15 +48,15 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { /** * The ID of the current message group being processed. */ - private currentGroupId?: string; + #currentGroupId?: string; /** * A set of group IDs that have already encountered failures. */ - private failedGroupIds: Set; + #failedGroupIds: Set; public constructor() { super(EventType.SQS); - this.failedGroupIds = new Set(); + this.#failedGroupIds = new Set(); } /** @@ -70,8 +70,8 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - if (this.options?.skipGroupOnError && this.currentGroupId) { - this.addToFailedGroup(this.currentGroupId); + if (this.options?.skipGroupOnError && this.#currentGroupId) { + this.#addToFailedGroup(this.#currentGroupId); } return super.failureHandler(record, exception); @@ -101,7 +101,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); // If we have any failed messages, we should then short circuit the process and // fail remaining messages unless `skipGroupOnError` is true @@ -115,11 +115,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { // then we should skip processing the current group. const shouldSkipCurrentGroup = this.options?.skipGroupOnError && - this.currentGroupId && - this.failedGroupIds.has(this.currentGroupId); + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId); const result = shouldSkipCurrentGroup - ? this.processFailRecord( + ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() ) @@ -153,7 +153,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - this.processFailRecord(record, new SqsFifoShortCircuitError()); + this.#processFailRecord(record, new SqsFifoShortCircuitError()); } this.clean(); @@ -165,8 +165,8 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { * Adds the specified group ID to the set of failed group IDs. * @param group - The group ID to be added to the set of failed group IDs. */ - private addToFailedGroup(group: string): void { - this.failedGroupIds.add(group); + #addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); } /** @@ -175,7 +175,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { * @param record - The record that failed. * @param exception - The error that occurred. */ - private processFailRecord( + #processFailRecord( record: BaseRecord, exception: BatchProcessingError ): FailureResponse { @@ -188,8 +188,8 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { * Sets the current group ID for the message being processed. * @param group - The group ID of the current message being processed. */ - private setCurrentGroup(group?: string): void { - this.currentGroupId = group; + #setCurrentGroup(group?: string): void { + this.#currentGroupId = group; } } From 035452453e043972652f1aa68d6698c405e118bc Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Tue, 4 Jun 2024 18:51:41 +0600 Subject: [PATCH 24/24] style: fix docstring spacing --- packages/batch/src/SqsFifoPartialProcessor.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 7910eac830..26354f3715 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -163,6 +163,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { /** * Adds the specified group ID to the set of failed group IDs. + * * @param group - The group ID to be added to the set of failed group IDs. */ #addToFailedGroup(group: string): void { @@ -186,6 +187,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { /** * Sets the current group ID for the message being processed. + * * @param group - The group ID of the current message being processed. */ #setCurrentGroup(group?: string): void {