Skip to content

Commit 536e7d9

Browse files
author
Alexander Melnyk
committed
combine async and sync processing functions in one class
1 parent d1f7408 commit 536e7d9

25 files changed

+770
-909
lines changed

docs/snippets/batch/accessProcessedMessages.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const handler = async (
2525
const batch = event.Records; // (1)!
2626

2727
processor.register(batch, recordHandler, { context }); // (2)!
28-
const processedMessages = processor.process();
28+
const processedMessages = await processor.process();
2929

3030
for (const message of processedMessages) {
3131
const status: 'success' | 'fail' = message[0];

docs/snippets/batch/advancedTracingRecordHandler.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
BatchProcessor,
33
EventType,
4-
processPartialResponse,
4+
processPartialResponseSync,
55
} from '@aws-lambda-powertools/batch';
66
import { Tracer, captureLambdaHandler } from '@aws-lambda-powertools/tracer';
77
import middy from '@middy/core';
@@ -36,7 +36,7 @@ const recordHandler = (record: SQSRecord): void => {
3636

3737
export const handler = middy(
3838
async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
39-
return processPartialResponse(event, recordHandler, processor, {
39+
return processPartialResponseSync(event, recordHandler, processor, {
4040
context,
4141
});
4242
}

docs/snippets/batch/customPartialProcessor.ts

+11-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { marshall } from '@aws-sdk/util-dynamodb';
77
import {
88
EventType,
99
BasePartialBatchProcessor,
10-
processPartialResponse,
10+
processPartialResponseSync,
1111
} from '@aws-lambda-powertools/batch';
1212
import type {
1313
SuccessResponse,
@@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
2727
this.#tableName = tableName;
2828
}
2929

30-
public async asyncProcessRecord(
31-
_record: BaseRecord
32-
): Promise<SuccessResponse | FailureResponse> {
33-
throw new Error('Not implemented');
34-
}
35-
3630
/**
3731
* It's called once, **after** processing the batch.
3832
*
@@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
6458
this.successMessages = [];
6559
}
6660

61+
public async processRecord(
62+
_record: BaseRecord
63+
): Promise<SuccessResponse | FailureResponse> {
64+
throw new Error('Not implemented');
65+
}
66+
6767
/**
6868
* It handles how your record is processed.
6969
*
7070
* Here we are keeping the status of each run, `this.handler` is
7171
* the function that is passed when calling `processor.register()`.
7272
*/
73-
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
73+
public processRecordSync(
74+
record: BaseRecord
75+
): SuccessResponse | FailureResponse {
7476
try {
7577
const result = this.handler(record);
7678

@@ -91,7 +93,7 @@ export const handler = async (
9193
event: SQSEvent,
9294
context: Context
9395
): Promise<SQSBatchResponse> => {
94-
return processPartialResponse(event, recordHandler, processor, {
96+
return processPartialResponseSync(event, recordHandler, processor, {
9597
context,
9698
});
9799
};

docs/snippets/batch/extendingFailure.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
EventType,
55
FailureResponse,
66
EventSourceType,
7-
processPartialResponse,
7+
processPartialResponseSync,
88
} from '@aws-lambda-powertools/batch';
99
import { Logger } from '@aws-lambda-powertools/logger';
1010
import type {
@@ -47,7 +47,7 @@ export const handler = async (
4747
event: SQSEvent,
4848
context: Context
4949
): Promise<SQSBatchResponse> => {
50-
return processPartialResponse(event, recordHandler, processor, {
50+
return processPartialResponseSync(event, recordHandler, processor, {
5151
context,
5252
});
5353
};

docs/snippets/batch/gettingStartedAsync.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
2-
AsyncBatchProcessor,
2+
BatchProcessor,
33
EventType,
4-
asyncProcessPartialResponse,
4+
processPartialResponse,
55
} from '@aws-lambda-powertools/batch';
66
import axios from 'axios'; // axios is an external dependency
77
import type {
@@ -11,7 +11,7 @@ import type {
1111
SQSBatchResponse,
1212
} from 'aws-lambda';
1313

14-
const processor = new AsyncBatchProcessor(EventType.SQS);
14+
const processor = new BatchProcessor(EventType.SQS);
1515

1616
const recordHandler = async (record: SQSRecord): Promise<number> => {
1717
const res = await axios.post('https://httpbin.org/anything', {
@@ -25,7 +25,7 @@ export const handler = async (
2525
event: SQSEvent,
2626
context: Context
2727
): Promise<SQSBatchResponse> => {
28-
return await asyncProcessPartialResponse(event, recordHandler, processor, {
28+
return await processPartialResponse(event, recordHandler, processor, {
2929
context,
3030
});
3131
};

docs/snippets/batch/gettingStartedDynamoDBStreams.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
BatchProcessor,
33
EventType,
4-
processPartialResponse,
4+
processPartialResponseSync,
55
} from '@aws-lambda-powertools/batch';
66
import { Logger } from '@aws-lambda-powertools/logger';
77
import type {
@@ -29,7 +29,7 @@ export const handler = async (
2929
event: DynamoDBStreamEvent,
3030
context: Context
3131
): Promise<DynamoDBBatchResponse> => {
32-
return processPartialResponse(event, recordHandler, processor, {
32+
return processPartialResponseSync(event, recordHandler, processor, {
3333
context,
3434
});
3535
};

docs/snippets/batch/gettingStartedErrorHandling.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
BatchProcessor,
33
EventType,
4-
processPartialResponse,
4+
processPartialResponseSync,
55
} from '@aws-lambda-powertools/batch';
66
import { Logger } from '@aws-lambda-powertools/logger';
77
import type {
@@ -37,7 +37,7 @@ export const handler = async (
3737
context: Context
3838
): Promise<SQSBatchResponse> => {
3939
// prettier-ignore
40-
return processPartialResponse(event, recordHandler, processor, { // (2)!
40+
return processPartialResponseSync(event, recordHandler, processor, { // (2)!
4141
context,
4242
});
4343
};

docs/snippets/batch/gettingStartedKinesis.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
BatchProcessor,
33
EventType,
4-
processPartialResponse,
4+
processPartialResponseSync,
55
} from '@aws-lambda-powertools/batch';
66
import { Logger } from '@aws-lambda-powertools/logger';
77
import type {
@@ -24,7 +24,7 @@ export const handler = async (
2424
event: KinesisStreamEvent,
2525
context: Context
2626
): Promise<KinesisStreamBatchResponse> => {
27-
return processPartialResponse(event, recordHandler, processor, {
27+
return processPartialResponseSync(event, recordHandler, processor, {
2828
context,
2929
});
3030
};

docs/snippets/batch/gettingStartedSQS.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
BatchProcessor,
33
EventType,
4-
processPartialResponse,
4+
processPartialResponseSync,
55
} from '@aws-lambda-powertools/batch';
66
import { Logger } from '@aws-lambda-powertools/logger';
77
import type {
@@ -28,7 +28,7 @@ export const handler = async (
2828
context: Context
2929
): Promise<SQSBatchResponse> => {
3030
// prettier-ignore
31-
return processPartialResponse(event, recordHandler, processor, { // (3)!
31+
return processPartialResponseSync(event, recordHandler, processor, { // (3)!
3232
context,
3333
});
3434
};

docs/snippets/batch/gettingStartedSQSFifo.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
SqsFifoPartialProcessor,
3-
processPartialResponse,
3+
processPartialResponseSync,
44
} from '@aws-lambda-powertools/batch';
55
import { Logger } from '@aws-lambda-powertools/logger';
66
import type {
@@ -25,7 +25,7 @@ export const handler = async (
2525
event: SQSEvent,
2626
context: Context
2727
): Promise<SQSBatchResponse> => {
28-
return processPartialResponse(event, recordHandler, processor, {
28+
return processPartialResponseSync(event, recordHandler, processor, {
2929
context,
3030
});
3131
};

docs/utilities/batch.md

+26-26
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,23 @@ stateDiagram-v2
1313
direction LR
1414
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
1515
LambdaInit: Lambda invocation
16-
BatchProcessor: Batch Processor
16+
BatchProcessorSync: Batch Processor
1717
RecordHandler: Record Handler function
1818
YourLogic: Your logic to process each batch item
1919
LambdaResponse: Lambda response
2020
2121
BatchSource --> LambdaInit
2222
23-
LambdaInit --> BatchProcessor
24-
BatchProcessor --> RecordHandler
23+
LambdaInit --> BatchProcessorSync
24+
BatchProcessorSync --> RecordHandler
2525
26-
state BatchProcessor {
26+
state BatchProcessorSync {
2727
[*] --> RecordHandler: Your function
2828
RecordHandler --> YourLogic
2929
}
3030
31-
RecordHandler --> BatchProcessor: Collect results
32-
BatchProcessor --> LambdaResponse: Report items that failed processing
31+
RecordHandler --> BatchProcessorSync: Collect results
32+
BatchProcessorSync --> LambdaResponse: Report items that failed processing
3333
```
3434

3535
## Key features
@@ -99,9 +99,9 @@ The remaining sections of the documentation will rely on these samples. For comp
9999

100100
Processing batches from SQS works in three stages:
101101

102-
1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type
102+
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.SQS`** for the event type
103103
2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion
104-
3. Use **`processPartialResponse`** to kick off processing
104+
3. Use **`processPartialResponseSync`** to kick off processing
105105

106106
???+ info
107107
This code example optionally uses Logger for completion.
@@ -149,9 +149,9 @@ This helps preserve the ordering of messages in your queue.
149149

150150
Processing batches from Kinesis works in three stages:
151151

152-
1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type
152+
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.KinesisDataStreams`** for the event type
153153
2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion
154-
3. Use **`processPartialResponse`** to kick off processing
154+
3. Use **`processPartialResponseSync`** to kick off processing
155155

156156
???+ info
157157
This code example optionally uses Logger for completion.
@@ -182,9 +182,9 @@ Processing batches from Kinesis works in three stages:
182182

183183
Processing batches from DynamoDB Streams works in three stages:
184184

185-
1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type
185+
1. Instantiate **`BatchProcessorSync`** and choose **`EventType.DynamoDBStreams`** for the event type
186186
2. Define your function to handle each batch record, and use the `DynamoDBRecord` type annotation for autocompletion
187-
3. Use **`processPartialResponse`** to kick off processing
187+
3. Use **`processPartialResponseSync`** to kick off processing
188188

189189
???+ info
190190
This code example optionally uses Logger for completion.
@@ -225,7 +225,7 @@ By default, we catch any exception raised by your record handler function. This
225225
--8<--
226226
```
227227

228-
1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor)
228+
1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor)
229229

230230
2. Exceptions raised in `record_handler` will propagate to `process_partial_response`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
231231

@@ -249,7 +249,7 @@ The following sequence diagrams explain how each Batch processor behaves under d
249249

250250
> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
251251
252-
Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
252+
Sequence diagram to explain how [`BatchProcessorSync` works](#processing-messages-from-sqs) with SQS Standard queues.
253253

254254
<center>
255255
```mermaid
@@ -302,7 +302,7 @@ sequenceDiagram
302302

303303
> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
304304
305-
Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).
305+
Sequence diagram to explain how `BatchProcessorSync` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).
306306

307307
For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}
308308

@@ -358,7 +358,7 @@ sequenceDiagram
358358

359359
### Processing messages asynchronously
360360

361-
You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
361+
You can use `BatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
362362

363363
???+ question "When is this useful?"
364364
Your use case might be able to process multiple records at the same time without conflicting with one another.
@@ -367,15 +367,15 @@ You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` functi
367367

368368
The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
369369

370-
```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor"
370+
```typescript hl_lines="1-5 14 28-30" title="High-concurrency with BatchProcessor"
371371
--8<-- "docs/snippets/batch/gettingStartedAsync.ts"
372372
```
373373

374374
## Advanced
375375

376376
### Accessing processed messages
377377

378-
Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
378+
Use the `BatchProcessorSync` directly in your function to access a list of all returned values from your `recordHandler` function.
379379

380380
* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
381381
* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
@@ -384,32 +384,32 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
384384
--8<-- "docs/snippets/batch/accessProcessedMessages.ts"
385385
```
386386

387-
1. The processor requires the records array. This is typically handled by `processPartialResponse`.
387+
1. The processor requires the records array. This is typically handled by `processPartialResponseSync`.
388388
2. You need to register the `batch`, the `recordHandler` function, and optionally the `context` to access the Lambda context.
389389

390390
### Accessing Lambda Context
391391

392392
Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.
393393

394-
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function.
394+
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function.
395395

396396
```typescript hl_lines="17 35"
397397
--8<-- "docs/snippets/batch/accessLambdaContext.ts"
398398
```
399399

400-
### Extending BatchProcessor
400+
### Extending BatchProcessorSync
401401

402-
You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
402+
You might want to bring custom logic to the existing `BatchProcessorSync` to slightly override how we handle successes and failures.
403403

404-
For these scenarios, you can subclass `BatchProcessor` and quickly override `successHandler` and `failureHandler` methods:
404+
For these scenarios, you can subclass `BatchProcessorSync` and quickly override `successHandler` and `failureHandler` methods:
405405

406406
* **`successHandler()`** – Keeps track of successful batch records
407407
* **`failureHandler()`** – Keeps track of failed batch records
408408

409409
???+ example
410410
Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing
411411

412-
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
412+
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessorSync"
413413
--8<-- "docs/snippets/batch/extendingFailure.ts"
414414
```
415415

@@ -446,7 +446,7 @@ classDiagram
446446
* **`clean()`** – teardown logic called once after `processRecord` completes
447447
* **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
448448

449-
You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function.
449+
You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.
450450

451451
```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor"
452452
--8<-- "docs/snippets/batch/customPartialProcessor.ts"
@@ -466,7 +466,7 @@ You can use Tracer to create subsegments for each batch record processed. To do
466466

467467
## Testing your code
468468

469-
As there is no external calls, you can unit test your code with `BatchProcessor` quite easily.
469+
As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
470470

471471
**Example**:
472472

0 commit comments

Comments
 (0)