Skip to content

feat(batch): sequential async processing of records for BatchProcessor #3109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,10 @@ For such cases we recommend to use the `BatchProcessorSync` and `processPartialR
*If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
* If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`

The difference between the two processors in implementation is that `BatchProcessor` uses `Promise.all()` while `BatchProcessorSync` loops through each record to preserve the order.
The difference between the two processors is in how they handle record processing:

* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order.
* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one.

???+ question "When is this useful?"

Expand Down Expand Up @@ -477,6 +480,16 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba
--8<-- "examples/snippets/batch/extendingFailure.ts"
```

### Sequential async processing

By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.

!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."

```typescript hl_lines="8 17" title="Sequential async processing"
--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
```

### Create your own partial processor

You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.
Expand Down
18 changes: 18 additions & 0 deletions examples/snippets/batch/sequentialAsyncProcessing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
// Process the record
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
processInParallel: false,
});
40 changes: 32 additions & 8 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ abstract class BasePartialProcessor {
public abstract prepare(): void;

/**
* Process all records with an asyncronous handler
* Process all records with an asynchronous handler
*
* Once called, the processor will create an array of promises to process each record
* and wait for all of them to settle before returning the results.
Expand All @@ -122,21 +122,21 @@ abstract class BasePartialProcessor {
}
this.prepare();

const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
this.records.map((record) => this.processRecord(record));

const processedRecords: (SuccessResponse | FailureResponse)[] =
await Promise.all(processingPromises);
// Default to `true` if `processInParallel` is not specified.
const processInParallel = this.options?.processInParallel ?? true;
const processedRecords = processInParallel
? await this.#processRecordsInParallel()
: await this.#processRecordsSequentially();

this.clean();

return processedRecords;
}

/**
* Process a record with an asyncronous handler
* Process a record with an asynchronous handler
*
* An implementation of this method is required for asyncronous processors.
* An implementation of this method is required for asynchronous processors.
*
* When implementing this method, you should at least call the successHandler method
* when a record succeeds processing and the failureHandler method when a record
Expand Down Expand Up @@ -249,6 +249,30 @@ abstract class BasePartialProcessor {

return entry;
}

/**
* Processes records in parallel using `Promise.all`.
*/
async #processRecordsInParallel(): Promise<
(SuccessResponse | FailureResponse)[]
> {
return Promise.all(
this.records.map((record) => this.processRecord(record))
);
}

/**
* Processes records sequentially, ensuring that each record is processed one after the other.
*/
async #processRecordsSequentially(): Promise<
(SuccessResponse | FailureResponse)[]
> {
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
for (const record of this.records) {
processedRecords.push(await this.processRecord(record));
}
return processedRecords;
}
}

export { BasePartialProcessor };
7 changes: 7 additions & 0 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError The option to group on error during processing
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
* @property processInParallel Indicates whether the records should be processed in parallel
*/
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
Expand All @@ -30,6 +31,12 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
* Set this to false to prevent throwing an error if the entire batch fails.
*/
throwOnFullBatchFailure?: boolean;
/**
* Indicates whether the records should be processed in parallel.
* When set to `true`, the records will be processed in parallel using `Promise.all`.
* When set to `false`, the records will be processed sequentially.
*/
processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean;
};

/**
Expand Down
Loading