diff --git a/docs/snippets/batch/accessProcessedMessages.ts b/docs/snippets/batch/accessProcessedMessages.ts index fe5d727ea0..ea99abeda7 100644 --- a/docs/snippets/batch/accessProcessedMessages.ts +++ b/docs/snippets/batch/accessProcessedMessages.ts @@ -25,7 +25,7 @@ export const handler = async ( const batch = event.Records; // (1)! processor.register(batch, recordHandler, { context }); // (2)! - const processedMessages = processor.process(); + const processedMessages = await processor.process(); for (const message of processedMessages) { const status: 'success' | 'fail' = message[0]; diff --git a/docs/snippets/batch/advancedTracingRecordHandler.ts b/docs/snippets/batch/advancedTracingRecordHandler.ts index 4de43b3198..997abb3bc8 100644 --- a/docs/snippets/batch/advancedTracingRecordHandler.ts +++ b/docs/snippets/batch/advancedTracingRecordHandler.ts @@ -1,7 +1,7 @@ import { BatchProcessor, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Tracer, captureLambdaHandler } from '@aws-lambda-powertools/tracer'; import middy from '@middy/core'; @@ -36,7 +36,7 @@ const recordHandler = (record: SQSRecord): void => { export const handler = middy( async (event: SQSEvent, context: Context): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); } diff --git a/docs/snippets/batch/customPartialProcessor.ts b/docs/snippets/batch/customPartialProcessor.ts index 6ebd113282..771a6f7f3d 100644 --- a/docs/snippets/batch/customPartialProcessor.ts +++ b/docs/snippets/batch/customPartialProcessor.ts @@ -7,7 +7,7 @@ import { marshall } from '@aws-sdk/util-dynamodb'; import { EventType, BasePartialBatchProcessor, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import type { SuccessResponse, @@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor { 
this.#tableName = tableName; } - public async asyncProcessRecord( - _record: BaseRecord - ): Promise { - throw new Error('Not implemented'); - } - /** * It's called once, **after** processing the batch. * @@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor { this.successMessages = []; } + public async processRecord( + _record: BaseRecord + ): Promise { + throw new Error('Not implemented'); + } + /** * It handles how your record is processed. * * Here we are keeping the status of each run, `this.handler` is * the function that is passed when calling `processor.register()`. */ - public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { + public processRecordSync( + record: BaseRecord + ): SuccessResponse | FailureResponse { try { const result = this.handler(record); @@ -91,7 +93,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/extendingFailure.ts b/docs/snippets/batch/extendingFailure.ts index ab1ef530f9..399496d979 100644 --- a/docs/snippets/batch/extendingFailure.ts +++ b/docs/snippets/batch/extendingFailure.ts @@ -4,7 +4,7 @@ import { EventType, FailureResponse, EventSourceType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -47,7 +47,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/gettingStartedAsync.ts b/docs/snippets/batch/gettingStartedAsync.ts index 0080752026..1d41e06624 100644 --- a/docs/snippets/batch/gettingStartedAsync.ts +++ 
b/docs/snippets/batch/gettingStartedAsync.ts @@ -1,7 +1,7 @@ import { - AsyncBatchProcessor, + BatchProcessor, EventType, - asyncProcessPartialResponse, + processPartialResponse, } from '@aws-lambda-powertools/batch'; import axios from 'axios'; // axios is an external dependency import type { @@ -11,7 +11,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new AsyncBatchProcessor(EventType.SQS); +const processor = new BatchProcessor(EventType.SQS); const recordHandler = async (record: SQSRecord): Promise => { const res = await axios.post('https://httpbin.org/anything', { @@ -25,7 +25,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return await asyncProcessPartialResponse(event, recordHandler, processor, { + return await processPartialResponse(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts index 3304d31b95..41a11d4869 100644 --- a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts +++ b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts @@ -1,7 +1,7 @@ import { BatchProcessor, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -29,7 +29,7 @@ export const handler = async ( event: DynamoDBStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/gettingStartedErrorHandling.ts b/docs/snippets/batch/gettingStartedErrorHandling.ts index 08efa51732..df59d84aff 100644 --- a/docs/snippets/batch/gettingStartedErrorHandling.ts +++ b/docs/snippets/batch/gettingStartedErrorHandling.ts @@ -1,7 +1,7 @@ import { BatchProcessor, EventType, - processPartialResponse, + processPartialResponseSync, 
} from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -37,7 +37,7 @@ export const handler = async ( context: Context ): Promise => { // prettier-ignore - return processPartialResponse(event, recordHandler, processor, { // (2)! + return processPartialResponseSync(event, recordHandler, processor, { // (2)! context, }); }; diff --git a/docs/snippets/batch/gettingStartedKinesis.ts b/docs/snippets/batch/gettingStartedKinesis.ts index 2def6b2e55..8f3e887adc 100644 --- a/docs/snippets/batch/gettingStartedKinesis.ts +++ b/docs/snippets/batch/gettingStartedKinesis.ts @@ -1,7 +1,7 @@ import { BatchProcessor, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -24,7 +24,7 @@ export const handler = async ( event: KinesisStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/gettingStartedSQS.ts b/docs/snippets/batch/gettingStartedSQS.ts index 9878b865e9..a2d4707ee6 100644 --- a/docs/snippets/batch/gettingStartedSQS.ts +++ b/docs/snippets/batch/gettingStartedSQS.ts @@ -1,7 +1,7 @@ import { BatchProcessor, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -28,7 +28,7 @@ export const handler = async ( context: Context ): Promise => { // prettier-ignore - return processPartialResponse(event, recordHandler, processor, { // (3)! + return processPartialResponseSync(event, recordHandler, processor, { // (3)! 
context, }); }; diff --git a/docs/snippets/batch/gettingStartedSQSFifo.ts b/docs/snippets/batch/gettingStartedSQSFifo.ts index 5c63c1e0d7..3b7e03b3a6 100644 --- a/docs/snippets/batch/gettingStartedSQSFifo.ts +++ b/docs/snippets/batch/gettingStartedSQSFifo.ts @@ -1,6 +1,6 @@ import { SqsFifoPartialProcessor, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -25,7 +25,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ac426546e5..dcf311241b 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -13,23 +13,23 @@ stateDiagram-v2 direction LR BatchSource: Amazon SQS

Amazon Kinesis Data Streams

Amazon DynamoDB Streams

LambdaInit: Lambda invocation - BatchProcessor: Batch Processor + BatchProcessorSync: Batch Processor RecordHandler: Record Handler function YourLogic: Your logic to process each batch item LambdaResponse: Lambda response BatchSource --> LambdaInit - LambdaInit --> BatchProcessor - BatchProcessor --> RecordHandler + LambdaInit --> BatchProcessorSync + BatchProcessorSync --> RecordHandler - state BatchProcessor { + state BatchProcessorSync { [*] --> RecordHandler: Your function RecordHandler --> YourLogic } - RecordHandler --> BatchProcessor: Collect results - BatchProcessor --> LambdaResponse: Report items that failed processing + RecordHandler --> BatchProcessorSync: Collect results + BatchProcessorSync --> LambdaResponse: Report items that failed processing ``` ## Key features @@ -99,9 +99,9 @@ The remaining sections of the documentation will rely on these samples. For comp Processing batches from SQS works in three stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type +1. Instantiate **`BatchProcessorSync`** and choose **`EventType.SQS`** for the event type 2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion -3. Use **`processPartialResponse`** to kick off processing +3. Use **`processPartialResponseSync`** to kick off processing ???+ info This code example optionally uses Logger for completion. @@ -149,9 +149,9 @@ This helps preserve the ordering of messages in your queue. Processing batches from Kinesis works in three stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type +1. Instantiate **`BatchProcessorSync`** and choose **`EventType.KinesisDataStreams`** for the event type 2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion -3. Use **`processPartialResponse`** to kick off processing +3. 
Use **`processPartialResponseSync`** to kick off processing ???+ info This code example optionally uses Logger for completion. @@ -182,9 +182,9 @@ Processing batches from Kinesis works in three stages: Processing batches from DynamoDB Streams works in three stages: -1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type +1. Instantiate **`BatchProcessorSync`** and choose **`EventType.DynamoDBStreams`** for the event type 2. Define your function to handle each batch record, and use the `DynamoDBRecord` type annotation for autocompletion -3. Use **`processPartialResponse`** to kick off processing +3. Use **`processPartialResponseSync`** to kick off processing ???+ info This code example optionally uses Logger for completion. @@ -225,7 +225,7 @@ By default, we catch any exception raised by your record handler function. This --8<-- ``` - 1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor) + 1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessorsync) 2. Exceptions raised in `record_handler` will propagate to `process_partial_response`.

