Skip to content

feat(batch): simplify BatchProcessor for async and sync functions #1682

Status: Closed (wanted to merge 2 commits)
2 changes: 1 addition & 1 deletion docs/snippets/batch/accessProcessedMessages.ts
@@ -25,7 +25,7 @@ export const handler = async (
const batch = event.Records; // (1)!

processor.register(batch, recordHandler, { context }); // (2)!
const processedMessages = processor.process();
const processedMessages = await processor.process();

for (const message of processedMessages) {
const status: 'success' | 'fail' = message[0];
4 changes: 2 additions & 2 deletions docs/snippets/batch/advancedTracingRecordHandler.ts
@@ -1,7 +1,7 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Tracer, captureLambdaHandler } from '@aws-lambda-powertools/tracer';
import middy from '@middy/core';
@@ -36,7 +36,7 @@ const recordHandler = (record: SQSRecord): void => {

export const handler = middy(
async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
}
20 changes: 11 additions & 9 deletions docs/snippets/batch/customPartialProcessor.ts
@@ -7,7 +7,7 @@ import { marshall } from '@aws-sdk/util-dynamodb';
import {
EventType,
BasePartialBatchProcessor,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import type {
SuccessResponse,
@@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
this.#tableName = tableName;
}

public async asyncProcessRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
}

/**
* It's called once, **after** processing the batch.
*
@@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
this.successMessages = [];
}

public async processRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
}

/**
* It handles how your record is processed.
*
* Here we are keeping the status of each run, `this.handler` is
* the function that is passed when calling `processor.register()`.
*/
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
public processRecordSync(
record: BaseRecord
): SuccessResponse | FailureResponse {
try {
const result = this.handler(record);

@@ -91,7 +93,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/extendingFailure.ts
@@ -4,7 +4,7 @@ import {
EventType,
FailureResponse,
EventSourceType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -47,7 +47,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
8 changes: 4 additions & 4 deletions docs/snippets/batch/gettingStartedAsync.ts
@@ -1,7 +1,7 @@
import {
AsyncBatchProcessor,
BatchProcessor,
EventType,
asyncProcessPartialResponse,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import axios from 'axios'; // axios is an external dependency
import type {
@@ -11,7 +11,7 @@ import type {
SQSBatchResponse,
} from 'aws-lambda';

const processor = new AsyncBatchProcessor(EventType.SQS);
const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<number> => {
const res = await axios.post('https://httpbin.org/anything', {
@@ -25,7 +25,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return await asyncProcessPartialResponse(event, recordHandler, processor, {
return await processPartialResponse(event, recordHandler, processor, {
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedDynamoDBStreams.ts
@@ -1,7 +1,7 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -29,7 +29,7 @@ export const handler = async (
event: DynamoDBStreamEvent,
context: Context
): Promise<DynamoDBBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedErrorHandling.ts
@@ -1,7 +1,7 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -37,7 +37,7 @@ export const handler = async (
context: Context
): Promise<SQSBatchResponse> => {
// prettier-ignore
return processPartialResponse(event, recordHandler, processor, { // (2)!
return processPartialResponseSync(event, recordHandler, processor, { // (2)!
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedKinesis.ts
@@ -1,7 +1,7 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -24,7 +24,7 @@ export const handler = async (
event: KinesisStreamEvent,
context: Context
): Promise<KinesisStreamBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedSQS.ts
@@ -1,7 +1,7 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -28,7 +28,7 @@ export const handler = async (
context: Context
): Promise<SQSBatchResponse> => {
// prettier-ignore
return processPartialResponse(event, recordHandler, processor, { // (3)!
return processPartialResponseSync(event, recordHandler, processor, { // (3)!
context,
});
};
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedSQSFifo.ts
@@ -1,6 +1,6 @@
import {
SqsFifoPartialProcessor,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -25,7 +25,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
52 changes: 26 additions & 26 deletions docs/utilities/batch.md
@@ -13,23 +13,23 @@ stateDiagram-v2
direction LR
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
BatchProcessorSync: Batch Processor
RecordHandler: Record Handler function
YourLogic: Your logic to process each batch item
LambdaResponse: Lambda response

BatchSource --> LambdaInit

LambdaInit --> BatchProcessor
BatchProcessor --> RecordHandler
LambdaInit --> BatchProcessorSync
BatchProcessorSync --> RecordHandler

state BatchProcessor {
state BatchProcessorSync {
[*] --> RecordHandler: Your function
RecordHandler --> YourLogic
}

RecordHandler --> BatchProcessor: Collect results
BatchProcessor --> LambdaResponse: Report items that failed processing
RecordHandler --> BatchProcessorSync: Collect results
BatchProcessorSync --> LambdaResponse: Report items that failed processing
```

## Key features
@@ -99,9 +99,9 @@ The remaining sections of the documentation will rely on these samples. For comp

Processing batches from SQS works in three stages:

1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.SQS`** for the event type
2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion
3. Use **`processPartialResponse`** to kick off processing
3. Use **`processPartialResponseSync`** to kick off processing

???+ info
This code example optionally uses Logger for completion.
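
The three stages above boil down to partial-failure reporting: run the handler over every record, catch failures, and report only the failed message IDs back to Lambda. The sketch below is a simplified, hypothetical model of that behaviour — the types and function are illustrative stand-ins, not the library's own implementation:

```typescript
// Simplified model of sync partial-batch processing (illustrative only).
type SQSRecordLike = { messageId: string; body: string };
type SQSBatchResponseLike = { batchItemFailures: { itemIdentifier: string }[] };

const processPartialResponseSketch = (
  records: SQSRecordLike[],
  recordHandler: (record: SQSRecordLike) => void
): SQSBatchResponseLike => {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      recordHandler(record); // your per-record logic
    } catch {
      // a failed record is reported by its messageId instead of failing the batch
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
};

// Example: the second record fails to parse, so only its id is reported back.
const response = processPartialResponseSketch(
  [
    { messageId: 'msg-1', body: '{"ok":true}' },
    { messageId: 'msg-2', body: 'not-json' },
  ],
  (record) => {
    JSON.parse(record.body); // throws on invalid JSON
  }
);
console.log(response); // { batchItemFailures: [ { itemIdentifier: 'msg-2' } ] }
```

Returning the failed identifiers is what lets SQS delete the successful messages and retry only the failed ones.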
@@ -149,9 +149,9 @@ This helps preserve the ordering of messages in your queue.

Processing batches from Kinesis works in three stages:

1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.KinesisDataStreams`** for the event type
2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion
3. Use **`processPartialResponse`** to kick off processing
3. Use **`processPartialResponseSync`** to kick off processing

???+ info
This code example optionally uses Logger for completion.
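
One Kinesis-specific detail worth calling out: the record payload arrives base64-encoded in `record.kinesis.data`, so a typical `recordHandler` decodes it before doing anything else. A minimal sketch (the type below is a hand-rolled stand-in, not the `aws-lambda` type):

```typescript
// Kinesis payloads are base64-encoded; decode before parsing.
type KinesisRecordLike = { kinesis: { data: string; sequenceNumber: string } };

const decodePayload = (record: KinesisRecordLike): unknown =>
  JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf-8'));

// Build a fake record the way Kinesis would deliver it.
const record: KinesisRecordLike = {
  kinesis: {
    data: Buffer.from(JSON.stringify({ orderId: 42 })).toString('base64'),
    sequenceNumber: '49590338271490256608559692538361571095921575989136588898',
  },
};

const payload = decodePayload(record) as { orderId: number };
console.log(payload.orderId); // 42
```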
@@ -182,9 +182,9 @@ Processing batches from Kinesis works in three stages:

Processing batches from DynamoDB Streams works in three stages:

1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.DynamoDBStreams`** for the event type
2. Define your function to handle each batch record, and use the `DynamoDBRecord` type annotation for autocompletion
3. Use **`processPartialResponse`** to kick off processing
3. Use **`processPartialResponseSync`** to kick off processing

???+ info
This code example optionally uses Logger for completion.
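
For DynamoDB Streams, each record carries the changed item in DynamoDB attribute-value format, so every field is wrapped in a type key (`{ S: ... }`, `{ N: ... }`, and so on). A hedged sketch of reading a field from `NewImage` (simplified types, not the `aws-lambda` definitions):

```typescript
// DynamoDB Streams deliver items in attribute-value format.
type DynamoDBRecordLike = {
  dynamodb?: { NewImage?: Record<string, { S?: string; N?: string }> };
};

const readMessage = (record: DynamoDBRecordLike): string | undefined =>
  record.dynamodb?.NewImage?.Message?.S;

// Fake record shaped like a stream event's Records[i].
const streamRecord: DynamoDBRecordLike = {
  dynamodb: { NewImage: { Message: { S: 'hello' }, Id: { N: '101' } } },
};

console.log(readMessage(streamRecord)); // hello
```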
@@ -225,7 +225,7 @@ By default, we catch any exception raised by your record handler function. This
--8<--
```

1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor)
1. Any exception works here. See the [extending BatchProcessorSync](#extending-batchprocessor) section if you want to override this behavior.

2. Exceptions raised in `recordHandler` will propagate to `processPartialResponseSync`. <br/><br/> We catch them and include each failed batch item identifier in the response object (see `Sample response` tab).

@@ -249,7 +249,7 @@ The following sequence diagrams explain how each Batch processor behaves under d

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
Sequence diagram to explain how [`BatchProcessorSync` works](#processing-messages-from-sqs) with SQS Standard queues.

<center>
```mermaid
@@ -302,7 +302,7 @@ sequenceDiagram

> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).
Sequence diagram to explain how `BatchProcessorSync` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).

For brevity, we will use `Streams` to refer to either service. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}

@@ -358,7 +358,7 @@ sequenceDiagram

### Processing messages asynchronously

You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
You can use the `BatchProcessor` class and the `processPartialResponse` function to process messages concurrently.

???+ question "When is this useful?"
Your use case might be able to process multiple records at the same time without conflicting with one another.
@@ -367,15 +367,15 @@ You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` functi

The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).

```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor"
```typescript hl_lines="1-5 14 28-30" title="High-concurrency with BatchProcessor"
--8<-- "docs/snippets/batch/gettingStartedAsync.ts"
```
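
Conceptually, the concurrent mode behaves like running every handler at once and reporting only the rejected promises. The sketch below is a hypothetical model of that behaviour using `Promise.allSettled` — it is not the library's implementation, just the idea:

```typescript
// Simplified model of concurrent partial-batch processing (illustrative only).
type SQSRecordLike = { messageId: string; body: string };

const processConcurrently = async (
  records: SQSRecordLike[],
  recordHandler: (record: SQSRecordLike) => Promise<void>
): Promise<{ batchItemFailures: { itemIdentifier: string }[] }> => {
  // Start every handler at the same time; never short-circuit on failure.
  const results = await Promise.allSettled(records.map((r) => recordHandler(r)));
  return {
    batchItemFailures: results.flatMap((result, idx) =>
      result.status === 'rejected'
        ? [{ itemIdentifier: records[idx].messageId }]
        : []
    ),
  };
};

// Usage: record 'b' rejects, so it is the only one reported.
const response = await processConcurrently(
  [
    { messageId: 'a', body: 'ok' },
    { messageId: 'b', body: 'fail' },
  ],
  async (record) => {
    if (record.body === 'fail') throw new Error('boom');
  }
);
console.log(response.batchItemFailures); // [ { itemIdentifier: 'b' } ]
```

Note that `Promise.allSettled` makes completion order irrelevant, which is exactly why this mode is unsuitable when records must be processed in order.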

## Advanced

### Accessing processed messages

Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
Use the `BatchProcessorSync` directly in your function to access a list of all returned values from your `recordHandler` function.

* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
Expand All @@ -384,32 +384,32 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
--8<-- "docs/snippets/batch/accessProcessedMessages.ts"
```

1. The processor requires the records array. This is typically handled by `processPartialResponse`.
1. The processor requires the records array. This is typically handled by `processPartialResponseSync`.
2. You need to register the `batch`, the `recordHandler` function, and optionally the `context` to access the Lambda context.
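
The tuples described above can be modelled like this — a sketch of the response shape, with simplified types rather than the library's own:

```typescript
// Each processed message is a tuple: status, result-or-error, original record.
type RecordLike = { messageId: string; body: string };
type ProcessedMessage =
  | ['success', unknown, RecordLike]
  | ['fail', string, RecordLike];

// Summarize statuses per message id, mirroring the loop in the snippet above.
const summarize = (processedMessages: ProcessedMessage[]): string[] =>
  processedMessages.map(([status, , record]) => `${record.messageId}:${status}`);

const processedMessages: ProcessedMessage[] = [
  ['success', 42, { messageId: 'msg-1', body: 'a' }],
  ['fail', 'SyntaxError: Unexpected token', { messageId: 'msg-2', body: 'b' }],
];

console.log(summarize(processedMessages)); // [ 'msg-1:success', 'msg-2:fail' ]
```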

### Accessing Lambda Context

Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function.
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as an optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function.

```typescript hl_lines="17 35"
--8<-- "docs/snippets/batch/accessLambdaContext.ts"
```

### Extending BatchProcessor
### Extending BatchProcessorSync

You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
You might want to bring custom logic to the existing `BatchProcessorSync` to slightly override how we handle successes and failures.

For these scenarios, you can subclass `BatchProcessor` and quickly override `successHandler` and `failureHandler` methods:
For these scenarios, you can subclass `BatchProcessorSync` and quickly override `successHandler` and `failureHandler` methods:

* **`successHandler()`** – Keeps track of successful batch records
* **`failureHandler()`** – Keeps track of failed batch records

???+ example
Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing

```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessorSync"
--8<-- "docs/snippets/batch/extendingFailure.ts"
```
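
The override pattern can be sketched without the library at all. Below, `MiniProcessor` is a hypothetical stand-in for the base class's `failureHandler` hook, and the subclass adds side logic before delegating — where the docs example would emit a `BatchRecordFailures` metric, this sketch just counts:

```typescript
// Minimal model of the failureHandler override pattern (illustrative only).
type RecordLike = { messageId: string };

class MiniProcessor {
  public failureMessages: RecordLike[] = [];

  public failureHandler(record: RecordLike, _error: Error): void {
    this.failureMessages.push(record);
  }
}

class CountingProcessor extends MiniProcessor {
  public failureCount = 0;

  public failureHandler(record: RecordLike, error: Error): void {
    this.failureCount += 1; // docs example would add a metric here instead
    super.failureHandler(record, error); // keep the base bookkeeping
  }
}

const processor = new CountingProcessor();
processor.failureHandler({ messageId: 'msg-1' }, new Error('boom'));
console.log(processor.failureCount, processor.failureMessages.length); // 1 1
```

Calling `super.failureHandler` is the important part: the base class still needs to track the failed record so it ends up in the batch response.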

@@ -446,7 +446,7 @@ classDiagram
* **`clean()`** – teardown logic called once after `processRecord` completes
* **`processRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic

You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function.
You can then use this class directly, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.

```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor"
--8<-- "docs/snippets/batch/customPartialProcessor.ts"
@@ -466,7 +466,7 @@ You can use Tracer to create subsegments for each batch record processed. To do

## Testing your code

As there is no external calls, you can unit test your code with `BatchProcessor` quite easily.
As there are no external calls, you can unit test your code with `BatchProcessorSync` quite easily.

**Example**:
