diff --git a/docs/snippets/batch/accessProcessedMessages.ts b/docs/snippets/batch/accessProcessedMessages.ts index fe5d727ea0..ea99abeda7 100644 --- a/docs/snippets/batch/accessProcessedMessages.ts +++ b/docs/snippets/batch/accessProcessedMessages.ts @@ -25,7 +25,7 @@ export const handler = async ( const batch = event.Records; // (1)! processor.register(batch, recordHandler, { context }); // (2)! - const processedMessages = processor.process(); + const processedMessages = await processor.process(); for (const message of processedMessages) { const status: 'success' | 'fail' = message[0]; diff --git a/docs/snippets/batch/advancedTracingRecordHandler.ts b/docs/snippets/batch/advancedTracingRecordHandler.ts index 4de43b3198..a65c9f976d 100644 --- a/docs/snippets/batch/advancedTracingRecordHandler.ts +++ b/docs/snippets/batch/advancedTracingRecordHandler.ts @@ -15,7 +15,7 @@ import type { const processor = new BatchProcessor(EventType.SQS); const tracer = new Tracer({ serviceName: 'serverlessAirline' }); -const recordHandler = (record: SQSRecord): void => { +const recordHandler = async (record: SQSRecord): Promise => { const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler'); // (1)! subsegment?.addAnnotation('messageId', record.messageId); // (2)! diff --git a/docs/snippets/batch/customPartialProcessor.ts b/docs/snippets/batch/customPartialProcessor.ts index 6ebd113282..e634b74d53 100644 --- a/docs/snippets/batch/customPartialProcessor.ts +++ b/docs/snippets/batch/customPartialProcessor.ts @@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor { this.#tableName = tableName; } - public async asyncProcessRecord( - _record: BaseRecord - ): Promise { - throw new Error('Not implemented'); - } - /** * It's called once, **after** processing the batch. * @@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor { this.successMessages = []; } + public async processRecord( + _record: BaseRecord + ): Promise { + throw new Error('Not implemented'); + } + /** * It handles how your record is processed. * * Here we are keeping the status of each run, `this.handler` is * the function that is passed when calling `processor.register()`. 
*/ - public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { + public processRecordSync( + record: BaseRecord + ): SuccessResponse | FailureResponse { try { const result = this.handler(record); diff --git a/docs/snippets/batch/gettingStartedAsync.ts b/docs/snippets/batch/gettingStartedAsync.ts index 0080752026..1d41e06624 100644 --- a/docs/snippets/batch/gettingStartedAsync.ts +++ b/docs/snippets/batch/gettingStartedAsync.ts @@ -1,7 +1,7 @@ import { - AsyncBatchProcessor, + BatchProcessor, EventType, - asyncProcessPartialResponse, + processPartialResponse, } from '@aws-lambda-powertools/batch'; import axios from 'axios'; // axios is an external dependency import type { @@ -11,7 +11,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new AsyncBatchProcessor(EventType.SQS); +const processor = new BatchProcessor(EventType.SQS); const recordHandler = async (record: SQSRecord): Promise => { const res = await axios.post('https://httpbin.org/anything', { @@ -25,7 +25,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return await asyncProcessPartialResponse(event, recordHandler, processor, { + return await processPartialResponse(event, recordHandler, processor, { context, }); }; diff --git a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts index 3304d31b95..8c4ee77e98 100644 --- a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts +++ b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts @@ -14,7 +14,7 @@ import type { const processor = new BatchProcessor(EventType.DynamoDBStreams); // (1)! const logger = new Logger(); -const recordHandler = (record: DynamoDBRecord): void => { +const recordHandler = async (record: DynamoDBRecord): Promise => { if (record.dynamodb && record.dynamodb.NewImage) { logger.info('Processing record', { record: record.dynamodb.NewImage }); const message = record.dynamodb.NewImage.Message.S; diff --git a/docs/snippets/batch/gettingStartedErrorHandling.ts b/docs/snippets/batch/gettingStartedErrorHandling.ts index 08efa51732..20b2b32f60 100644 --- a/docs/snippets/batch/gettingStartedErrorHandling.ts +++ b/docs/snippets/batch/gettingStartedErrorHandling.ts @@ -21,7 +21,7 @@ class InvalidPayload extends Error { } } -const recordHandler = (record: SQSRecord): void => { +const recordHandler = async (record: SQSRecord): Promise => { const payload = record.body; if (payload) { const item = JSON.parse(payload); diff --git a/docs/snippets/batch/gettingStartedKinesis.ts b/docs/snippets/batch/gettingStartedKinesis.ts index 2def6b2e55..b4f3e7403f 100644 --- a/docs/snippets/batch/gettingStartedKinesis.ts +++ b/docs/snippets/batch/gettingStartedKinesis.ts @@ -14,7 +14,7 @@ import type { const processor = new BatchProcessor(EventType.KinesisDataStreams); // (1)! const logger = new Logger(); -const recordHandler = (record: KinesisStreamRecord): void => { +const recordHandler = async (record: KinesisStreamRecord): Promise => { logger.info('Processing record', { record: record.kinesis.data }); const payload = JSON.parse(record.kinesis.data); logger.info('Processed item', { item: payload }); diff --git a/docs/snippets/batch/gettingStartedSQS.ts b/docs/snippets/batch/gettingStartedSQS.ts index 9878b865e9..5d03281490 100644 --- a/docs/snippets/batch/gettingStartedSQS.ts +++ b/docs/snippets/batch/gettingStartedSQS.ts @@ -15,7 +15,7 @@ const processor = new BatchProcessor(EventType.SQS); // (1)! 
const logger = new Logger(); // prettier-ignore -const recordHandler = (record: SQSRecord): void => { // (2)! +const recordHandler = async (record: SQSRecord): Promise => { // (2)! const payload = record.body; if (payload) { const item = JSON.parse(payload); diff --git a/docs/snippets/batch/gettingStartedSQSFifo.ts b/docs/snippets/batch/gettingStartedSQSFifo.ts index 5c63c1e0d7..3b7e03b3a6 100644 --- a/docs/snippets/batch/gettingStartedSQSFifo.ts +++ b/docs/snippets/batch/gettingStartedSQSFifo.ts @@ -1,6 +1,6 @@ import { SqsFifoPartialProcessor, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -25,7 +25,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ac426546e5..47c14cad56 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -52,7 +52,7 @@ journey Records expired: 1: Failure ``` -This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration: +This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration: * [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. @@ -69,11 +69,11 @@ This behavior changes when you enable Report Batch Item Failures feature in your For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed. -You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned. +Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned. ### Required resources -The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted. +The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries. !!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires." @@ -137,7 +137,7 @@ Processing batches from SQS works in three stages: #### FIFO queues When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. -This helps preserve the ordering of messages in your queue. +This helps preserve the ordering of messages in your queue. 
```typescript hl_lines="1-4 13 28-30" --8<-- "docs/snippets/batch/gettingStartedSQSFifo.ts" @@ -145,6 +145,10 @@ This helps preserve the ordering of messages in your queue. 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) +!!! Note + Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. + This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details. + ### Processing messages from Kinesis Processing batches from Kinesis works in three stages: @@ -225,7 +229,7 @@ By default, we catch any exception raised by your record handler function. This --8<-- ``` - 1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor) + 1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor) 2. Exceptions raised in `record_handler` will propagate to `process_partial_response`.

We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
@@ -356,21 +360,29 @@ sequenceDiagram
 Kinesis and DynamoDB streams mechanism with multiple batch item failures
 
-### Processing messages asynchronously
+### Async or sync processing
 
-You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
+There are two processors you can use with this utility:
 
-???+ question "When is this useful?"
-    Your use case might be able to process multiple records at the same time without conflicting with one another.
+* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously
+* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously
+
+In most cases your record handler will be an `async` function returning a `Promise`. Therefore, `BatchProcessor` is the default processor and handles your batch records asynchronously.
+There are also use cases where you need to process the batch records synchronously, for example when each record must be processed one at a time and in the order it was received.
+For such cases, we recommend using `BatchProcessorSync` and `processPartialResponseSync`.
+
+!!! info "Match your processing function with the right batch processor"
+    * If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
+    * If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`
 
+The difference in implementation is that `BatchProcessor` processes records concurrently with `Promise.all()`, while `BatchProcessorSync` loops through each record sequentially to preserve their order.
+
+???+ question "When is this useful?"
+    For example, imagine you need to process multiple loyalty points and save them incrementally in a database. While you await the database to confirm that a record was saved, you can start processing another record concurrently.
 
     The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
 
-```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor"
---8<-- "docs/snippets/batch/gettingStartedAsync.ts"
-```
-
 ## Advanced
 
 ### Accessing processed messages
@@ -380,6 +392,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
 
 * **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
 * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
+
 ```typescript hl_lines="25 27-28 30-33 38" title="Accessing processed messages"
 --8<-- "docs/snippets/batch/accessProcessedMessages.ts"
 ```
@@ -391,7 +404,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
 
 Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.
 
-We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function.
+We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function. ```typescript hl_lines="17 35" --8<-- "docs/snippets/batch/accessLambdaContext.ts" @@ -408,14 +421,14 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc ???+ example Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing - -```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor" ---8<-- "docs/snippets/batch/extendingFailure.ts" -``` + + ```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor" + --8<-- "docs/snippets/batch/extendingFailure.ts" + ``` ### Create your own partial processor -You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `asyncProcessRecord()` abstract methods. +You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.
 
 ```mermaid
@@ -426,28 +439,26 @@ classDiagram
         +prepare()
         +clean()
         +processRecord(record: BaseRecord)
-        +asyncProcessRecord(record: BaseRecord)
+        +processRecordSync(record: BaseRecord)
     }
-
     class YourCustomProcessor {
         +prepare()
         +clean()
         +processRecord(record: BaseRecord)
-        +asyncProcessRecord(record: BaseRecord)
+        +processRecordSync(record: BaseRecord)
     }
-
     BasePartialProcessor <|-- YourCustomProcessor : extends
 ```
 
 Visual representation to bring your own processor
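+
+For illustration only, a bare-bones processor could look like the sketch below. The class name is a placeholder, and the sketch assumes that the registered `this.handler`, the processing `options`, and the `successHandler`/`failureHandler` helpers are available on the base class, as they are for the built-in processors. Each of the four methods is described in the list that follows, and the full `customPartialProcessor.ts` example further below shows a realistic implementation.
+
+```typescript
+import { BasePartialProcessor } from '@aws-lambda-powertools/batch';
+import type {
+  BaseRecord,
+  FailureResponse,
+  SuccessResponse,
+} from '@aws-lambda-powertools/batch';
+
+class MyMinimalProcessor extends BasePartialProcessor {
+  // Called once before the batch is processed: set up any state you need
+  public prepare(): void {}
+
+  // Called once after the batch is processed: release resources, flush buffers, etc.
+  public clean(): void {}
+
+  // Asynchronous processing of a single record; `this.handler` is the
+  // function passed to `processor.register()`
+  public async processRecord(
+    record: BaseRecord
+  ): Promise<SuccessResponse | FailureResponse> {
+    try {
+      const result = await this.handler(record, this.options?.context);
+
+      return this.successHandler(record, result);
+    } catch (error) {
+      return this.failureHandler(record, error as Error);
+    }
+  }
+
+  // Synchronous counterpart of processRecord
+  public processRecordSync(
+    record: BaseRecord
+  ): SuccessResponse | FailureResponse {
+    try {
+      const result = this.handler(record, this.options?.context);
+
+      return this.successHandler(record, result);
+    } catch (error) {
+      return this.failureHandler(record, error as Error);
+    }
+  }
+}
+```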
 
-* **`processRecord()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
 * **`prepare()`** – called once as part of the processor initialization
 * **`clean()`** – teardown logic called once after `processRecord` completes
-* **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
-
-You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function.
+* **`processRecord()`** – if you need asynchronous processing logic, implement it in this method; otherwise define it in your class with empty logic
+* **`processRecordSync()`** – handles all synchronous processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
+
+You can then use this class directly, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.
 
 ```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor"
 --8<-- "docs/snippets/batch/customPartialProcessor.ts"
 ```
@@ -456,7 +467,7 @@
 
 You can use Tracer to create subsegments for each batch record processed. To do so, you can open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata to the subsegment, you can do so directly without calling `tracer.setSegment(subsegment)`. This allows you to work with the subsegment directly and avoid having to either pass the parent subsegment around or have to restore the parent subsegment at the end of the record processing.
 
-```ts
+```typescript
 --8<-- "docs/snippets/batch/advancedTracingRecordHandler.ts"
 ```
@@ -466,7 +477,7 @@
 
 ## Testing your code
 
-As there is no external calls, you can unit test your code with `BatchProcessor` quite easily.
+As there are no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
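+
+For instance, a minimal Jest test for the SQS handler shown earlier could look like the sketch below. The `./app` import path and the fake record values are illustrative only; include whatever fields your `recordHandler` actually reads.
+
+```typescript
+import type { Context, SQSEvent } from 'aws-lambda';
+import { handler } from './app'; // the Lambda handler from the SQS example above (illustrative path)
+
+describe('Function: handler', () => {
+  test('returns no batch item failures when all records succeed', async () => {
+    // Prepare: a single-record SQS event; only `body` needs to be realistic here
+    const event = {
+      Records: [
+        {
+          messageId: '059f36b4-87a3-44ab-83d2-661975830a7d',
+          receiptHandle: 'test-receipt-handle',
+          body: JSON.stringify({ id: 1 }),
+          attributes: {
+            ApproximateReceiveCount: '1',
+            SentTimestamp: '1545082649183',
+            SenderId: 'AIDAIENQZJOLO23YVJ4VO',
+            ApproximateFirstReceiveTimestamp: '1545082649185',
+          },
+          messageAttributes: {},
+          md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3',
+          eventSource: 'aws:sqs',
+          eventSourceARN: 'arn:aws:sqs:us-east-1:123456789012:my-queue',
+          awsRegion: 'us-east-1',
+        },
+      ],
+    } as SQSEvent;
+
+    // Act: invoke the handler with a stubbed context
+    const response = await handler(event, {} as Context);
+
+    // Assess: no record was reported as failed
+    expect(response.batchItemFailures).toStrictEqual([]);
+  });
+});
+```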
**Example**: diff --git a/packages/batch/README.md b/packages/batch/README.md index 4883d430f2..ff26800107 100644 --- a/packages/batch/README.md +++ b/packages/batch/README.md @@ -60,9 +60,9 @@ When using SQS as a Lambda event source, you can specify the `EventType.SQS` to ```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -72,7 +72,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.SQS); +const processor = new BatchProcessorSync(EventType.SQS); const logger = new Logger(); const recordHandler = (record: SQSRecord): void => { @@ -87,7 +87,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -100,9 +100,9 @@ When using Kinesis Data Streams as a Lambda event source, you can specify the `E ```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -112,7 +112,7 @@ import type { KinesisStreamBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.KinesisDataStreams); +const processor = new BatchProcessorSync(EventType.KinesisDataStreams); const logger = new Logger(); const recordHandler = (record: KinesisStreamRecord): void => { @@ -125,7 +125,7 @@ export const handler = async ( event: KinesisStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -133,13 +133,13 @@ export const handler = async ( ### DynamoDB Streams Processor -When using DynamoDB Streams as a Lambda event source, you can use the `BatchProcessor` with the `EventType.DynamoDBStreams` to process the records. The response will be a `DynamoDBBatchResponse` which contains a list of items that failed to be processed. +When using DynamoDB Streams as a Lambda event source, you can use the `BatchProcessorSync` with the `EventType.DynamoDBStreams` to process the records. The response will be a `DynamoDBBatchResponse` which contains a list of items that failed to be processed. 
```ts import { - BatchProcessor, + BatchProcessorSync, EventType, - processPartialResponse, + processPartialResponseSync, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { @@ -149,7 +149,7 @@ import type { DynamoDBBatchResponse, } from 'aws-lambda'; -const processor = new BatchProcessor(EventType.DynamoDBStreams); +const processor = new BatchProcessorSync(EventType.DynamoDBStreams); const logger = new Logger(); const recordHandler = (record: DynamoDBRecord): void => { @@ -167,7 +167,7 @@ export const handler = async ( event: DynamoDBStreamEvent, context: Context ): Promise => { - return processPartialResponse(event, recordHandler, processor, { + return processPartialResponseSync(event, recordHandler, processor, { context, }); }; @@ -175,13 +175,13 @@ export const handler = async ( ### Async processing -If your use case allows you to process multiple records at the same time without conflicting with each other, you can use the `AsyncBatchProcessor` to process records asynchronously. This will create an array of promises that will be resolved once all records have been processed. +If your use case allows you to process multiple records at the same time without conflicting with each other, you can use the `BatchProcessor` to process records asynchronously. This will create an array of promises that will be resolved once all records have been processed. ```ts import { - AsyncBatchProcessor, + BatchProcessor, EventType, - asyncProcessPartialResponse, + processPartialResponse, } from '@aws-lambda-powertools/batch'; import axios from 'axios'; // axios is an external dependency import type { @@ -191,7 +191,7 @@ import type { SQSBatchResponse, } from 'aws-lambda'; -const processor = new AsyncBatchProcessor(EventType.SQS); +const processor = new BatchProcessor(EventType.SQS); const recordHandler = async (record: SQSRecord): Promise => { const res = await axios.post('https://httpbin.org/anything', { @@ -205,7 +205,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise => { - return await asyncProcessPartialResponse(event, recordHandler, processor, { + return await processPartialResponse(event, recordHandler, processor, { context, }); }; diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index a51400d4a5..9de6901394 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -34,40 +34,6 @@ abstract class BasePartialProcessor { this.handler = new Function(); } - /** - * Call instance's handler for each record - * @returns List of processed records - */ - public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> { - /** - * If this is an sync processor, user should have called process instead, - * so we call the method early to throw the error early thus failing fast. 
- */ - if (this.constructor.name === 'BatchProcessor') { - await this.asyncProcessRecord(this.records[0]); - } - this.prepare(); - - const processingPromises: Promise[] = - this.records.map((record) => this.asyncProcessRecord(record)); - - const processedRecords: (SuccessResponse | FailureResponse)[] = - await Promise.all(processingPromises); - - this.clean(); - - return processedRecords; - } - - /** - * Process a record with an asyncronous handler - * - * @param record Record to be processed - */ - public abstract asyncProcessRecord( - record: BaseRecord - ): Promise; - /** * Clean class instance after processing */ @@ -99,20 +65,21 @@ abstract class BasePartialProcessor { * Call instance's handler for each record * @returns List of processed records */ - public process(): (SuccessResponse | FailureResponse)[] { + public async process(): Promise<(SuccessResponse | FailureResponse)[]> { /** - * If this is an async processor, user should have called processAsync instead, + * If this is a sync processor, user should have called processSync instead, * so we call the method early to throw the error early thus failing fast. */ - if (this.constructor.name === 'AsyncBatchProcessor') { - this.processRecord(this.records[0]); + if (this.constructor.name === 'BatchProcessorSync') { + await this.processRecord(this.records[0]); } this.prepare(); - const processedRecords: (SuccessResponse | FailureResponse)[] = []; - for (const record of this.records) { - processedRecords.push(this.processRecord(record)); - } + const processingPromises: Promise[] = + this.records.map((record) => this.processRecord(record)); + + const processedRecords: (SuccessResponse | FailureResponse)[] = + await Promise.all(processingPromises); this.clean(); @@ -120,17 +87,51 @@ abstract class BasePartialProcessor { } /** - * Process a record with the handler + * Process a record with an asyncronous handler + * * @param record Record to be processed */ public abstract processRecord( record: BaseRecord + ): Promise; + + /** + * Process a record with the handler + * @param record Record to be processed + */ + public abstract processRecordSync( + record: BaseRecord ): SuccessResponse | FailureResponse; + /** + * Call instance's handler for each record + * @returns List of processed records + */ + public processSync(): (SuccessResponse | FailureResponse)[] { + /** + * If this is an async processor, user should have called process instead, + * so we call the method early to throw the error early thus failing fast. 
+ */ + if (this.constructor.name === 'BatchProcessor') { + this.processRecordSync(this.records[0]); + } + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + for (const record of this.records) { + processedRecords.push(this.processRecordSync(record)); + } + + this.clean(); + + return processedRecords; + } + /** * Set class instance attributes before execution * @param records List of records to be processed * @param handler CallableFunction to process entries of "records" + * @param options Options to be used during processing * @returns this object */ public register( diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 730b3e94ef..bbe9856f07 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -6,27 +6,31 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB */ class BatchProcessor extends BasePartialBatchProcessor { - public async asyncProcessRecord( - _record: BaseRecord + public async processRecord( + record: BaseRecord ): Promise { - throw new BatchProcessingError('Not implemented. Use process() instead.'); - } - - /** - * Process a record with instance's handler - * @param record Batch record to be processed - * @returns response of success or failure - */ - public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { try { const data = this.toBatchType(record, this.eventType); - const result = this.handler(data, this.options?.context); + const result = await this.handler(data, this.options?.context); return this.successHandler(record, result); } catch (error) { return this.failureHandler(record, error as Error); } } + + /** + * Process a record with instance's handler + * @param _record Batch record to be processed + * @returns response of success or failure + */ + public processRecordSync( + _record: BaseRecord + ): SuccessResponse | FailureResponse { + throw new BatchProcessingError( + 'Not implemented. Use asyncProcess() instead.' + ); + } } export { BatchProcessor }; diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/BatchProcessorSync.ts similarity index 66% rename from packages/batch/src/AsyncBatchProcessor.ts rename to packages/batch/src/BatchProcessorSync.ts index 10c404a323..5bb8ce1f12 100644 --- a/packages/batch/src/AsyncBatchProcessor.ts +++ b/packages/batch/src/BatchProcessorSync.ts @@ -5,18 +5,11 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; /** * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB */ -class AsyncBatchProcessor extends BasePartialBatchProcessor { - public async asyncProcessRecord( - record: BaseRecord +class BatchProcessorSync extends BasePartialBatchProcessor { + public async processRecord( + _record: BaseRecord ): Promise { - try { - const data = this.toBatchType(record, this.eventType); - const result = await this.handler(data, this.options?.context); - - return this.successHandler(record, result); - } catch (error) { - return this.failureHandler(record, error as Error); - } + throw new BatchProcessingError('Not implemented. 
Use process() instead.'); } /** @@ -24,11 +17,18 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor { * @param record Batch record to be processed * @returns response of success or failure */ - public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { - throw new BatchProcessingError( - 'Not implemented. Use asyncProcess() instead.' - ); + public processRecordSync( + record: BaseRecord + ): SuccessResponse | FailureResponse { + try { + const data = this.toBatchType(record, this.eventType); + const result = this.handler(data, this.options?.context); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } } } -export { AsyncBatchProcessor }; +export { BatchProcessorSync }; diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index ebdd4d73e8..2aca4ed814 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,4 +1,4 @@ -import { BatchProcessor } from './BatchProcessor'; +import { BatchProcessorSync } from './BatchProcessorSync'; import { EventType } from './constants'; import { SqsFifoShortCircuitError } from './errors'; import type { FailureResponse, SuccessResponse } from './types'; @@ -8,7 +8,7 @@ import type { FailureResponse, SuccessResponse } from './types'; * Stops processing records when the first record fails * The remaining records are reported as failed items */ -class SqsFifoPartialProcessor extends BatchProcessor { +class SqsFifoPartialProcessor extends BatchProcessorSync { public constructor() { super(EventType.SQS); } @@ -18,7 +18,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { * When the first failed message is detected, the process is short-circuited * And the remaining messages are reported as failed items */ - public process(): (SuccessResponse | FailureResponse)[] { + public processSync(): (SuccessResponse | FailureResponse)[] { this.prepare(); const processedRecords: (SuccessResponse | FailureResponse)[] = []; @@ -30,7 +30,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { return this.shortCircuitProcessing(currentIndex, processedRecords); } - processedRecords.push(this.processRecord(record)); + processedRecords.push(this.processRecordSync(record)); currentIndex++; } diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 96f931823d..b1b1069b26 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -3,8 +3,8 @@ export * from './errors'; export * from './types'; export * from './BasePartialProcessor'; export * from './BasePartialBatchProcessor'; +export * from './BatchProcessorSync'; export * from './BatchProcessor'; -export * from './AsyncBatchProcessor'; +export * from './processPartialResponseSync'; export * from './processPartialResponse'; -export * from './asyncProcessPartialResponse'; export * from './SqsFifoPartialProcessor'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index 2385f28666..947b4268b6 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -11,21 +11,22 @@ import type { * @param event Lambda's original event * @param recordHandler Callable function to process each record from the batch * @param processor Batch processor to handle partial failure cases + * @param options Batch processing options * @returns Lambda Partial Batch Response */ -const 
processPartialResponse = ( +const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, processor: BasePartialBatchProcessor, options?: BatchProcessingOptions -): PartialItemFailureResponse => { +): Promise => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); } processor.register(event.Records, recordHandler, options); - processor.process(); + await processor.process(); return processor.response(); }; diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/processPartialResponseSync.ts similarity index 84% rename from packages/batch/src/asyncProcessPartialResponse.ts rename to packages/batch/src/processPartialResponseSync.ts index 5a33b5b534..474713e7d5 100644 --- a/packages/batch/src/asyncProcessPartialResponse.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -13,21 +13,21 @@ import type { * @param processor Batch processor to handle partial failure cases * @returns Lambda Partial Batch Response */ -const asyncProcessPartialResponse = async ( +const processPartialResponseSync = ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, processor: BasePartialBatchProcessor, options?: BatchProcessingOptions -): Promise => { +): PartialItemFailureResponse => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); } processor.register(event.Records, recordHandler, options); - await processor.asyncProcess(); + processor.processSync(); return processor.response(); }; -export { asyncProcessPartialResponse }; +export { processPartialResponseSync }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 0f928b4bc7..99cb2245e5 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -1,7 +1,7 @@ /** - * Test BatchProcessor class + * Test AsyncBatchProcessor class * - * @group unit/batch/class/batchprocessor + * @group unit/batch/class/asyncBatchProcessor */ import type { Context } from 'aws-lambda'; import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; @@ -15,13 +15,13 @@ import { sqsRecordFactory, } from '../helpers/factories'; import { - dynamodbRecordHandler, - handlerWithContext, - kinesisRecordHandler, - sqsRecordHandler, + asyncDynamodbRecordHandler, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, + asyncHandlerWithContext, } from '../helpers/handlers'; -describe('Class: BatchProcessor', () => { +describe('Class: AsyncBatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; const options: BatchProcessingOptions = { context: dummyContext }; @@ -35,8 +35,8 @@ describe('Class: BatchProcessor', () => { process.env = ENVIRONMENT_VARIABLES; }); - describe('Synchronously processing SQS Records', () => { - test('Batch processing SQS records with no failures', () => { + describe('Asynchronously processing SQS Records', () => { + test('Batch processing SQS records with no failures', async () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -44,8 +44,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.SQS); // Act - processor.register(records, sqsRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.process(); 
// Assess expect(processedMessages).toStrictEqual([ @@ -54,7 +54,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing SQS records with some failures', () => { + test('Batch processing SQS records with some failures', async () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('success'); @@ -63,8 +63,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.SQS); // Act - processor.register(records, sqsRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -81,7 +81,7 @@ describe('Class: BatchProcessor', () => { }); }); - test('Batch processing SQS records with all failures', () => { + test('Batch processing SQS records with all failures', async () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('failure'); @@ -90,14 +90,18 @@ describe('Class: BatchProcessor', () => { const records = [firstRecord, secondRecord, thirdRecord]; const processor = new BatchProcessor(EventType.SQS); - // Act & Assess - processor.register(records, sqsRecordHandler); - expect(() => processor.process()).toThrowError(FullBatchFailureError); + // Act + processor.register(records, asyncSqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - describe('Synchronously processing Kinesis Records', () => { - test('Batch processing Kinesis records with no failures', () => { + describe('Asynchronously processing Kinesis Records', () => { + test('Batch processing Kinesis records with no failures', async () => { // Prepare const firstRecord = kinesisRecordFactory('success'); const secondRecord = kinesisRecordFactory('success'); @@ -105,8 +109,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.KinesisDataStreams); // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -115,7 +119,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing Kinesis records with some failures', () => { + test('Batch processing Kinesis records with some failures', async () => { // Prepare const firstRecord = kinesisRecordFactory('failure'); const secondRecord = kinesisRecordFactory('success'); @@ -124,8 +128,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.KinesisDataStreams); // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -142,7 +146,8 @@ describe('Class: BatchProcessor', () => { }); }); - test('Batch processing Kinesis records with all failures', () => { + test('Batch processing Kinesis records with all failures', async () => { + // Prepare const firstRecord = kinesisRecordFactory('failure'); const secondRecord = kinesisRecordFactory('failure'); const thirdRecord = kinesisRecordFactory('fail'); @@ -151,15 +156,17 @@ describe('Class: BatchProcessor', () => { const processor = new 
BatchProcessor(EventType.KinesisDataStreams); // Act - processor.register(records, kinesisRecordHandler); + processor.register(records, asyncKinesisRecordHandler); // Assess - expect(() => processor.process()).toThrowError(FullBatchFailureError); + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - describe('Synchronously processing DynamoDB Records', () => { - test('Batch processing DynamoDB records with no failures', () => { + describe('Asynchronously processing DynamoDB Records', () => { + test('Batch processing DynamoDB records with no failures', async () => { // Prepare const firstRecord = dynamodbRecordFactory('success'); const secondRecord = dynamodbRecordFactory('success'); @@ -167,8 +174,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.DynamoDBStreams); // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -177,7 +184,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing DynamoDB records with some failures', () => { + test('Batch processing DynamoDB records with some failures', async () => { // Prepare const firstRecord = dynamodbRecordFactory('failure'); const secondRecord = dynamodbRecordFactory('success'); @@ -186,8 +193,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.DynamoDBStreams); // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.process(); + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.process(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -204,7 +211,7 @@ describe('Class: BatchProcessor', () => { }); }); - test('Batch processing DynamoDB records with all failures', () => { + test('Batch processing DynamoDB records with all failures', async () => { // Prepare const firstRecord = dynamodbRecordFactory('failure'); const secondRecord = dynamodbRecordFactory('failure'); @@ -214,15 +221,17 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.DynamoDBStreams); // Act - processor.register(records, dynamodbRecordHandler); + processor.register(records, asyncDynamodbRecordHandler); // Assess - expect(() => processor.process()).toThrowError(FullBatchFailureError); + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); describe('Batch processing with Lambda context', () => { - test('Batch processing when context is provided and handler accepts', () => { + test('Batch processing when context is provided and handler accepts', async () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -230,8 +239,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.SQS); // Act - processor.register(records, handlerWithContext, options); - const processedMessages = processor.process(); + processor.register(records, asyncHandlerWithContext, options); + const processedMessages = await processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -240,7 +249,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing when context is provided and handler does not accept', () => { + test('Batch 
processing when context is provided and handler does not accept', async () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -248,8 +257,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.SQS); // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = processor.process(); + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.process(); // Assess expect(processedMessages).toStrictEqual([ @@ -258,7 +267,7 @@ describe('Class: BatchProcessor', () => { ]); }); - test('Batch processing when malformed context is provided and handler attempts to use', () => { + test('Batch processing when malformed context is provided and handler attempts to use', async () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); @@ -268,18 +277,18 @@ describe('Class: BatchProcessor', () => { const badOptions = { context: badContext as unknown as Context }; // Act - processor.register(records, handlerWithContext, badOptions); - expect(() => processor.process()).toThrowError(FullBatchFailureError); + processor.register(records, asyncHandlerWithContext, badOptions); + await expect(() => processor.process()).rejects.toThrowError( + FullBatchFailureError + ); }); }); - test('When calling the async process method, it should throw an error', async () => { + test('When calling the sync process method, it should throw an error', () => { // Prepare const processor = new BatchProcessor(EventType.SQS); // Act & Assess - await expect(() => processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError - ); + expect(() => processor.processSync()).toThrowError(BatchProcessingError); }); }); diff --git a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessorSync.test.ts similarity index 63% rename from packages/batch/tests/unit/AsyncBatchProcessor.test.ts rename to packages/batch/tests/unit/BatchProcessorSync.test.ts index 127bae2705..4f77d299cc 100644 --- a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessorSync.test.ts @@ -1,11 +1,11 @@ /** - * Test AsyncBatchProcessor class + * Test BatchProcessor class * - * @group unit/batch/class/asyncBatchProcessor + * @group unit/batch/class/batchprocessor */ import type { Context } from 'aws-lambda'; import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts'; -import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor'; +import { BatchProcessorSync } from '../../src/BatchProcessorSync'; import { EventType } from '../../src/constants'; import { BatchProcessingError, FullBatchFailureError } from '../../src/errors'; import type { BatchProcessingOptions } from '../../src/types'; @@ -15,13 +15,13 @@ import { sqsRecordFactory, } from '../helpers/factories'; import { - asyncDynamodbRecordHandler, - asyncKinesisRecordHandler, - asyncSqsRecordHandler, - asyncHandlerWithContext, + dynamodbRecordHandler, + handlerWithContext, + kinesisRecordHandler, + sqsRecordHandler, } from '../helpers/handlers'; -describe('Class: AsyncBatchProcessor', () => { +describe('Class: BatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; const options: BatchProcessingOptions = { context: dummyContext }; @@ -35,17 +35,17 @@ describe('Class: AsyncBatchProcessor', () => { process.env = 
ENVIRONMENT_VARIABLES; }); - describe('Asynchronously processing SQS Records', () => { - test('Batch processing SQS records with no failures', async () => { + describe('Synchronously processing SQS Records', () => { + test('Batch processing SQS records with no failures', () => { // Prepare const firstRecord = sqsRecordFactory('success'); const secondRecord = sqsRecordFactory('success'); const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); + const processor = new BatchProcessorSync(EventType.SQS); // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.asyncProcess(); + processor.register(records, sqsRecordHandler); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -54,17 +54,17 @@ describe('Class: AsyncBatchProcessor', () => { ]); }); - test('Batch processing SQS records with some failures', async () => { + test('Batch processing SQS records with some failures', () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('success'); const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); + const processor = new BatchProcessorSync(EventType.SQS); // Act - processor.register(records, asyncSqsRecordHandler); - const processedMessages = await processor.asyncProcess(); + processor.register(records, sqsRecordHandler); + const processedMessages = processor.processSync(); // Assess expect(processedMessages[1]).toStrictEqual([ @@ -81,36 +81,32 @@ describe('Class: AsyncBatchProcessor', () => { }); }); - test('Batch processing SQS records with all failures', async () => { + test('Batch processing SQS records with all failures', () => { // Prepare const firstRecord = sqsRecordFactory('failure'); const secondRecord = sqsRecordFactory('failure'); const thirdRecord = sqsRecordFactory('fail'); const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new AsyncBatchProcessor(EventType.SQS); + const processor = new BatchProcessorSync(EventType.SQS); - // Act - processor.register(records, asyncSqsRecordHandler); - - // Assess - await expect(processor.asyncProcess()).rejects.toThrowError( - FullBatchFailureError - ); + // Act & Assess + processor.register(records, sqsRecordHandler); + expect(() => processor.processSync()).toThrowError(FullBatchFailureError); }); }); - describe('Asynchronously processing Kinesis Records', () => { - test('Batch processing Kinesis records with no failures', async () => { + describe('Synchronously processing Kinesis Records', () => { + test('Batch processing Kinesis records with no failures', () => { // Prepare const firstRecord = kinesisRecordFactory('success'); const secondRecord = kinesisRecordFactory('success'); const records = [firstRecord, secondRecord]; - const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + const processor = new BatchProcessorSync(EventType.KinesisDataStreams); // Act - processor.register(records, asyncKinesisRecordHandler); - const processedMessages = await processor.asyncProcess(); + processor.register(records, kinesisRecordHandler); + const processedMessages = processor.processSync(); // Assess expect(processedMessages).toStrictEqual([ @@ -119,17 +115,17 @@ describe('Class: AsyncBatchProcessor', () => { ]); }); - test('Batch processing Kinesis records with some failures', async () => { + test('Batch 
processing Kinesis records with some failures', () => {
       // Prepare
       const firstRecord = kinesisRecordFactory('failure');
       const secondRecord = kinesisRecordFactory('success');
       const thirdRecord = kinesisRecordFactory('fail');
       const records = [firstRecord, secondRecord, thirdRecord];
-      const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams);
+      const processor = new BatchProcessorSync(EventType.KinesisDataStreams);
       // Act
-      processor.register(records, asyncKinesisRecordHandler);
-      const processedMessages = await processor.asyncProcess();
+      processor.register(records, kinesisRecordHandler);
+      const processedMessages = processor.processSync();
       // Assess
       expect(processedMessages[1]).toStrictEqual([
@@ -146,36 +142,33 @@ describe('Class: AsyncBatchProcessor', () => {
     });
   });
-    test('Batch processing Kinesis records with all failures', async () => {
-      // Prepare
+    test('Batch processing Kinesis records with all failures', () => {
       const firstRecord = kinesisRecordFactory('failure');
       const secondRecord = kinesisRecordFactory('failure');
       const thirdRecord = kinesisRecordFactory('fail');
       const records = [firstRecord, secondRecord, thirdRecord];
-      const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams);
+      const processor = new BatchProcessorSync(EventType.KinesisDataStreams);
       // Act
-      processor.register(records, asyncKinesisRecordHandler);
+      processor.register(records, kinesisRecordHandler);
       // Assess
-      await expect(processor.asyncProcess()).rejects.toThrowError(
-        FullBatchFailureError
-      );
+      expect(() => processor.processSync()).toThrowError(FullBatchFailureError);
     });
   });
-  describe('Asynchronously processing DynamoDB Records', () => {
-    test('Batch processing DynamoDB records with no failures', async () => {
+  describe('Synchronously processing DynamoDB Records', () => {
+    test('Batch processing DynamoDB records with no failures', () => {
       // Prepare
       const firstRecord = dynamodbRecordFactory('success');
       const secondRecord = dynamodbRecordFactory('success');
       const records = [firstRecord, secondRecord];
-      const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams);
+      const processor = new BatchProcessorSync(EventType.DynamoDBStreams);
       // Act
-      processor.register(records, asyncDynamodbRecordHandler);
-      const processedMessages = await processor.asyncProcess();
+      processor.register(records, dynamodbRecordHandler);
+      const processedMessages = processor.processSync();
       // Assess
       expect(processedMessages).toStrictEqual([
@@ -184,17 +177,17 @@ describe('Class: AsyncBatchProcessor', () => {
       ]);
     });
-    test('Batch processing DynamoDB records with some failures', async () => {
+    test('Batch processing DynamoDB records with some failures', () => {
       // Prepare
       const firstRecord = dynamodbRecordFactory('failure');
       const secondRecord = dynamodbRecordFactory('success');
       const thirdRecord = dynamodbRecordFactory('fail');
       const records = [firstRecord, secondRecord, thirdRecord];
-      const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams);
+      const processor = new BatchProcessorSync(EventType.DynamoDBStreams);
       // Act
-      processor.register(records, asyncDynamodbRecordHandler);
-      const processedMessages = await processor.asyncProcess();
+      processor.register(records, dynamodbRecordHandler);
+      const processedMessages = processor.processSync();
       // Assess
       expect(processedMessages[1]).toStrictEqual([
@@ -211,36 +204,34 @@ describe('Class: AsyncBatchProcessor', () => {
     });
   });
-    test('Batch processing DynamoDB records with all failures', async () => {
+    test('Batch processing DynamoDB records with all failures', () => {
       // Prepare
       const firstRecord = dynamodbRecordFactory('failure');
       const secondRecord = dynamodbRecordFactory('failure');
       const thirdRecord = dynamodbRecordFactory('fail');
       const records = [firstRecord, secondRecord, thirdRecord];
-      const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams);
+      const processor = new BatchProcessorSync(EventType.DynamoDBStreams);
       // Act
-      processor.register(records, asyncDynamodbRecordHandler);
+      processor.register(records, dynamodbRecordHandler);
       // Assess
-      await expect(processor.asyncProcess()).rejects.toThrowError(
-        FullBatchFailureError
-      );
+      expect(() => processor.processSync()).toThrowError(FullBatchFailureError);
     });
   });
   describe('Batch processing with Lambda context', () => {
-    test('Batch processing when context is provided and handler accepts', async () => {
+    test('Batch processing when context is provided and handler accepts', () => {
       // Prepare
       const firstRecord = sqsRecordFactory('success');
       const secondRecord = sqsRecordFactory('success');
       const records = [firstRecord, secondRecord];
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       // Act
-      processor.register(records, asyncHandlerWithContext, options);
-      const processedMessages = await processor.asyncProcess();
+      processor.register(records, handlerWithContext, options);
+      const processedMessages = processor.processSync();
       // Assess
       expect(processedMessages).toStrictEqual([
@@ -249,16 +240,16 @@ describe('Class: AsyncBatchProcessor', () => {
       ]);
     });
-    test('Batch processing when context is provided and handler does not accept', async () => {
+    test('Batch processing when context is provided and handler does not accept', () => {
       // Prepare
       const firstRecord = sqsRecordFactory('success');
       const secondRecord = sqsRecordFactory('success');
       const records = [firstRecord, secondRecord];
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       // Act
-      processor.register(records, asyncSqsRecordHandler, options);
-      const processedMessages = await processor.asyncProcess();
+      processor.register(records, sqsRecordHandler, options);
+      const processedMessages = processor.processSync();
       // Assess
       expect(processedMessages).toStrictEqual([
@@ -267,28 +258,28 @@ describe('Class: AsyncBatchProcessor', () => {
       ]);
     });
-    test('Batch processing when malformed context is provided and handler attempts to use', async () => {
+    test('Batch processing when malformed context is provided and handler attempts to use', () => {
       // Prepare
       const firstRecord = sqsRecordFactory('success');
       const secondRecord = sqsRecordFactory('success');
       const records = [firstRecord, secondRecord];
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       const badContext = { foo: 'bar' };
       const badOptions = { context: badContext as unknown as Context };
       // Act
-      processor.register(records, asyncHandlerWithContext, badOptions);
-      await expect(() => processor.asyncProcess()).rejects.toThrowError(
-        FullBatchFailureError
-      );
+      processor.register(records, handlerWithContext, badOptions);
+      expect(() => processor.processSync()).toThrowError(FullBatchFailureError);
     });
   });
-  test('When calling the sync process method, it should throw an error', () => {
+  test('When calling the async process method, it should throw an error', async () => {
     // Prepare
-    const processor = new AsyncBatchProcessor(EventType.SQS);
+    const processor = new BatchProcessorSync(EventType.SQS);
     // Act & Assess
-    expect(() => processor.process()).toThrowError(BatchProcessingError);
+    await expect(() => processor.process()).rejects.toThrowError(
+      BatchProcessingError
+    );
   });
 });
diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
index 127c87a0a2..5c7f6c5796 100644
--- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
+++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
@@ -5,7 +5,7 @@
  */
 import {
   SqsFifoPartialProcessor,
-  processPartialResponse,
+  processPartialResponseSync,
   SqsFifoShortCircuitError,
 } from '../../src';
 import { sqsRecordFactory } from '../helpers/factories';
@@ -33,7 +33,11 @@ describe('Class: SqsFifoBatchProcessor', () => {
     const processor = new SqsFifoPartialProcessor();
     // Act
-    const result = processPartialResponse(event, sqsRecordHandler, processor);
+    const result = processPartialResponseSync(
+      event,
+      sqsRecordHandler,
+      processor
+    );
     // Assess
     expect(result['batchItemFailures']).toStrictEqual([]);
@@ -48,7 +52,11 @@ describe('Class: SqsFifoBatchProcessor', () => {
     const processor = new SqsFifoPartialProcessor();
     // Act
-    const result = processPartialResponse(event, sqsRecordHandler, processor);
+    const result = processPartialResponseSync(
+      event,
+      sqsRecordHandler,
+      processor
+    );
     // Assess
     expect(result['batchItemFailures'].length).toBe(2);
diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts
index 5fba12f88e..2dbe585b45 100644
--- a/packages/batch/tests/unit/processPartialResponse.test.ts
+++ b/packages/batch/tests/unit/processPartialResponse.test.ts
@@ -1,7 +1,7 @@
 /**
- * Test processPartialResponse function
+ * Test asyncProcessPartialResponse function
  *
- * @group unit/batch/function/processpartialresponse
+ * @group unit/batch/function/asyncProcesspartialresponse
  */
 import type {
   Context,
@@ -11,7 +11,8 @@ import type {
 } from 'aws-lambda';
 import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts';
 import { Custom as dummyEvent } from '@aws-lambda-powertools/commons/lib/samples/resources/events';
-import { BatchProcessor, processPartialResponse } from '../../src';
+import { BatchProcessor } from '../../src/BatchProcessor';
+import { processPartialResponse } from '../../src/processPartialResponse';
 import { EventType } from '../../src/constants';
 import type {
   BatchProcessingOptions,
@@ -23,10 +24,10 @@ import {
   sqsRecordFactory,
 } from '../helpers/factories';
 import {
-  dynamodbRecordHandler,
-  handlerWithContext,
-  kinesisRecordHandler,
-  sqsRecordHandler,
+  asyncDynamodbRecordHandler,
+  asyncHandlerWithContext,
+  asyncKinesisRecordHandler,
+  asyncSqsRecordHandler,
 } from '../helpers/handlers';
 describe('Function: processPartialResponse()', () => {
@@ -45,7 +46,7 @@ describe('Function: processPartialResponse()', () => {
   });
   describe('Process partial response function call tests', () => {
-    test('Process partial response function call with synchronous handler', () => {
+    test('Process partial response function call with asynchronous handler', async () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
@@ -55,13 +56,17 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.SQS);
       // Act
-      const ret = processPartialResponse(batch, sqsRecordHandler, processor);
+      const ret = await processPartialResponse(
+        batch,
+        asyncSqsRecordHandler,
+        processor
+      );
       // Assess
       expect(ret).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response function call with context provided', () => {
+    test('Process partial response function call with context provided', async () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
@@ -71,9 +76,9 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.SQS);
       // Act
-      const ret = processPartialResponse(
+      const ret = await processPartialResponse(
         batch,
-        handlerWithContext,
+        asyncHandlerWithContext,
         processor,
         options
       );
@@ -84,7 +89,7 @@ describe('Function: processPartialResponse()', () => {
   });
   describe('Process partial response function call through handler', () => {
-    test('Process partial response through handler with SQS event', () => {
+    test('Process partial response through handler with SQS event', async () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
@@ -93,21 +98,21 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.SQS);
       const event: SQSEvent = { Records: records };
-      const handler = (
+      const handler = async (
         event: SQSEvent,
         _context: Context
-      ): PartialItemFailureResponse => {
-        return processPartialResponse(event, sqsRecordHandler, processor);
+      ): Promise<PartialItemFailureResponse> => {
+        return processPartialResponse(event, asyncSqsRecordHandler, processor);
       };
       // Act
-      const result = handler(event, context);
+      const result = await handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler with Kinesis event', () => {
+    test('Process partial response through handler with Kinesis event', async () => {
       // Prepare
       const records = [
         kinesisRecordFactory('success'),
@@ -116,21 +121,25 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.KinesisDataStreams);
       const event: KinesisStreamEvent = { Records: records };
-      const handler = (
+      const handler = async (
         event: KinesisStreamEvent,
         _context: Context
-      ): PartialItemFailureResponse => {
-        return processPartialResponse(event, kinesisRecordHandler, processor);
+      ): Promise<PartialItemFailureResponse> => {
+        return await processPartialResponse(
+          event,
+          asyncKinesisRecordHandler,
+          processor
+        );
       };
       // Act
-      const result = handler(event, context);
+      const result = await handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler with DynamoDB event', () => {
+    test('Process partial response through handler with DynamoDB event', async () => {
       // Prepare
       const records = [
         dynamodbRecordFactory('success'),
@@ -139,41 +148,51 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.DynamoDBStreams);
       const event: DynamoDBStreamEvent = { Records: records };
-      const handler = (
+      const handler = async (
         event: DynamoDBStreamEvent,
         _context: Context
-      ): PartialItemFailureResponse => {
-        return processPartialResponse(event, dynamodbRecordHandler, processor);
+      ): Promise<PartialItemFailureResponse> => {
+        return await processPartialResponse(
+          event,
+          asyncDynamodbRecordHandler,
+          processor
+        );
       };
       // Act
-      const result = handler(event, context);
+      const result = await handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler for SQS records with incorrect event type', () => {
+    test('Process partial response through handler for SQS records with incorrect event type', async () => {
       // Prepare
       const processor = new BatchProcessor(EventType.SQS);
       const event = dummyEvent;
-      const handler = (
+      const handler = async (
         event: SQSEvent,
         _context: Context
-      ): PartialItemFailureResponse => {
-        return processPartialResponse(event, sqsRecordHandler, processor);
+      ): Promise<PartialItemFailureResponse> => {
+        return await processPartialResponse(
+          event,
+          asyncSqsRecordHandler,
+          processor
+        );
       };
       // Act & Assess
-      expect(() => handler(event as unknown as SQSEvent, context)).toThrowError(
+      await expect(() =>
+        handler(event as unknown as SQSEvent, context)
+      ).rejects.toThrowError(
         `Unexpected batch type. Possible values are: ${Object.keys(
           EventType
         ).join(', ')}`
       );
     });
-    test('Process partial response through handler with context provided', () => {
+    test('Process partial response through handler with context provided', async () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
@@ -182,22 +201,22 @@ describe('Function: processPartialResponse()', () => {
       const processor = new BatchProcessor(EventType.SQS);
       const event: SQSEvent = { Records: records };
-      const handler = (
+      const handler = async (
         event: SQSEvent,
         context: Context
-      ): PartialItemFailureResponse => {
+      ): Promise<PartialItemFailureResponse> => {
         const options: BatchProcessingOptions = { context: context };
-        return processPartialResponse(
+        return await processPartialResponse(
           event,
-          handlerWithContext,
+          asyncHandlerWithContext,
           processor,
           options
         );
       };
       // Act
-      const result = handler(event, context);
+      const result = await handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
diff --git a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponseSync.test.ts
similarity index 62%
rename from packages/batch/tests/unit/asyncProcessPartialResponse.test.ts
rename to packages/batch/tests/unit/processPartialResponseSync.test.ts
index c7e98edca4..4c3e9a47a5 100644
--- a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts
+++ b/packages/batch/tests/unit/processPartialResponseSync.test.ts
@@ -1,7 +1,7 @@
 /**
- * Test asyncProcessPartialResponse function
+ * Test processPartialResponse function
  *
- * @group unit/batch/function/asyncProcesspartialresponse
+ * @group unit/batch/function/processpartialresponse
  */
 import type {
   Context,
@@ -11,7 +11,8 @@ import type {
 } from 'aws-lambda';
 import { helloworldContext as dummyContext } from '@aws-lambda-powertools/commons/lib/samples/resources/contexts';
 import { Custom as dummyEvent } from '@aws-lambda-powertools/commons/lib/samples/resources/events';
-import { AsyncBatchProcessor, asyncProcessPartialResponse } from '../../src';
+import { BatchProcessorSync } from '../../src/BatchProcessorSync';
+import { processPartialResponseSync } from '../../src/processPartialResponseSync';
 import { EventType } from '../../src/constants';
 import type {
   BatchProcessingOptions,
@@ -23,10 +24,10 @@ import {
   sqsRecordFactory,
 } from '../helpers/factories';
 import {
-  asyncDynamodbRecordHandler,
-  asyncHandlerWithContext,
-  asyncKinesisRecordHandler,
-  asyncSqsRecordHandler,
+  dynamodbRecordHandler,
+  handlerWithContext,
+  kinesisRecordHandler,
+  sqsRecordHandler,
 } from '../helpers/handlers';
 describe('Function: processPartialResponse()', () => {
@@ -45,19 +46,19 @@ describe('Function: processPartialResponse()', () => {
   });
   describe('Process partial response function call tests', () => {
-    test('Process partial response function call with asynchronous handler', async () => {
+    test('Process partial response function call with synchronous handler', () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
         sqsRecordFactory('success'),
       ];
       const batch = { Records: records };
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       // Act
-      const ret = await asyncProcessPartialResponse(
+      const ret = processPartialResponseSync(
         batch,
-        asyncSqsRecordHandler,
+        sqsRecordHandler,
         processor
       );
@@ -65,19 +66,19 @@ describe('Function: processPartialResponse()', () => {
       expect(ret).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response function call with context provided', async () => {
+    test('Process partial response function call with context provided', () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
         sqsRecordFactory('success'),
       ];
       const batch = { Records: records };
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       // Act
-      const ret = await asyncProcessPartialResponse(
+      const ret = processPartialResponseSync(
         batch,
-        asyncHandlerWithContext,
+        handlerWithContext,
         processor,
         options
       );
@@ -88,138 +89,128 @@ describe('Function: processPartialResponse()', () => {
   });
   describe('Process partial response function call through handler', () => {
-    test('Process partial response through handler with SQS event', async () => {
+    test('Process partial response through handler with SQS event', () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
         sqsRecordFactory('success'),
       ];
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       const event: SQSEvent = { Records: records };
-      const handler = async (
+      const handler = (
         event: SQSEvent,
         _context: Context
-      ): Promise<PartialItemFailureResponse> => {
-        return asyncProcessPartialResponse(
-          event,
-          asyncSqsRecordHandler,
-          processor
-        );
+      ): PartialItemFailureResponse => {
+        return processPartialResponseSync(event, sqsRecordHandler, processor);
       };
       // Act
-      const result = await handler(event, context);
+      const result = handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler with Kinesis event', async () => {
+    test('Process partial response through handler with Kinesis event', () => {
       // Prepare
       const records = [
         kinesisRecordFactory('success'),
         kinesisRecordFactory('success'),
       ];
-      const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams);
+      const processor = new BatchProcessorSync(EventType.KinesisDataStreams);
       const event: KinesisStreamEvent = { Records: records };
-      const handler = async (
+      const handler = (
         event: KinesisStreamEvent,
         _context: Context
-      ): Promise<PartialItemFailureResponse> => {
-        return await asyncProcessPartialResponse(
+      ): PartialItemFailureResponse => {
+        return processPartialResponseSync(
           event,
-          asyncKinesisRecordHandler,
+          kinesisRecordHandler,
           processor
         );
       };
       // Act
-      const result = await handler(event, context);
+      const result = handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler with DynamoDB event', async () => {
+    test('Process partial response through handler with DynamoDB event', () => {
       // Prepare
       const records = [
         dynamodbRecordFactory('success'),
         dynamodbRecordFactory('success'),
       ];
-      const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams);
+      const processor = new BatchProcessorSync(EventType.DynamoDBStreams);
       const event: DynamoDBStreamEvent = { Records: records };
-      const handler = async (
+      const handler = (
         event: DynamoDBStreamEvent,
         _context: Context
-      ): Promise<PartialItemFailureResponse> => {
-        return await asyncProcessPartialResponse(
+      ): PartialItemFailureResponse => {
+        return processPartialResponseSync(
          event,
-          asyncDynamodbRecordHandler,
+          dynamodbRecordHandler,
          processor
        );
      };
       // Act
-      const result = await handler(event, context);
+      const result = handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });
     });
-    test('Process partial response through handler for SQS records with incorrect event type', async () => {
+    test('Process partial response through handler for SQS records with incorrect event type', () => {
       // Prepare
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       const event = dummyEvent;
-      const handler = async (
+      const handler = (
         event: SQSEvent,
         _context: Context
-      ): Promise<PartialItemFailureResponse> => {
-        return await asyncProcessPartialResponse(
-          event,
-          asyncSqsRecordHandler,
-          processor
-        );
+      ): PartialItemFailureResponse => {
+        return processPartialResponseSync(event, sqsRecordHandler, processor);
       };
       // Act & Assess
-      await expect(() =>
-        handler(event as unknown as SQSEvent, context)
-      ).rejects.toThrowError(
+      expect(() => handler(event as unknown as SQSEvent, context)).toThrowError(
         `Unexpected batch type. Possible values are: ${Object.keys(
           EventType
         ).join(', ')}`
       );
     });
-    test('Process partial response through handler with context provided', async () => {
+    test('Process partial response through handler with context provided', () => {
       // Prepare
       const records = [
         sqsRecordFactory('success'),
         sqsRecordFactory('success'),
       ];
-      const processor = new AsyncBatchProcessor(EventType.SQS);
+      const processor = new BatchProcessorSync(EventType.SQS);
       const event: SQSEvent = { Records: records };
-      const handler = async (
+      const handler = (
         event: SQSEvent,
         context: Context
-      ): Promise<PartialItemFailureResponse> => {
+      ): PartialItemFailureResponse => {
         const options: BatchProcessingOptions = { context: context };
-        return await asyncProcessPartialResponse(
+        return processPartialResponseSync(
           event,
-          asyncHandlerWithContext,
+          handlerWithContext,
           processor,
           options
         );
       };
       // Act
-      const result = await handler(event, context);
+      const result = handler(event, context);
       // Assess
       expect(result).toStrictEqual({ batchItemFailures: [] });