From 562a32d36989f4f4eb4feefc6e2f9c433fde7337 Mon Sep 17 00:00:00 2001
From: arnabrahman
Date: Sun, 22 Sep 2024 10:40:39 +0600
Subject: [PATCH 01/16] feat: `processInParallel` in `BatchProcessingOptions`
 type

---
 packages/batch/src/types.ts | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts
index 7e60704535..892d318777 100644
--- a/packages/batch/src/types.ts
+++ b/packages/batch/src/types.ts
@@ -14,6 +14,7 @@ import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
  * @property context The context object provided by the AWS Lambda runtime
  * @property skipGroupOnError The option to group on error during processing
  * @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
+ * @property processInParallel Indicates whether the records should be processed in parallel
  */
 type BatchProcessingOptions = {
   /**
@@ -30,6 +31,13 @@ type BatchProcessingOptions = {
    * Set this to false to prevent throwing an error if the entire batch fails.
    */
   throwOnFullBatchFailure?: boolean;
+  /**
+   * Indicates whether the records should be processed in parallel.
+   * When set to `true`, the records will be processed concurrently using `Promise.all`.
+   * When set to `false`, the records will be processed sequentially.
+   * @default true
+   */
+  processInParallel?: boolean;
 };
 
 /**

From e205d045500fb55c9aef14007a9f65128feb4da1 Mon Sep 17 00:00:00 2001
From: arnabrahman
Date: Sun, 22 Sep 2024 10:59:44 +0600
Subject: [PATCH 02/16] feat: process the records based on `processInParallel`
 flag

If the option is set to `true`, process the records in parallel; otherwise,
process them sequentially.
---
 packages/batch/src/BasePartialProcessor.ts | 34 +++++++++++++++-------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts
index 2e7e9bf8ac..4ee04d6967 100644
--- a/packages/batch/src/BasePartialProcessor.ts
+++ b/packages/batch/src/BasePartialProcessor.ts
@@ -122,11 +122,15 @@ abstract class BasePartialProcessor {
     }
     this.prepare();
 
-    const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
-      this.records.map((record) => this.processRecord(record));
-
-    const processedRecords: (SuccessResponse | FailureResponse)[] =
-      await Promise.all(processingPromises);
+    /**
+     * Process the records in parallel if the option is set to true.
+     * Otherwise, process the records sequentially.
+     */
+    const processedRecords = this.options?.processInParallel
+      ? await Promise.all(
+          this.records.map((record) => this.processRecord(record))
+        )
+      : await this.#processSequentially();
 
     this.clean();
 
@@ -134,9 +138,9 @@ abstract class BasePartialProcessor {
   }
 
   /**
-   * Process a record with an asyncronous handler
+   * Process a record with an asynchronous handler
    *
-   * An implementation of this method is required for asyncronous processors.
+   * An implementation of this method is required for asynchronous processors.
    *
    * When implementing this method, you should at least call the successHandler method
    * when a record succeeds processing and the failureHandler method when a record
@@ -224,9 +228,8 @@ abstract class BasePartialProcessor {
     this.records = records;
     this.handler = handler;
 
-    if (options) {
-      this.options = options;
-    }
+    // By default, we process the records in parallel.
+ this.options = { processInParallel: true, ...options }; return this; } @@ -249,6 +252,17 @@ abstract class BasePartialProcessor { return entry; } + + /** + * Processes the records sequentially, ensuring that each record is processed one after the other. + */ + async #processSequentially(): Promise<(SuccessResponse | FailureResponse)[]> { + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + for (const record of this.records) { + processedRecords.push(await this.processRecord(record)); + } + return processedRecords; + } } export { BasePartialProcessor }; From 86639ba1a5ebf192c55a2db9b385c295639cdf9d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 22 Sep 2024 12:49:38 +0600 Subject: [PATCH 03/16] tests: `BatchProcessor`process records asynchronously in parallel and sequential --- .../batch/tests/unit/BatchProcessor.test.ts | 405 ++++++++++-------- 1 file changed, 217 insertions(+), 188 deletions(-) diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index e49ee4c30f..9c0fea0182 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -35,198 +35,227 @@ describe('Class: AsyncBatchProcessor', () => { process.env = ENVIRONMENT_VARIABLES; }); - describe('Asynchronously processing SQS Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - it('completes processing with with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], - }); - }); - - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe('Asynchronously processing Kinesis Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, 
secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); - - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); + describe('Asynchronously processing', () => { + const testCases = [ + { + description: 'with parallel processing', + options: { processInParallel: true }, + }, + { + description: 'with sequential processing', + options: { processInParallel: false }, + }, + ]; + + describe('SQS Records', () => { + for (const { description, options } of testCases) { + describe(description, () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + it('completes processing with with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); + + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); 
+ }); + }); + } }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); + describe('Kinesis Records', () => { + for (const { description, options } of testCases) { + describe(description, () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); + + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); + + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + }); + } }); - }); - - describe('Asynchronously processing DynamoDB Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); - }); - - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = 
dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); + describe('DynamoDB Records', () => { + for (const { description, options } of testCases) { + describe(description, () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + [ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ], + ]); + }); + + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); + }); + + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + 
FullBatchFailureError + ); + }); + }); + } }); }); From 38baf75d46d97c6fe00a71fa5a0a06eed32fec12 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 22 Sep 2024 13:06:31 +0600 Subject: [PATCH 04/16] refactor: use `describe.each` instead for loop --- .../batch/tests/unit/BatchProcessor.test.ts | 358 +++++++++--------- 1 file changed, 174 insertions(+), 184 deletions(-) diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 9c0fea0182..1ecacf7ae9 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -36,226 +36,216 @@ describe('Class: AsyncBatchProcessor', () => { }); describe('Asynchronously processing', () => { - const testCases = [ + const cases = [ { - description: 'with parallel processing', + description: 'in parallel', options: { processInParallel: true }, }, { - description: 'with sequential processing', + description: 'sequentially', options: { processInParallel: false }, }, ]; describe('SQS Records', () => { - for (const { description, options } of testCases) { - describe(description, () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); + describe.each(cases)('$description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); - it('completes processing with with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], - }); + it('completes processing with with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, 
asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); - // Act - processor.register(records, asyncSqsRecordHandler, options); + // Act + processor.register(records, asyncSqsRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); - } + }); }); describe('Kinesis Records', () => { - for (const { description, options } of testCases) { - describe(description, () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); + describe.each(cases)('$description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - 
expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); - // Act - processor.register(records, asyncKinesisRecordHandler, options); + // Act + processor.register(records, asyncKinesisRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); - } + }); }); describe('DynamoDB Records', () => { - for (const { description, options } of testCases) { - describe(description, () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - [ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ], - ]); - }); + describe.each(cases)('$description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + 
expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); - // Act - processor.register(records, asyncDynamodbRecordHandler, options); + // Act + processor.register(records, asyncDynamodbRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); - } + }); }); }); From 2fb6f5dd2c4696503911b07d8f8c50bfd294a9b5 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 22 Sep 2024 16:35:38 +0600 Subject: [PATCH 05/16] test: `processPartialResponse` tests in parallel and sequentially --- .../tests/unit/processPartialResponse.test.ts | 605 ++++++++++-------- 1 file changed, 335 insertions(+), 270 deletions(-) diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts 
b/packages/batch/tests/unit/processPartialResponse.test.ts index 2b2ec185ad..09eb2e7e99 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -46,304 +46,369 @@ describe('Function: processPartialResponse()', () => { }); describe('Process partial response function call tests', () => { - it('Process partial response function call with asynchronous handler', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const ret = await processPartialResponse( - batch, - asyncSqsRecordHandler, - processor - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); + const cases = [ + { + description: 'in parallel', + processingOptions: { processInParallel: true }, + }, + { + description: 'sequentially', + processingOptions: { processInParallel: false }, + }, + ]; + + describe.each(cases)('$description', ({ processingOptions }) => { + it('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - it('Process partial response function call with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const ret = await processPartialResponse( - batch, - asyncHandlerWithContext, - processor, - options - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const ret = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + processingOptions + ); - it('Process partial response function call with asynchronous handler for full batch failure', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); - // Act & Assess - await expect( - processPartialResponse(batch, asyncSqsRecordHandler, processor) - ).rejects.toThrow(FullBatchFailureError); - }); + it('Process partial response function call with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act & Assess - await expect( - processPartialResponse(batch, asyncSqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }) - ).rejects.toThrow(FullBatchFailureError); - }); + // Act + const ret = await processPartialResponse( + batch, + asyncHandlerWithContext, + processor, + { + ...processingOptions, + ...options, + } + ); - it('Process partial response function call with asynchronous handler for full batch failure when 
`throwOnFullBatchFailure` is `false`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessor(EventType.SQS); - - // Act - const response = await processPartialResponse( - batch, - asyncSqsRecordHandler, - processor, - { - ...options, - throwOnFullBatchFailure: false, - } - ); - - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); }); - }); - }); - describe('Process partial response function call through handler', () => { - it('Process partial response through handler with SQS event', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + it('Process partial response function call with asynchronous handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + processingOptions + ) + ).rejects.toThrow(FullBatchFailureError); + }); - it('Process partial response through handler with Kinesis event', async () => { - // Prepare - const records = [ - kinesisRecordFactory('success'), - kinesisRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - const event: KinesisStreamEvent = { Records: records }; - - const handler = async ( - event: KinesisStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncKinesisRecordHandler, - processor - ); - }; + it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect( + processPartialResponse(batch, asyncSqsRecordHandler, processor, { + ...processingOptions, + ...options, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); + }); - // Act - const result = await handler(event, context); + it('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); + // Act + const response = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor, + { + ...processingOptions, + ...options, + throwOnFullBatchFailure: false, + } + ); + + // Assess + expect(response).toStrictEqual({ + 
batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); + }); }); + }); - it('Process partial response through handler with DynamoDB event', async () => { - // Prepare - const records = [ - dynamodbRecordFactory('success'), - dynamodbRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - const event: DynamoDBStreamEvent = { Records: records }; - - const handler = async ( - event: DynamoDBStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncDynamodbRecordHandler, - processor - ); - }; + describe('Process partial response function call through handler', () => { + const cases = [ + { + description: 'in parallel', + processingOptions: { processInParallel: true }, + }, + { + description: 'sequentially', + processingOptions: { processInParallel: false }, + }, + ]; + + describe.each(cases)('$description', ({ processingOptions }) => { + it('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse( + event, + asyncSqsRecordHandler, + processor, + processingOptions + ); + }; - // Act - const result = await handler(event, context); + // Act + const result = await handler(event, context); - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response through handler for SQS records with incorrect event type', async () => { - // Prepare - const processor = new BatchProcessor(EventType.SQS); + it('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncKinesisRecordHandler, + processor, + processingOptions + ); + }; - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); - }; + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + it('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncDynamodbRecordHandler, + processor, + processingOptions + ); + }; - try { // Act - await handler({} as unknown as SQSEvent, context); - } catch (error) { + const result = await handler(event, context); + // Assess - assert(error instanceof UnexpectedBatchTypeError); - expect(error.message).toBe( - `Unexpected batch 
type. Possible values are: ${Object.keys( - EventType - ).join(', ')}` - ); - } - }); + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response through handler with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - context: Context - ): Promise => { - const options: BatchProcessingOptions = { context: context }; - - return await processPartialResponse( - event, - asyncHandlerWithContext, - processor, - options - ); - }; + it('Process partial response through handler for SQS records with incorrect event type', async () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncSqsRecordHandler, + processor, + processingOptions + ); + }; + + try { + // Act + await handler({} as unknown as SQSEvent, context); + } catch (error) { + // Assess + assert(error instanceof UnexpectedBatchTypeError); + expect(error.message).toBe( + `Unexpected batch type. Possible values are: ${Object.keys( + EventType + ).join(', ')}` + ); + } + }); - // Act - const result = await handler(event, context); + it('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + context: Context + ): Promise => { + const options: BatchProcessingOptions = { + context: context, + ...processingOptions, + }; + + return await processPartialResponse( + event, + asyncHandlerWithContext, + processor, + options + ); + }; - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); + // Act + const result = await handler(event, context); - it('Process partial response through handler for full batch failure', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor); - }; - - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); - }); + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }); - }; + it('Process partial response through handler for full batch failure', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: 
records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse( + event, + asyncSqsRecordHandler, + processor, + processingOptions + ); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); - }); + it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse( + event, + asyncSqsRecordHandler, + processor, + { + ...options, + ...processingOptions, + throwOnFullBatchFailure: true, + } + ); + }; + + // Act & Assess + await expect(handler(event, context)).rejects.toThrow( + FullBatchFailureError + ); + }); - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse(event, asyncSqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: false, - }); - }; + it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { + // Prepare + const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse( + event, + asyncSqsRecordHandler, + processor, + { + ...options, + ...processingOptions, + throwOnFullBatchFailure: false, + } + ); + }; - // Act - const response = await handler(event, context); + // Act + const response = await handler(event, context); - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], + // Assess + expect(response).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: records[0].messageId }, + { itemIdentifier: records[1].messageId }, + ], + }); }); }); }); From 0fdc11fb598ace99ec86b74035fe4be5f478a094 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 22 Sep 2024 16:58:26 +0600 Subject: [PATCH 06/16] style: fix typo --- packages/batch/src/BasePartialProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 4ee04d6967..392d1feb42 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -98,7 +98,7 @@ abstract class BasePartialProcessor { public abstract prepare(): void; /** - * Process all records with an asyncronous handler + * Process all records with an asynchronous handler * * Once called, the processor will create an array of promises to process each record * and wait for all of them to settle before returning the results. 
From e89e94bce8106950614cce7b10d8964ccaadb15a Mon Sep 17 00:00:00 2001
From: arnabrahman
Date: Sun, 22 Sep 2024 20:51:46 +0600
Subject: [PATCH 07/16] refactor: `processInParallel` option should be
 unavailable for FIFO processor

It would be better if we could omit this from sync processors too, but at
this moment TypeScript cannot differentiate between `BatchProcessor` and
`BatchProcessorSync`.
---
 packages/batch/src/BasePartialProcessor.ts | 11 ++++++++---
 packages/batch/src/types.ts                |  3 +--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts
index 392d1feb42..b387ef4b11 100644
--- a/packages/batch/src/BasePartialProcessor.ts
+++ b/packages/batch/src/BasePartialProcessor.ts
@@ -126,7 +126,11 @@ abstract class BasePartialProcessor {
      * Process the records in parallel if the option is set to true.
      * Otherwise, process the records sequentially.
      */
-    const processedRecords = this.options?.processInParallel
+    const processInParallel =
+      this.options?.processInParallel === undefined
+        ? true
+        : this.options.processInParallel;
+    const processedRecords = processInParallel
       ? await Promise.all(
           this.records.map((record) => this.processRecord(record))
         )
@@ -228,8 +232,9 @@ abstract class BasePartialProcessor {
     this.records = records;
     this.handler = handler;
 
-    // By default, we process the records in parallel.
-    this.options = { processInParallel: true, ...options };
+    if (options) {
+      this.options = options;
+    }
 
     return this;
   }
diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts
index 892d318777..b7388bd7ab 100644
--- a/packages/batch/src/types.ts
+++ b/packages/batch/src/types.ts
@@ -35,9 +35,8 @@ type BatchProcessingOptions = {
    * Indicates whether the records should be processed in parallel.
    * When set to `true`, the records will be processed concurrently using `Promise.all`.
    * When set to `false`, the records will be processed sequentially.
-   * @default true
    */
-  processInParallel?: boolean;
+  processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean;
 };
 
 /**

From ac8f647c9e063acbf894ef7b0c5ea20e664a41bf Mon Sep 17 00:00:00 2001
From: arnabrahman
Date: Sun, 22 Sep 2024 21:45:55 +0600
Subject: [PATCH 08/16] refactor: make separate function for parallel
 processing

---
 packages/batch/src/BasePartialProcessor.ts | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts
index b387ef4b11..1975f30c0d 100644
--- a/packages/batch/src/BasePartialProcessor.ts
+++ b/packages/batch/src/BasePartialProcessor.ts
@@ -123,17 +123,12 @@ abstract class BasePartialProcessor {
     this.prepare();
 
     /**
-     * Process the records in parallel if the option is set to true.
+     * If `processInParallel` is not set or false, process the records in parallel.
      * Otherwise, process the records sequentially.
      */
-    const processInParallel =
-      this.options?.processInParallel === undefined
-        ? true
-        : this.options.processInParallel;
+    const processInParallel = this.options?.processInParallel ?? true;
     const processedRecords = processInParallel
-      ? await Promise.all(
-          this.records.map((record) => this.processRecord(record))
-        )
+      ? await this.#processInParallel()
       : await this.#processSequentially();
 
     this.clean();
@@ -259,7 +254,16 @@ abstract class BasePartialProcessor {
   }
 
   /**
-   * Processes the records sequentially, ensuring that each record is processed one after the other.
+ * Processes records in parallel using `Promise.all`. + */ + async #processInParallel(): Promise<(SuccessResponse | FailureResponse)[]> { + return Promise.all( + this.records.map((record) => this.processRecord(record)) + ); + } + + /** + * Processes records sequentially, ensuring that each record is processed one after the other. */ async #processSequentially(): Promise<(SuccessResponse | FailureResponse)[]> { const processedRecords: (SuccessResponse | FailureResponse)[] = []; From f2e495cf846ff02a564b55f5a03e91c41435394c Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Sun, 22 Sep 2024 21:49:39 +0600 Subject: [PATCH 09/16] fix: typo in comment --- packages/batch/src/BasePartialProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 1975f30c0d..8846c73ff5 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -123,7 +123,7 @@ abstract class BasePartialProcessor { this.prepare(); /** - * If `processInParallel` is not set or false, process the records in parallel. + * If `processInParallel` is not provided or true, process the records in parallel. * Otherwise, process the records sequentially. */ const processInParallel = this.options?.processInParallel ?? true; From 7de9796ea361991d22e7332af2e2c90034b813c0 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 25 Sep 2024 09:52:02 +0600 Subject: [PATCH 10/16] doc: update comment for default `processInParallel` --- packages/batch/src/BasePartialProcessor.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 8846c73ff5..a9cd4c91da 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -122,10 +122,7 @@ abstract class BasePartialProcessor { } this.prepare(); - /** - * If `processInParallel` is not provided or true, process the records in parallel. - * Otherwise, process the records sequentially. - */ + // Default to `true` if `processInParallel` is not specified. const processInParallel = this.options?.processInParallel ?? true; const processedRecords = processInParallel ? await this.#processInParallel() From d98c933438edb8fc6f6ca99639a8a198c0dd4e2c Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 25 Sep 2024 09:55:27 +0600 Subject: [PATCH 11/16] doc: fix `processInParallel` option description --- packages/batch/src/types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index b7388bd7ab..3748c03e95 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -33,7 +33,7 @@ type BatchProcessingOptions = { throwOnFullBatchFailure?: boolean; /** * Indicates whether the records should be processed in parallel. - * When set to `true`, the records will be processed concurrently using `Promise.all`. + * When set to `true`, the records will be processed in parallel using `Promise.all`. * When set to `false`, the records will be processed sequentially. */ processInParallel?: T extends SqsFifoPartialProcessor ? 
never : boolean; From 48df5e27e894d557bc691d3bc56b325a0843759d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 25 Sep 2024 09:59:30 +0600 Subject: [PATCH 12/16] refactor: function naming for processing records --- packages/batch/src/BasePartialProcessor.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index a9cd4c91da..68bcac467d 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -125,8 +125,8 @@ abstract class BasePartialProcessor { // Default to `true` if `processInParallel` is not specified. const processInParallel = this.options?.processInParallel ?? true; const processedRecords = processInParallel - ? await this.#processInParallel() - : await this.#processSequentially(); + ? await this.#processRecordsInParallel() + : await this.#processRecordsSequentially(); this.clean(); @@ -253,7 +253,9 @@ abstract class BasePartialProcessor { /** * Processes records in parallel using `Promise.all`. */ - async #processInParallel(): Promise<(SuccessResponse | FailureResponse)[]> { + async #processRecordsInParallel(): Promise< + (SuccessResponse | FailureResponse)[] + > { return Promise.all( this.records.map((record) => this.processRecord(record)) ); @@ -262,7 +264,9 @@ abstract class BasePartialProcessor { /** * Processes records sequentially, ensuring that each record is processed one after the other. */ - async #processSequentially(): Promise<(SuccessResponse | FailureResponse)[]> { + async #processRecordsSequentially(): Promise< + (SuccessResponse | FailureResponse)[] + > { const processedRecords: (SuccessResponse | FailureResponse)[] = []; for (const record of this.records) { processedRecords.push(await this.processRecord(record)); From d9a69b60eba16fd7c3508eb3f65e94ec63933664 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Wed, 25 Sep 2024 11:03:55 +0600 Subject: [PATCH 13/16] doc: `processInParallel` option for `BatchProcessor` --- docs/utilities/batch.md | 16 +++++++++++++++- .../batch/sequentialAsyncProcessing.ts | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 examples/snippets/batch/sequentialAsyncProcessing.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index e49ab6b98e..af1c49ee0e 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -416,7 +416,10 @@ For such cases we recommend to use the `BatchProcessorSync` and `processPartialR *If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse` * If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync` -The difference between the two processors in implementation is that `BatchProcessor` uses `Promise.all()` while `BatchProcessorSync` loops through each record to preserve the order. +The difference between the two processors is in how they handle record processing: + +* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order. +* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one. ???+ question "When is this useful?" 
@@ -477,6 +480,17 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba
 --8<-- "examples/snippets/batch/extendingFailure.ts"
 ```
 
+### Sequential async processing
+
+By default, the `BatchProcessor` processes records in parallel. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.
+
+!!! info "Default Behavior"
+    If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel using `Promise.all()`
+
+```typescript hl_lines="8 17" title="Sequential async processing"
+--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
+```
+
 ### Create your own partial processor
 
 You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.
diff --git a/examples/snippets/batch/sequentialAsyncProcessing.ts b/examples/snippets/batch/sequentialAsyncProcessing.ts
new file mode 100644
index 0000000000..10ae24262e
--- /dev/null
+++ b/examples/snippets/batch/sequentialAsyncProcessing.ts
@@ -0,0 +1,18 @@
+import {
+  BatchProcessor,
+  EventType,
+  processPartialResponse,
+} from '@aws-lambda-powertools/batch';
+import type { SQSHandler, SQSRecord } from 'aws-lambda';
+
+const processor = new BatchProcessor(EventType.SQS);
+
+const recordHandler = async (_record: SQSRecord): Promise<void> => {
+  // Process the record
+};
+
+export const handler: SQSHandler = async (event, context) =>
+  processPartialResponse(event, recordHandler, processor, {
+    context,
+    processInParallel: false,
+  });

From 6d076984682ac087771cce1f196e74fb1249b420 Mon Sep 17 00:00:00 2001
From: arnabrahman
Date: Wed, 25 Sep 2024 11:09:14 +0600
Subject: [PATCH 14/16] doc: update wording and note

---
 docs/utilities/batch.md | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index af1c49ee0e..ec0e4ba108 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -482,10 +482,9 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba
 
 ### Sequential async processing
 
-By default, the `BatchProcessor` processes records in parallel. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.
+By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.
 
-!!! info "Default Behavior"
-    If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel using `Promise.all()`
+!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."
 
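One related detail from earlier in this series: `processInParallel` is typed conditionally in `BatchProcessingOptions`, so it cannot be set for `SqsFifoPartialProcessor`, where records must be handled in order. Below is a small, self-contained sketch of that pattern; the class bodies and the type parameter default are placeholders, and only the conditional mirrors the actual change.

```typescript
// Sketch of the conditional option type; the classes are stand-ins, only the
// `processInParallel` conditional mirrors the change in this patch series.
declare class BasePartialBatchProcessor {
  protected records: unknown[];
}
declare class SqsFifoPartialProcessor extends BasePartialBatchProcessor {
  // Distinguishing member so the two classes are not structurally identical
  // in this standalone sketch.
  private readonly fifo: true;
}

type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
  // Parallelism is unavailable (typed as `never`) for the FIFO processor.
  processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean;
};

// Allowed: standard processors may opt out of parallel processing.
const standardOptions: BatchProcessingOptions = { processInParallel: false };

// Rejected by the compiler in this sketch: the option resolves to `never`
// when the processor is the FIFO one.
// const fifoOptions: BatchProcessingOptions<SqsFifoPartialProcessor> = {
//   processInParallel: true,
// };
```

In the sketch, passing `processInParallel` together with the FIFO processor fails at compile time rather than being silently ignored at runtime.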
```typescript hl_lines="8 17" title="Sequential async processing" --8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts" From 1366013374a365c859247565831941010f408b05 Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Fri, 27 Sep 2024 09:52:35 +0600 Subject: [PATCH 15/16] fix: `sonarlint` issues for nested tests in `BatchProcessor` --- .../batch/tests/unit/BatchProcessor.test.ts | 342 +++++++++--------- 1 file changed, 168 insertions(+), 174 deletions(-) diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 1ecacf7ae9..650354d968 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -47,204 +47,198 @@ describe('Class: AsyncBatchProcessor', () => { }, ]; - describe('SQS Records', () => { - describe.each(cases)('$description', ({ options }) => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); + describe.each(cases)('SQS Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); - it('completes processing with with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], - }); + it('completes processing with with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + 
expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.SQS); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); - // Act - processor.register(records, asyncSqsRecordHandler, options); + // Act + processor.register(records, asyncSqsRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - describe('Kinesis Records', () => { - describe.each(cases)('$description', ({ options }) => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); + describe.each(cases)('Kinesis Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); + it('completes 
processing with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); - // Act - processor.register(records, asyncKinesisRecordHandler, options); + // Act + processor.register(records, asyncKinesisRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - describe('DynamoDB Records', () => { - describe.each(cases)('$description', ({ options }) => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); - }); + describe.each(cases)('DynamoDB Records $description', ({ options }) => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - 
const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], }); + }); - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); + it('completes processing with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); - // Act - processor.register(records, asyncDynamodbRecordHandler, options); + // Act + processor.register(records, asyncDynamodbRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); }); From 6dc296cdbe52f57403df1e12e4f77aeb8ce56c5d Mon Sep 17 00:00:00 2001 From: arnabrahman Date: Fri, 27 Sep 2024 11:08:17 +0600 Subject: [PATCH 16/16] fix: `sonarlint` issues for `processPartialResponse` tests --- .../tests/unit/processPartialResponse.test.ts | 213 +++++++----------- 1 file changed, 82 insertions(+), 131 deletions(-) diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 09eb2e7e99..54d0ef6fec 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -36,6 +36,62 @@ describe('Function: processPartialResponse()', () => { context, }; + const handlerWithSqsEvent = async ( + event: SQSEvent, + options: 
BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.SQS); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => + processPartialResponse(event, asyncSqsRecordHandler, processor, options); + + return handler(event, context); + }; + + const handlerWithKinesisEvent = async ( + event: KinesisStreamEvent, + options: BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => + processPartialResponse( + event, + asyncKinesisRecordHandler, + processor, + options + ); + + return handler(event, context); + }; + + const handlerWithDynamoDBEvent = async ( + event: DynamoDBStreamEvent, + options: BatchProcessingOptions + ) => { + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncDynamodbRecordHandler, + processor, + options + ); + }; + + return handler(event, context); + }; + beforeEach(() => { vi.clearAllMocks(); process.env = { ...ENVIRONMENT_VARIABLES }; @@ -184,23 +240,10 @@ describe('Function: processPartialResponse()', () => { sqsRecordFactory('success'), sqsRecordFactory('success'), ]; - const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse( - event, - asyncSqsRecordHandler, - processor, - processingOptions - ); - }; - // Act - const result = await handler(event, context); + const result = await handlerWithSqsEvent(event, processingOptions); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); @@ -212,23 +255,10 @@ describe('Function: processPartialResponse()', () => { kinesisRecordFactory('success'), kinesisRecordFactory('success'), ]; - const processor = new BatchProcessor(EventType.KinesisDataStreams); const event: KinesisStreamEvent = { Records: records }; - const handler = async ( - event: KinesisStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncKinesisRecordHandler, - processor, - processingOptions - ); - }; - // Act - const result = await handler(event, context); + const result = await handlerWithKinesisEvent(event, processingOptions); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); @@ -240,47 +270,22 @@ describe('Function: processPartialResponse()', () => { dynamodbRecordFactory('success'), dynamodbRecordFactory('success'), ]; - const processor = new BatchProcessor(EventType.DynamoDBStreams); const event: DynamoDBStreamEvent = { Records: records }; - const handler = async ( - event: DynamoDBStreamEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncDynamodbRecordHandler, - processor, - processingOptions - ); - }; - // Act - const result = await handler(event, context); + const result = await handlerWithDynamoDBEvent(event, processingOptions); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); it('Process partial response through handler for SQS records with incorrect event type', async () => { - // Prepare - const processor = new BatchProcessor(EventType.SQS); - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return await processPartialResponse( - event, - asyncSqsRecordHandler, - 
processor, - processingOptions - ); - }; - try { // Act - await handler({} as unknown as SQSEvent, context); + await handlerWithSqsEvent( + {} as unknown as SQSEvent, + processingOptions + ); } catch (error) { // Assess assert(error instanceof UnexpectedBatchTypeError); @@ -298,28 +303,13 @@ describe('Function: processPartialResponse()', () => { sqsRecordFactory('success'), sqsRecordFactory('success'), ]; - const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( - event: SQSEvent, - context: Context - ): Promise => { - const options: BatchProcessingOptions = { - context: context, - ...processingOptions, - }; - - return await processPartialResponse( - event, - asyncHandlerWithContext, - processor, - options - ); - }; - // Act - const result = await handler(event, context); + const result = await handlerWithSqsEvent(event, { + context, + ...processingOptions, + }); // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); @@ -328,79 +318,40 @@ describe('Function: processPartialResponse()', () => { it('Process partial response through handler for full batch failure', async () => { // Prepare const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse( - event, - asyncSqsRecordHandler, - processor, - processingOptions - ); - }; - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); + await expect( + handlerWithSqsEvent(event, processingOptions) + ).rejects.toThrow(FullBatchFailureError); }); it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => { // Prepare const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse( - event, - asyncSqsRecordHandler, - processor, - { - ...options, - ...processingOptions, - throwOnFullBatchFailure: true, - } - ); - }; - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - FullBatchFailureError - ); + await expect( + handlerWithSqsEvent(event, { + ...options, + ...processingOptions, + throwOnFullBatchFailure: true, + }) + ).rejects.toThrow(FullBatchFailureError); }); it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => { // Prepare const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessor(EventType.SQS); const event: SQSEvent = { Records: records }; - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return processPartialResponse( - event, - asyncSqsRecordHandler, - processor, - { - ...options, - ...processingOptions, - throwOnFullBatchFailure: false, - } - ); - }; - // Act - const response = await handler(event, context); + const response = await handlerWithSqsEvent(event, { + ...options, + ...processingOptions, + throwOnFullBatchFailure: false, + }); // Assess expect(response).toStrictEqual({