feat(batch): rename AsyncBatchProcessor to default BatchProcessor #1683

Merged · 4 commits · Sep 18, 2023
Changes from all commits

2 changes: 1 addition & 1 deletion docs/snippets/batch/accessProcessedMessages.ts
Expand Up @@ -25,7 +25,7 @@ export const handler = async (
const batch = event.Records; // (1)!

processor.register(batch, recordHandler, { context }); // (2)!
const processedMessages = processor.process();
const processedMessages = await processor.process();

for (const message of processedMessages) {
const status: 'success' | 'fail' = message[0];
2 changes: 1 addition & 1 deletion docs/snippets/batch/advancedTracingRecordHandler.ts
Expand Up @@ -15,7 +15,7 @@ import type {
const processor = new BatchProcessor(EventType.SQS);
const tracer = new Tracer({ serviceName: 'serverlessAirline' });

const recordHandler = (record: SQSRecord): void => {
const recordHandler = async (record: SQSRecord): Promise<void> => {
const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler'); // (1)!
subsegment?.addAnnotation('messageId', record.messageId); // (2)!

16 changes: 9 additions & 7 deletions docs/snippets/batch/customPartialProcessor.ts
Expand Up @@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
this.#tableName = tableName;
}

public async asyncProcessRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
}

/**
* It's called once, **after** processing the batch.
*
Expand Down Expand Up @@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
this.successMessages = [];
}

public async processRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
}

/**
* It handles how your record is processed.
*
* Here we are keeping the status of each run, `this.handler` is
* the function that is passed when calling `processor.register()`.
*/
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
public processRecordSync(
record: BaseRecord
): SuccessResponse | FailureResponse {
try {
const result = this.handler(record);

8 changes: 4 additions & 4 deletions docs/snippets/batch/gettingStartedAsync.ts
@@ -1,7 +1,7 @@
import {
AsyncBatchProcessor,
BatchProcessor,
EventType,
asyncProcessPartialResponse,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import axios from 'axios'; // axios is an external dependency
import type {
Expand All @@ -11,7 +11,7 @@ import type {
SQSBatchResponse,
} from 'aws-lambda';

const processor = new AsyncBatchProcessor(EventType.SQS);
const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<number> => {
const res = await axios.post('https://httpbin.org/anything', {
Expand All @@ -25,7 +25,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return await asyncProcessPartialResponse(event, recordHandler, processor, {
return await processPartialResponse(event, recordHandler, processor, {
context,
});
};
2 changes: 1 addition & 1 deletion docs/snippets/batch/gettingStartedDynamoDBStreams.ts
Expand Up @@ -14,7 +14,7 @@ import type {
const processor = new BatchProcessor(EventType.DynamoDBStreams); // (1)!
const logger = new Logger();

const recordHandler = (record: DynamoDBRecord): void => {
const recordHandler = async (record: DynamoDBRecord): Promise<void> => {
if (record.dynamodb && record.dynamodb.NewImage) {
logger.info('Processing record', { record: record.dynamodb.NewImage });
const message = record.dynamodb.NewImage.Message.S;
2 changes: 1 addition & 1 deletion docs/snippets/batch/gettingStartedErrorHandling.ts
Expand Up @@ -21,7 +21,7 @@ class InvalidPayload extends Error {
}
}

const recordHandler = (record: SQSRecord): void => {
const recordHandler = async (record: SQSRecord): Promise<void> => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
2 changes: 1 addition & 1 deletion docs/snippets/batch/gettingStartedKinesis.ts
Expand Up @@ -14,7 +14,7 @@ import type {
const processor = new BatchProcessor(EventType.KinesisDataStreams); // (1)!
const logger = new Logger();

const recordHandler = (record: KinesisStreamRecord): void => {
const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
logger.info('Processing record', { record: record.kinesis.data });
const payload = JSON.parse(record.kinesis.data);
logger.info('Processed item', { item: payload });
2 changes: 1 addition & 1 deletion docs/snippets/batch/gettingStartedSQS.ts
Expand Up @@ -15,7 +15,7 @@ const processor = new BatchProcessor(EventType.SQS); // (1)!
const logger = new Logger();

// prettier-ignore
const recordHandler = (record: SQSRecord): void => { // (2)!
const recordHandler = async (record: SQSRecord): Promise<void> => { // (2)!
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
4 changes: 2 additions & 2 deletions docs/snippets/batch/gettingStartedSQSFifo.ts
@@ -1,6 +1,6 @@
import {
SqsFifoPartialProcessor,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
Expand All @@ -25,7 +25,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponse(event, recordHandler, processor, {
return processPartialResponseSync(event, recordHandler, processor, {
context,
});
};
69 changes: 40 additions & 29 deletions docs/utilities/batch.md
Expand Up @@ -52,7 +52,7 @@ journey
Records expired: 1: Failure
```

This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration:
This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:

<!-- markdownlint-disable MD013 -->
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
Expand All @@ -69,11 +69,11 @@ This behavior changes when you enable Report Batch Item Failures feature in your

For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.

You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
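
For reference, the response the utility assembles on your behalf simply lists the identifiers of the failed records under `batchItemFailures` (Kinesis and DynamoDB Streams use the same structure, with the record sequence number as the identifier). A hand-written SQS example, with an illustrative message ID:

```typescript
import type { SQSBatchResponse } from 'aws-lambda';

// Only the records listed here are retried; the rest are treated as successfully processed.
const response: SQSBatchResponse = {
  batchItemFailures: [
    { itemIdentifier: '059f36b4-87a3-44ab-83d2-661975830a7d' }, // messageId of the failed record
  ],
};
```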

### Required resources

The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted.
The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries.

!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
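
As a rough, illustrative equivalent of such a setup, a minimal CDK sketch might look like the following; the stack name, entry path, batch size, and timeout are assumptions, not values taken from the documented templates:

```typescript
import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';

export class BatchProcessingStack extends Stack {
  public constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const deadLetterQueue = new Queue(this, 'DeadLetterQueue');
    const queue = new Queue(this, 'Queue', {
      visibilityTimeout: Duration.seconds(30),
      // records are moved to the DLQ after 2 failed receives (retries)
      deadLetterQueue: { queue: deadLetterQueue, maxReceiveCount: 2 },
    });

    const handler = new NodejsFunction(this, 'Handler', {
      runtime: Runtime.NODEJS_18_X,
      entry: 'src/index.ts', // hypothetical path to the Lambda handler
    });

    // ReportBatchItemFailures enables partial batch responses;
    // the event source also grants the SQS permissions the function needs.
    handler.addEventSource(
      new SqsEventSource(queue, { batchSize: 10, reportBatchItemFailures: true })
    );
  }
}
```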

Expand Down Expand Up @@ -137,14 +137,18 @@ Processing batches from SQS works in three stages:
#### FIFO queues

When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
This helps preserve the ordering of messages in your queue.
This helps preserve the ordering of messages in your queue.

```typescript hl_lines="1-4 13 28-30"
--8<-- "docs/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

!!! Note
    Note that `SqsFifoPartialProcessor` is synchronous and uses `processPartialResponseSync`.
    This is because we need to preserve the order of messages in the queue. See the [Async or sync processing section](#async-or-sync-processing) for more details.

### Processing messages from Kinesis

Processing batches from Kinesis works in three stages:
Expand Down Expand Up @@ -225,7 +229,7 @@ By default, we catch any exception raised by your record handler function. This
--8<--
```

1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor)
1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor)

2. Exceptions raised in `record_handler` will propagate to `process_partial_response`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).

Expand Down Expand Up @@ -356,21 +360,29 @@ sequenceDiagram
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>

### Processing messages asynchronously
### Async or sync processing

You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
There are two processors you can use with this utility:

???+ question "When is this useful?"
Your use case might be able to process multiple records at the same time without conflicting with one another.
* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously
* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously

In most cases your function will be `async`, returning a `Promise`, so `BatchProcessor` is the default processor and handles your batch records asynchronously.
There are use cases where you need to process the batch records synchronously instead, for example when each record must be processed in order or cannot safely run concurrently with the others.
For such cases we recommend using `BatchProcessorSync` and `processPartialResponseSync`.

!!! info "Note that you need to match your processing function with the right batch processor"
* If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
* If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`

The implementation difference between the two processors is that `BatchProcessor` processes records concurrently using `Promise.all()`, while `BatchProcessorSync` loops through each record sequentially to preserve the order.
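
As a simplified illustration of that difference (a sketch only, not the library's actual implementation; the type and function names here are made up):

```typescript
type BatchRecord = unknown;
type ItemResult = { status: 'success' | 'fail'; record: BatchRecord };

// Async flavour: every record handler is started immediately and awaited together.
const processAll = async (
  records: BatchRecord[],
  handler: (record: BatchRecord) => Promise<unknown>
): Promise<ItemResult[]> =>
  Promise.all(
    records.map(async (record) => {
      try {
        await handler(record);
        return { status: 'success', record } as ItemResult;
      } catch {
        return { status: 'fail', record } as ItemResult;
      }
    })
  );

// Sync flavour: records are handled one at a time, so their order is preserved.
const processAllSync = (
  records: BatchRecord[],
  handler: (record: BatchRecord) => unknown
): ItemResult[] => {
  const results: ItemResult[] = [];
  for (const record of records) {
    try {
      handler(record);
      results.push({ status: 'success', record });
    } catch {
      results.push({ status: 'fail', record });
    }
  }
  return results;
};
```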

???+ question "When is this useful?"

    For example, imagine you need to process multiple loyalty points and incrementally save them in a database. While you wait for the database to confirm that your records are saved, you could start processing another record concurrently.

    Keep in mind that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order); for those, the synchronous `BatchProcessorSync` is the safer choice.

```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor"
--8<-- "docs/snippets/batch/gettingStartedAsync.ts"
```

## Advanced

### Accessing processed messages
Expand All @@ -380,6 +392,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record


```typescript hl_lines="25 27-28 30-33 38" title="Accessing processed messages"
--8<-- "docs/snippets/batch/accessProcessedMessages.ts"
```
Expand All @@ -391,7 +404,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur

Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function.
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function.
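
A minimal sketch of what this could look like with the default async processor, assuming the context object itself is passed as that optional second argument as described above; the `lambdaContext` parameter name and the time threshold are illustrative:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { Context, SQSEvent, SQSRecord, SQSBatchResponse } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

// The Lambda context is injected as the optional second argument once you register it below
const recordHandler = async (record: SQSRecord, lambdaContext?: Context): Promise<void> => {
  if (lambdaContext && lambdaContext.getRemainingTimeInMillis() < 1_000) {
    throw new Error('Not enough time left to process this record safely');
  }
  // ... process record.body here
};

export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> =>
  processPartialResponse(event, recordHandler, processor, { context }); // registers the context
```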

```typescript hl_lines="17 35"
--8<-- "docs/snippets/batch/accessLambdaContext.ts"
Expand All @@ -408,14 +421,14 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc

???+ example
Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing

```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
--8<-- "docs/snippets/batch/extendingFailure.ts"
```
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
--8<-- "docs/snippets/batch/extendingFailure.ts"
```

### Create your own partial processor

You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `asyncProcessRecord()` abstract methods.
You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.

<center>
```mermaid
Expand All @@ -426,28 +439,26 @@ classDiagram
+prepare()
+clean()
+processRecord(record: BaseRecord)
+asyncProcessRecord(record: BaseRecord)
+processRecordSync(record: BaseRecord)
}

class YourCustomProcessor {
+prepare()
+clean()
+processRecord(record: BaseRecord)
+asyncProcessRecord(record: BaseRecord)
+processRecordSync(record: BaseRecord)
}

BasePartialProcessor <|-- YourCustomProcessor : extends
```
<i>Visual representation to bring your own processor</i>
</center>

* **`processRecord()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
* **`prepare()`** – called once as part of the processor initialization
* **`clean()`** – teardown logic called once after `processRecord` completes
* **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic

You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function.
* **`processRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
* **`processRecordSync()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)

You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.

```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor"
--8<-- "docs/snippets/batch/customPartialProcessor.ts"
```
Expand All @@ -456,7 +467,7 @@ You can then use this class as a context manager, or pass it to `processPartialR

You can use Tracer to create subsegments for each batch record processed. To do so, you can open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata to the subsegment, you can do so directly without calling `tracer.setSegment(subsegment)`. This allows you to work with the subsegment directly and avoid having to either pass the parent subsegment around or have to restore the parent subsegment at the end of the record processing.

```ts
```typescript
--8<-- "docs/snippets/batch/advancedTracingRecordHandler.ts"
```
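
A condensed sketch of that pattern, closing the subsegment in a `finally` block; the processing body is a placeholder:

```typescript
import { Tracer } from '@aws-lambda-powertools/tracer';
import type { SQSRecord } from 'aws-lambda';

const tracer = new Tracer({ serviceName: 'serverlessAirline' });

const recordHandler = async (record: SQSRecord): Promise<void> => {
  // open a subsegment for this record and annotate it directly
  const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler');
  subsegment?.addAnnotation('messageId', record.messageId);
  try {
    // ... your processing logic for record.body goes here
  } catch (error) {
    subsegment?.addError(error as Error);
    throw error; // let the processor mark this record as failed
  } finally {
    subsegment?.close(); // close it once you're done processing the record
  }
};
```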

Expand All @@ -466,7 +477,7 @@ You can use Tracer to create subsegments for each batch record processed. To do

## Testing your code

As there are no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily.

**Example**:

Expand Down
Loading