We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab). @@ -249,7 +249,7 @@ The following sequence diagrams explain how each Batch processor behaves under d > Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. -Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues. +Sequence diagram to explain how [`BatchProcessorSync` works](#processing-messages-from-sqs) with SQS Standard queues.
```mermaid @@ -302,7 +302,7 @@ sequenceDiagram > Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. -Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). +Sequence diagram to explain how `BatchProcessorSync` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb). For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"} @@ -358,7 +358,7 @@ sequenceDiagram ### Processing messages asynchronously -You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently. +You can use `BatchProcessor` class and `processPartialResponse` function to process messages concurrently. ???+ question "When is this useful?" Your use case might be able to process multiple records at the same time without conflicting with one another. The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). 
-```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor" +```typescript hl_lines="1-5 14 28-30" title="High-concurrency with BatchProcessor" --8<-- "docs/snippets/batch/gettingStartedAsync.ts" ``` @@ -375,7 +375,7 @@ You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` functi ### Accessing processed messages -Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function. +Use the `BatchProcessorSync` directly in your function to access a list of all returned values from your `recordHandler` function. * **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record @@ -384,24 +384,24 @@ Use the `BatchProcessor` directly in your function to access a list of all retur --8<-- "docs/snippets/batch/accessProcessedMessages.ts" ``` -1. The processor requires the records array. This is typically handled by `processPartialResponse`. +1. The processor requires the records array. This is typically handled by `processPartialResponseSync`. 2. You need to register the `batch`, the `recordHandler` function, and optionally the `context` to access the Lambda context. ### Accessing Lambda Context Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out. -We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function. 
+We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function. ```typescript hl_lines="17 35" --8<-- "docs/snippets/batch/accessLambdaContext.ts" ``` -### Extending BatchProcessor +### Extending BatchProcessorSync -You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. +You might want to bring custom logic to the existing `BatchProcessorSync` to slightly override how we handle successes and failures. -For these scenarios, you can subclass `BatchProcessor` and quickly override `successHandler` and `failureHandler` methods: +For these scenarios, you can subclass `BatchProcessorSync` and quickly override `successHandler` and `failureHandler` methods: * **`successHandler()`** – Keeps track of successful batch records * **`failureHandler()`** – Keeps track of failed batch records @@ -409,7 +409,7 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc ???+ example Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing -```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor" +```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessorSync" --8<-- "docs/snippets/batch/extendingFailure.ts" ``` @@ -446,7 +446,7 @@ classDiagram * **`clean()`** – teardown logic called once after `processRecord` completes * **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic -You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function. 
+You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function. ```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor" --8<-- "docs/snippets/batch/customPartialProcessor.ts" @@ -466,7 +466,7 @@ You can use Tracer to create subsegments for each batch record processed. To do ## Testing your code -As there is no external calls, you can unit test your code with `BatchProcessor` quite easily. +As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily. **Example**: diff --git a/packages/batch/README.md b/packages/batch/README.md index 4883d430f2..d6b26d2510 100644 --- a/packages/batch/README.md +++ b/packages/batch/README.md @@ -60,9 +60,9 @@ When using SQS as a Lambda event source, you can specify the `EventType.SQS` to ```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -72,7 +72,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.SQS); +const processor = new BatchProcessorSync(EventType.SQS); const logger = new Logger(); const recordHandler = (record: SQSRecord): void => { @@ -87,7 +87,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -100,9 +100,9 @@ When using Kinesis Data Streams as a Lambda event source, you can specify the `E ```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -112,7 +112,7 @@ import type { 
KinesisStreamBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.KinesisDataStreams); +const processor = new BatchProcessorSync(EventType.KinesisDataStreams); const logger = new Logger(); const recordHandler = (record: KinesisStreamRecord): void => { @@ -125,7 +125,7 @@ export const handler = async ( event: KinesisStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -133,13 +133,13 @@ export const handler = async ( ### DynamoDB Streams Processor -When using DynamoDB Streams as a Lambda event source, you can use the `BatchProcessor` with the `EventType.DynamoDBStreams` to process the records. The response will be a `DynamoDBBatchResponse` which contains a list of items that failed to be processed. +When using DynamoDB Streams as a Lambda event source, you can use the `BatchProcessorSync` with the `EventType.DynamoDBStreams` to process the records. The response will be a `DynamoDBBatchResponse` which contains a list of items that failed to be processed. 
```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -149,7 +149,7 @@ import type { DynamoDBBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.DynamoDBStreams); +const processor = new BatchProcessorSync(EventType.DynamoDBStreams); const logger = new Logger(); const recordHandler = (record: DynamoDBRecord): void => { @@ -167,7 +167,7 @@ export const handler = async ( event: DynamoDBStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -175,11 +175,11 @@ export const handler = async ( ### Async processing -If your use case allows you to process multiple records at the same time without conflicting with each other, you can use the `AsyncBatchProcessor` to process records asynchronously. This will create an array of promises that will be resolved once all records have been processed. +If your use case allows you to process multiple records at the same time without conflicting with each other, you can use the `BatchProcessor` to process records asynchronously. This will create an array of promises that will be resolved once all records have been processed. 
```ts import { - AsyncBatchProcessor, + BatchProcessor, EventType, asyncProcessPartialResponse, } from '@aws-lambda-powertools/batch'; @@ -191,7 +191,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new AsyncBatchProcessor(EventType.SQS); +const processor = new BatchProcessor(EventType.SQS); const recordHandler = async (record: SQSRecord): Promise => { const res = await axios.post('https://httpbin.org/anything', { diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/AsyncBatchProcessor.ts deleted file mode 100644 index 10c404a323..0000000000 --- a/packages/batch/src/AsyncBatchProcessor.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { BatchProcessingError } from './errors'; -import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; - -/** - * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB - */ -class AsyncBatchProcessor extends BasePartialBatchProcessor { - public async asyncProcessRecord( - record: BaseRecord - ): Promise { - try { - const data = this.toBatchType(record, this.eventType); - const result = await this.handler(data, this.options?.context); - - return this.successHandler(record, result); - } catch (error) { - return this.failureHandler(record, error as Error); - } - } - - /** - * Process a record with instance's handler - * @param record Batch record to be processed - * @returns response of success or failure - */ - public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { - throw new BatchProcessingError( - 'Not implemented. Use asyncProcess() instead.' 
- ); - } -} - -export { AsyncBatchProcessor }; diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts deleted file mode 100644 index 3cf4b30309..0000000000 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ /dev/null @@ -1,152 +0,0 @@ -import type { - DynamoDBRecord, - KinesisStreamRecord, - SQSRecord, -} from 'aws-lambda'; -import { BasePartialProcessor } from './BasePartialProcessor'; -import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; -import { FullBatchFailureError } from './errors'; -import type { - EventSourceDataClassTypes, - PartialItemFailureResponse, - PartialItemFailures, -} from './types'; - -/** - * Process batch and partially report failed items - */ -abstract class BasePartialBatchProcessor extends BasePartialProcessor { - public COLLECTOR_MAPPING; - - public batchResponse: PartialItemFailureResponse; - - public eventType: keyof typeof EventType; - - /** - * Initializes base batch processing class - * @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event - */ - public constructor(eventType: keyof typeof EventType) { - super(); - this.eventType = eventType; - this.batchResponse = DEFAULT_RESPONSE; - this.COLLECTOR_MAPPING = { - [EventType.SQS]: () => this.collectSqsFailures(), - [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), - [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), - }; - } - - /** - * Report messages to be deleted in case of partial failures - */ - public clean(): void { - if (!this.hasMessagesToReport()) { - return; - } - - if (this.entireBatchFailed()) { - throw new FullBatchFailureError(this.errors); - } - - const messages: PartialItemFailures[] = this.getMessagesToReport(); - this.batchResponse = { batchItemFailures: messages }; - } - - /** - * Collects identifiers of failed items for a DynamoDB stream - * @returns list of identifiers for failed items - */ - public 
collectDynamoDBFailures(): PartialItemFailures[] { - const failures: PartialItemFailures[] = []; - - for (const msg of this.failureMessages) { - const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; - if (msgId) { - failures.push({ itemIdentifier: msgId }); - } - } - - return failures; - } - - /** - * Collects identifiers of failed items for a Kinesis stream - * @returns list of identifiers for failed items - */ - public collectKinesisFailures(): PartialItemFailures[] { - const failures: PartialItemFailures[] = []; - - for (const msg of this.failureMessages) { - const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; - failures.push({ itemIdentifier: msgId }); - } - - return failures; - } - - /** - * Collects identifiers of failed items for an SQS batch - * @returns list of identifiers for failed items - */ - public collectSqsFailures(): PartialItemFailures[] { - const failures: PartialItemFailures[] = []; - - for (const msg of this.failureMessages) { - const msgId = (msg as SQSRecord).messageId; - failures.push({ itemIdentifier: msgId }); - } - - return failures; - } - - /** - * Determines whether all records in a batch failed to process - * @returns true if all records resulted in exception results - */ - public entireBatchFailed(): boolean { - return this.errors.length == this.records.length; - } - - /** - * Collects identifiers for failed batch items - * @returns formatted messages to use in batch deletion - */ - public getMessagesToReport(): PartialItemFailures[] { - return this.COLLECTOR_MAPPING[this.eventType](); - } - - /** - * Determines if any records failed to process - * @returns true if any records resulted in exception - */ - public hasMessagesToReport(): boolean { - return this.failureMessages.length != 0; - } - - /** - * Remove results from previous execution - */ - public prepare(): void { - this.successMessages.length = 0; - this.failureMessages.length = 0; - this.errors.length = 0; - this.batchResponse = DEFAULT_RESPONSE; - } 
- - /** - * @returns Batch items that failed processing, if any - */ - public response(): PartialItemFailureResponse { - return this.batchResponse; - } - - public toBatchType( - record: EventSourceDataClassTypes, - eventType: keyof typeof EventType - ): SQSRecord | KinesisStreamRecord | DynamoDBRecord { - return DATA_CLASS_MAPPING[eventType](record); - } -} - -export { BasePartialBatchProcessor }; diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index a51400d4a5..0e697e7567 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -26,7 +26,7 @@ abstract class BasePartialProcessor { /** * Initializes base processor class */ - public constructor() { + protected constructor() { this.successMessages = []; this.failureMessages = []; this.errors = []; @@ -34,40 +34,6 @@ abstract class BasePartialProcessor { this.handler = new Function(); } - /** - * Call instance's handler for each record - * @returns List of processed records - */ - public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> { - /** - * If this is an sync processor, user should have called process instead, - * so we call the method early to throw the error early thus failing fast. 
- */ - if (this.constructor.name === 'BatchProcessor') { - await this.asyncProcessRecord(this.records[0]); - } - this.prepare(); - - const processingPromises: Promise[] = - this.records.map((record) => this.asyncProcessRecord(record)); - - const processedRecords: (SuccessResponse | FailureResponse)[] = - await Promise.all(processingPromises); - - this.clean(); - - return processedRecords; - } - - /** - * Process a record with an asyncronous handler - * - * @param record Record to be processed - */ - public abstract asyncProcessRecord( - record: BaseRecord - ): Promise; - /** * Clean class instance after processing */ @@ -99,20 +65,14 @@ abstract class BasePartialProcessor { * Call instance's handler for each record * @returns List of processed records */ - public process(): (SuccessResponse | FailureResponse)[] { - /** - * If this is an async processor, user should have called processAsync instead, - * so we call the method early to throw the error early thus failing fast. - */ - if (this.constructor.name === 'AsyncBatchProcessor') { - this.processRecord(this.records[0]); - } + public async process(): Promise<(SuccessResponse | FailureResponse)[]> { this.prepare(); - const processedRecords: (SuccessResponse | FailureResponse)[] = []; - for (const record of this.records) { - processedRecords.push(this.processRecord(record)); - } + const processingPromises: Promise[] = + this.records.map((record) => this.processRecord(record)); + + const processedRecords: (SuccessResponse | FailureResponse)[] = + await Promise.all(processingPromises); this.clean(); @@ -120,17 +80,44 @@ abstract class BasePartialProcessor { } /** - * Process a record with the handler + * Process a record with an asyncronous handler + * * @param record Record to be processed */ public abstract processRecord( record: BaseRecord + ): Promise; + + /** + * Process a record with the handler + * @param record Record to be processed + */ + public abstract processRecordSync( + record: BaseRecord ): 
SuccessResponse | FailureResponse; + /** + * Call instance's handler for each record + * @returns List of processed records + */ + public processSync(): (SuccessResponse | FailureResponse)[] { + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + for (const record of this.records) { + processedRecords.push(this.processRecordSync(record)); + } + + this.clean(); + + return processedRecords; + } + /** * Set class instance attributes before execution * @param records List of records to be processed * @param handler CallableFunction to process entries of "records" + * @param options Options to be used during processing * @returns this object */ public register( diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 730b3e94ef..467085aa5a 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,15 +1,147 @@ -import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { BatchProcessingError } from './errors'; import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; +import { + EventSourceDataClassTypes, + PartialItemFailureResponse, + PartialItemFailures, +} from './types'; +import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; +import { BasePartialProcessor } from './BasePartialProcessor'; +import { FullBatchFailureError } from './errors'; +import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; /** * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB */ -class BatchProcessor extends BasePartialBatchProcessor { - public async asyncProcessRecord( - _record: BaseRecord +class BatchProcessor extends BasePartialProcessor { + public COLLECTOR_MAPPING; + + public batchResponse: PartialItemFailureResponse; + + public eventType: keyof typeof EventType; + + /** + * Initializes base batch processing class + * @param eventType Whether this is SQS, 
DynamoDB stream, or Kinesis data stream event + */ + public constructor(eventType: keyof typeof EventType) { + super(); + this.eventType = eventType; + this.batchResponse = DEFAULT_RESPONSE; + this.COLLECTOR_MAPPING = { + [EventType.SQS]: () => this.collectSqsFailures(), + [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), + [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), + }; + } + + /** + * Report messages to be deleted in case of partial failures + */ + public clean(): void { + if (!this.hasMessagesToReport()) { + return; + } + + if (this.entireBatchFailed()) { + throw new FullBatchFailureError(this.errors); + } + + const messages: PartialItemFailures[] = this.getMessagesToReport(); + this.batchResponse = { batchItemFailures: messages }; + } + + /** + * Collects identifiers of failed items for a DynamoDB stream + * @returns list of identifiers for failed items + */ + public collectDynamoDBFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; + if (msgId) { + failures.push({ itemIdentifier: msgId }); + } + } + + return failures; + } + + /** + * Collects identifiers of failed items for a Kinesis stream + * @returns list of identifiers for failed items + */ + public collectKinesisFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; + failures.push({ itemIdentifier: msgId }); + } + + return failures; + } + + /** + * Collects identifiers of failed items for an SQS batch + * @returns list of identifiers for failed items + */ + public collectSqsFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as SQSRecord).messageId; + failures.push({ itemIdentifier: msgId 
}); + } + + return failures; + } + + /** + * Determines whether all records in a batch failed to process + * @returns true if all records resulted in exception results + */ + public entireBatchFailed(): boolean { + return this.errors.length == this.records.length; + } + + /** + * Collects identifiers for failed batch items + * @returns formatted messages to use in batch deletion + */ + public getMessagesToReport(): PartialItemFailures[] { + return this.COLLECTOR_MAPPING[this.eventType](); + } + + /** + * Determines if any records failed to process + * @returns true if any records resulted in exception + */ + public hasMessagesToReport(): boolean { + return this.failureMessages.length != 0; + } + + /** + * Remove results from previous execution + */ + public prepare(): void { + this.successMessages.length = 0; + this.failureMessages.length = 0; + this.errors.length = 0; + this.batchResponse = DEFAULT_RESPONSE; + } + + public async processRecord( + record: BaseRecord ): Promise { - throw new BatchProcessingError('Not implemented. 
Use process() instead.'); + try { + const data = this.toBatchType(record, this.eventType); + const result = await this.handler(data, this.options?.context); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } } /** @@ -17,7 +149,9 @@ class BatchProcessor extends BasePartialBatchProcessor { * @param record Batch record to be processed * @returns response of success or failure */ - public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { + public processRecordSync( + record: BaseRecord + ): SuccessResponse | FailureResponse { try { const data = this.toBatchType(record, this.eventType); const result = this.handler(data, this.options?.context); @@ -27,6 +161,20 @@ class BatchProcessor extends BasePartialBatchProcessor { return this.failureHandler(record, error as Error); } } + + /** + * @returns Batch items that failed processing, if any + */ + public response(): PartialItemFailureResponse { + return this.batchResponse; + } + + public toBatchType( + record: EventSourceDataClassTypes, + eventType: keyof typeof EventType + ): SQSRecord | KinesisStreamRecord | DynamoDBRecord { + return DATA_CLASS_MAPPING[eventType](record); + } } export { BatchProcessor }; diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index ebdd4d73e8..9709062197 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,7 +1,7 @@ -import { BatchProcessor } from './BatchProcessor'; import { EventType } from './constants'; import { SqsFifoShortCircuitError } from './errors'; import type { FailureResponse, SuccessResponse } from './types'; +import { BatchProcessor } from './BatchProcessor'; /** * Process native partial responses from SQS FIFO queues @@ -18,7 +18,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { * When the first failed message is detected, the process is short-circuited * 
And the remaining messages are reported as failed items */ - public process(): (SuccessResponse | FailureResponse)[] { + public processSync(): (SuccessResponse | FailureResponse)[] { this.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; @@ -30,7 +30,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { return this.shortCircuitProcessing(currentIndex, processedRecords); } - processedRecords.push(this.processRecord(record)); + processedRecords.push(this.processRecordSync(record)); currentIndex++; } @@ -42,7 +42,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { /** * Starting from the first failure index, fail all remaining messages and append them to the result list * @param firstFailureIndex Index of first message that failed - * @param result List of success and failure responses with remaining messages failed + * @param processedRecords List of success and failure responses with remaining messages failed */ public shortCircuitProcessing( firstFailureIndex: number, diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/asyncProcessPartialResponse.ts deleted file mode 100644 index 5a33b5b534..0000000000 --- a/packages/batch/src/asyncProcessPartialResponse.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { UnexpectedBatchTypeError } from './errors'; -import type { - BaseRecord, - BatchProcessingOptions, - PartialItemFailureResponse, -} from './types'; - -/** - * Higher level function to handle batch event processing - * @param event Lambda's original event - * @param recordHandler Callable function to process each record from the batch - * @param processor Batch processor to handle partial failure cases - * @returns Lambda Partial Batch Response - */ -const asyncProcessPartialResponse = async ( - event: { Records: BaseRecord[] }, - recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, - options?: 
BatchProcessingOptions -): Promise => { - if (!event.Records || !Array.isArray(event.Records)) { - throw new UnexpectedBatchTypeError(); - } - - processor.register(event.Records, recordHandler, options); - - await processor.asyncProcess(); - - return processor.response(); -}; - -export { asyncProcessPartialResponse }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 96f931823d..0b80180586 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -2,9 +2,6 @@ export * from './constants'; export * from './errors'; export * from './types'; export * from './BasePartialProcessor'; -export * from './BasePartialBatchProcessor'; export * from './BatchProcessor'; -export * from './AsyncBatchProcessor'; export * from './processPartialResponse'; -export * from './asyncProcessPartialResponse'; export * from './SqsFifoPartialProcessor'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index 2385f28666..bf8458d761 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -1,22 +1,48 @@ -import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; import { UnexpectedBatchTypeError } from './errors'; import type { BaseRecord, BatchProcessingOptions, PartialItemFailureResponse, } from './types'; +import { BatchProcessor } from './BatchProcessor'; /** * Higher level function to handle batch event processing * @param event Lambda's original event * @param recordHandler Callable function to process each record from the batch * @param processor Batch processor to handle partial failure cases + * @param options Batch processing options * @returns Lambda Partial Batch Response */ -const processPartialResponse = ( +const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, + processor: BatchProcessor, + options?: BatchProcessingOptions 
+): Promise => { + if (!event.Records || !Array.isArray(event.Records)) { + throw new UnexpectedBatchTypeError(); + } + + processor.register(event.Records, recordHandler, options); + + await processor.process(); + + return processor.response(); +}; + +/** + * Higher level function to handle batch event processing + * @param event Lambda's original event + * @param recordHandler Callable function to process each record from the batch + * @param processor Batch processor to handle partial failure cases + * @param options Batch processing options + * @returns Lambda Partial Batch Response + */ +const processPartialResponseSync = ( + event: { Records: BaseRecord[] }, + recordHandler: CallableFunction, + processor: BatchProcessor, options?: BatchProcessingOptions ): PartialItemFailureResponse => { if (!event.Records || !Array.isArray(event.Records)) { @@ -25,9 +51,9 @@ const processPartialResponse = ( processor.register(event.Records, recordHandler, options); - processor.process(); + processor.processSync(); return processor.response(); }; -export { processPartialResponse }; +export { processPartialResponse, processPartialResponseSync }; diff --git a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts deleted file mode 100644 index 127bae2705..0000000000 --- a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Test AsyncBatchProcessor class - * - * @group unit/batch/class/asyncBatchProcessor - */ -import type { Context } from 'aws-lambda'; -import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; -import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor'; -import { EventType } from '../../src/constants'; -import { BatchProcessingError, FullBatchFailureError } from '../../src/errors'; -import type { BatchProcessingOptions } from '../../src/types'; -import { - dynamodbRecordFactory, - 
kinesisRecordFactory, - sqsRecordFactory, -} from '../helpers/factories'; -import { - asyncDynamodbRecordHandler, - asyncKinesisRecordHandler, - asyncSqsRecordHandler, - asyncHandlerWithContext, -} from '../helpers/handlers'; - -describe('Class: AsyncBatchProcessor', () => { - const ENVIRONMENT_VARIABLES = process.env; - const options: BatchProcessingOptions = { context: dummyContext }; - - beforeEach(() => { - jest.clearAllMocks(); - jest.resetModules(); - process.env = { ...ENVIRONMENT_VARIABLES }; - }); - - afterAll(() => { - process.env = ENVIRONMENT_VARIABLES; - }); - - describe('Asynchronously processing SQS Records', () => { - test('Batch processing SQS records with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - test('Batch processing SQS records with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { 
itemIdentifier: thirdRecord.messageId }, - ], - }); - }); - - test('Batch processing SQS records with all failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncSqsRecordHandler); - - // Assess - await expect(processor.asyncProcess()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe('Asynchronously processing Kinesis Records', () => { - test('Batch processing Kinesis records with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); - - test('Batch processing Kinesis records with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - 
expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); - }); - - test('Batch processing Kinesis records with all failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); - - // Act - processor.register(records, asyncKinesisRecordHandler); - - // Assess - await expect(processor.asyncProcess()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe('Asynchronously processing DynamoDB Records', () => { - test('Batch processing DynamoDB records with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); - }); - - test('Batch processing DynamoDB records with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - const processedMessages = await 
processor.asyncProcess(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - test('Batch processing DynamoDB records with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); - - // Act - processor.register(records, asyncDynamodbRecordHandler); - - // Assess - await expect(processor.asyncProcess()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe('Batch processing with Lambda context', () => { - test('Batch processing when context is provided and handler accepts', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - processor.register(records, asyncHandlerWithContext, options); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - test('Batch processing when context is provided and handler does not accept', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // 
Act - processor.register(records, asyncSqsRecordHandler, options); - const processedMessages = await processor.asyncProcess(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - test('Batch processing when malformed context is provided and handler attempts to use', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); - const badContext = { foo: 'bar' }; - const badOptions = { context: badContext as unknown as Context }; - - // Act - processor.register(records, asyncHandlerWithContext, badOptions); - await expect(() => processor.asyncProcess()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - test('When calling the sync process method, it should throw an error', () => { - // Prepare - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act & Assess - expect(() => processor.process()).toThrowError(BatchProcessingError); - }); -}); diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 0f928b4bc7..1bf60e97d5 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -5,9 +5,8 @@ */ import type { Context } from 'aws-lambda'; import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; -import { BatchProcessor } from '../../src/BatchProcessor'; import { EventType } from '../../src/constants'; -import { BatchProcessingError, FullBatchFailureError } from '../../src/errors'; +import { FullBatchFailureError } from '../../src/errors'; import type { BatchProcessingOptions } from '../../src/types'; import { dynamodbRecordFactory, @@ -15,11 +14,16 @@ import { sqsRecordFactory, 
} from '../helpers/factories'; import { + asyncDynamodbRecordHandler, + asyncHandlerWithContext, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, dynamodbRecordHandler, handlerWithContext, kinesisRecordHandler, sqsRecordHandler, } from '../helpers/handlers'; +import { BatchProcessor } from '../../src/BatchProcessor'; describe('Class: BatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -45,7 +49,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -64,7 +68,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -92,7 +96,7 @@ describe('Class: BatchProcessor', () => { // Act & Assess processor.register(records, sqsRecordHandler); - expect(() => processor.process()).toThrowError(FullBatchFailureError); + expect(() => processor.processSync()).toThrowError(FullBatchFailureError); }); }); @@ -106,7 +110,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, kinesisRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -125,7 +129,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, kinesisRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -154,7 +158,7 @@ describe('Class: BatchProcessor', () => { processor.register(records, kinesisRecordHandler); // Assess - expect(() => processor.process()).toThrowError(FullBatchFailureError); + expect(() => 
processor.processSync()).toThrowError(FullBatchFailureError); }); }); @@ -168,7 +172,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -187,7 +191,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -217,7 +221,7 @@ describe('Class: BatchProcessor', () => { processor.register(records, dynamodbRecordHandler); // Assess - expect(() => processor.process()).toThrowError(FullBatchFailureError); + expect(() => processor.processSync()).toThrowError(FullBatchFailureError); }); }); @@ -231,7 +235,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, handlerWithContext, options); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -249,7 +253,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, sqsRecordHandler, options); - const processedMessages = processor.process(); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -269,17 +273,256 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, handlerWithContext, badOptions); - expect(() => processor.process()).toThrowError(FullBatchFailureError); + expect(() => processor.processSync()).toThrowError(FullBatchFailureError); }); }); - test('When calling the async process method, it should throw an error', async () => { - // Prepare - const processor = new BatchProcessor(EventType.SQS); + describe('Asynchronously processing SQS Records', () => { + test('Batch processing 
SQS records with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing SQS records with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); + + test('Batch processing SQS records with all failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + }); + + describe('Asynchronously processing Kinesis Records', () => { + test('Batch processing Kinesis 
records with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); + + test('Batch processing Kinesis records with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); + + test('Batch processing Kinesis records with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); - // Act & Assess - await expect(() => processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError - ); + // Act + 
processor.register(records, asyncKinesisRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + }); + + describe('Asynchronously processing DynamoDB Records', () => { + test('Batch processing DynamoDB records with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); + + test('Batch processing DynamoDB records with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); + }); + + test('Batch processing DynamoDB records with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); 
+ const thirdRecord = dynamodbRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + }); + + describe('Batch processing with Lambda context', () => { + test('Batch processing when context is provided and handler accepts', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncHandlerWithContext, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when context is provided and handler does not accept', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when malformed context is provided and handler attempts to use', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + const badContext = { foo: 'bar' }; + 
const badOptions = { context: badContext as unknown as Context }; + + // Act + processor.register(records, asyncHandlerWithContext, badOptions); + await expect(() => processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); }); }); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 127c87a0a2..5c7f6c5796 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -5,7 +5,7 @@ */ import { SqsFifoPartialProcessor, - processPartialResponse, + processPartialResponseSync, SqsFifoShortCircuitError, } from '../../src'; import { sqsRecordFactory } from '../helpers/factories'; @@ -33,7 +33,11 @@ describe('Class: SqsFifoBatchProcessor', () => { const processor = new SqsFifoPartialProcessor(); // Act - const result = processPartialResponse(event, sqsRecordHandler, processor); + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor + ); // Assess expect(result['batchItemFailures']).toStrictEqual([]); @@ -48,7 +52,11 @@ describe('Class: SqsFifoBatchProcessor', () => { const processor = new SqsFifoPartialProcessor(); // Act - const result = processPartialResponse(event, sqsRecordHandler, processor); + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor + ); // Assess expect(result['batchItemFailures'].length).toBe(2); diff --git a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts deleted file mode 100644 index c7e98edca4..0000000000 --- a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Test asyncProcessPartialResponse function - * - * @group unit/batch/function/asyncProcesspartialresponse - */ -import type { - Context, - DynamoDBStreamEvent, - KinesisStreamEvent, - SQSEvent, -} from 'aws-lambda'; -import { 
helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; -import { Custom as dummyEvent } from '@aws-lambda-powertools/commons/lib/samples/resources/events'; -import { AsyncBatchProcessor, asyncProcessPartialResponse } from '../../src'; -import { EventType } from '../../src/constants'; -import type { - BatchProcessingOptions, - PartialItemFailureResponse, -} from '../../src/types'; -import { - dynamodbRecordFactory, - kinesisRecordFactory, - sqsRecordFactory, -} from '../helpers/factories'; -import { - asyncDynamodbRecordHandler, - asyncHandlerWithContext, - asyncKinesisRecordHandler, - asyncSqsRecordHandler, -} from '../helpers/handlers'; - -describe('Function: processPartialResponse()', () => { - const ENVIRONMENT_VARIABLES = process.env; - const context = dummyContext; - const options: BatchProcessingOptions = { context: dummyContext }; - - beforeEach(() => { - jest.clearAllMocks(); - jest.resetModules(); - process.env = { ...ENVIRONMENT_VARIABLES }; - }); - - afterAll(() => { - process.env = ENVIRONMENT_VARIABLES; - }); - - describe('Process partial response function call tests', () => { - test('Process partial response function call with asynchronous handler', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - const ret = await asyncProcessPartialResponse( - batch, - asyncSqsRecordHandler, - processor - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); - - test('Process partial response function call with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new AsyncBatchProcessor(EventType.SQS); - - // Act - const ret = await asyncProcessPartialResponse( - batch, - 
asyncHandlerWithContext, - processor, - options - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); - }); - - describe('Process partial response function call through handler', () => { - test('Process partial response through handler with SQS event', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new AsyncBatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return asyncProcessPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - test('Process partial response through handler with Kinesis event', async () => { - // Prepare - const records = [ - kinesisRecordFactory('success'), - kinesisRecordFactory('success'), - ]; - const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); - const event: KinesisStreamEvent = { Records: records }; - - const handler = async ( - event: KinesisStreamEvent, - _context: Context - ): Promise => { - return await asyncProcessPartialResponse( - event, - asyncKinesisRecordHandler, - processor - ); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - test('Process partial response through handler with DynamoDB event', async () => { - // Prepare - const records = [ - dynamodbRecordFactory('success'), - dynamodbRecordFactory('success'), - ]; - const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); - const event: DynamoDBStreamEvent = { Records: records }; - - const handler = async ( - event: DynamoDBStreamEvent, - _context: Context - ): Promise => { - return await asyncProcessPartialResponse( - event, - asyncDynamodbRecordHandler, - 
processor - ); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - test('Process partial response through handler for SQS records with incorrect event type', async () => { - // Prepare - const processor = new AsyncBatchProcessor(EventType.SQS); - const event = dummyEvent; - - const handler = async ( - event: SQSEvent, - _context: Context - ): Promise => { - return await asyncProcessPartialResponse( - event, - asyncSqsRecordHandler, - processor - ); - }; - - // Act & Assess - await expect(() => - handler(event as unknown as SQSEvent, context) - ).rejects.toThrowError( - `Unexpected batch type. Possible values are: ${Object.keys( - EventType - ).join(', ')}` - ); - }); - - test('Process partial response through handler with context provided', async () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new AsyncBatchProcessor(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = async ( - event: SQSEvent, - context: Context - ): Promise => { - const options: BatchProcessingOptions = { context: context }; - - return await asyncProcessPartialResponse( - event, - asyncHandlerWithContext, - processor, - options - ); - }; - - // Act - const result = await handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - }); -}); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 5fba12f88e..2c11c42b8c 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -11,7 +11,6 @@ import type { } from 'aws-lambda'; import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; import { Custom as dummyEvent } from 
'@aws-lambda-powertools/commons/lib/samples/resources/events'; -import { BatchProcessor, processPartialResponse } from '../../src'; import { EventType } from '../../src/constants'; import type { BatchProcessingOptions, @@ -23,11 +22,20 @@ import { sqsRecordFactory, } from '../helpers/factories'; import { + asyncDynamodbRecordHandler, + asyncHandlerWithContext, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, dynamodbRecordHandler, handlerWithContext, kinesisRecordHandler, sqsRecordHandler, } from '../helpers/handlers'; +import { BatchProcessor } from '../../src/BatchProcessor'; +import { + processPartialResponse, + processPartialResponseSync, +} from '../../src/processPartialResponse'; describe('Function: processPartialResponse()', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -55,7 +63,11 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); // Act - const ret = processPartialResponse(batch, sqsRecordHandler, processor); + const ret = processPartialResponseSync( + batch, + sqsRecordHandler, + processor + ); // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); @@ -71,7 +83,7 @@ describe('Function: processPartialResponse()', () => { const processor = new BatchProcessor(EventType.SQS); // Act - const ret = processPartialResponse( + const ret = processPartialResponseSync( batch, handlerWithContext, processor, @@ -81,6 +93,47 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = await processPartialResponse( + batch, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(ret).toStrictEqual({ 
batchItemFailures: [] }); + }); + + test('Process partial response function call with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = await processPartialResponse( + batch, + asyncHandlerWithContext, + processor, + options + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); }); describe('Process partial response function call through handler', () => { @@ -97,7 +150,7 @@ describe('Function: processPartialResponse()', () => { event: SQSEvent, _context: Context ): PartialItemFailureResponse => { - return processPartialResponse(event, sqsRecordHandler, processor); + return processPartialResponseSync(event, sqsRecordHandler, processor); }; // Act @@ -120,7 +173,11 @@ describe('Function: processPartialResponse()', () => { event: KinesisStreamEvent, _context: Context ): PartialItemFailureResponse => { - return processPartialResponse(event, kinesisRecordHandler, processor); + return processPartialResponseSync( + event, + kinesisRecordHandler, + processor + ); }; // Act @@ -143,7 +200,11 @@ describe('Function: processPartialResponse()', () => { event: DynamoDBStreamEvent, _context: Context ): PartialItemFailureResponse => { - return processPartialResponse(event, dynamodbRecordHandler, processor); + return processPartialResponseSync( + event, + dynamodbRecordHandler, + processor + ); }; // Act @@ -162,7 +223,7 @@ describe('Function: processPartialResponse()', () => { event: SQSEvent, _context: Context ): PartialItemFailureResponse => { - return processPartialResponse(event, sqsRecordHandler, processor); + return processPartialResponseSync(event, sqsRecordHandler, processor); }; // Act & Assess @@ -188,7 +249,7 @@ describe('Function: processPartialResponse()', () => { ): PartialItemFailureResponse => { const options: BatchProcessingOptions = { 
context: context }; - return processPartialResponse( + return processPartialResponseSync( event, handlerWithContext, processor, @@ -202,5 +263,138 @@ describe('Function: processPartialResponse()', () => { // Assess expect(result).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return processPartialResponse(event, asyncSqsRecordHandler, processor); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncKinesisRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + 
asyncDynamodbRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler for SQS records with incorrect event type', async () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const event = dummyEvent; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await processPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + }; + + // Act & Assess + await expect(() => + handler(event as unknown as SQSEvent, context) + ).rejects.toThrowError( + `Unexpected batch type. Possible values are: ${Object.keys( + EventType + ).join(', ')}` + ); + }); + + test('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + context: Context + ): Promise => { + const options: BatchProcessingOptions = { context: context }; + + return await processPartialResponse( + event, + asyncHandlerWithContext, + processor, + options + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); }); });