Skip to content

Commit 3e0f0b0

Browse files
authored
Merge branch 'main' into fix/idempotency_error_identity
2 parents 23739df + 74198ef commit 3e0f0b0

8 files changed

+333
-5
lines changed

docs/utilities/batch.md

+13-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if
261261

262262
* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
263263
* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
264-
* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing
264+
* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing unless `throwOnFullBatchFailure` is disabled.
265265

266266
The following sequence diagrams explain how each Batch processor behaves under different scenarios.
267267

@@ -450,6 +450,18 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam
450450
--8<-- "examples/snippets/batch/accessLambdaContext.ts"
451451
```
452452

453+
### Working with full batch failures
454+
455+
By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics.
456+
457+
When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.
458+
459+
For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling.
460+
461+
```typescript hl_lines="17"
462+
--8<-- "examples/snippets/batch/noThrowOnFullBatchFailure.ts"
463+
```
464+
453465
### Extending BatchProcessor
454466

455467
You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import {
2+
BatchProcessor,
3+
EventType,
4+
processPartialResponse,
5+
} from '@aws-lambda-powertools/batch';
6+
import type { SQSHandler, SQSRecord } from 'aws-lambda';
7+
8+
const processor = new BatchProcessor(EventType.SQS);
9+
10+
const recordHandler = async (_record: SQSRecord): Promise<void> => {
11+
// Process the record
12+
};
13+
14+
export const handler: SQSHandler = async (event, context) =>
15+
processPartialResponse(event, recordHandler, processor, {
16+
context,
17+
throwOnFullBatchFailure: false,
18+
});

packages/batch/src/BasePartialBatchProcessor.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
6363
/**
6464
* Clean up logic to be run after processing a batch
6565
*
66-
* If the entire batch failed, and the utility is not configured otherwise,
67-
* this method will throw a `FullBatchFailureError` with the list of errors
68-
* that occurred during processing.
66+
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of
67+
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`.
6968
*
7069
* Otherwise, it will build the partial failure response based on the event type.
7170
*/
@@ -74,7 +73,10 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
7473
return;
7574
}
7675

77-
if (this.entireBatchFailed()) {
76+
if (
77+
this.options?.throwOnFullBatchFailure !== false &&
78+
this.entireBatchFailed()
79+
) {
7880
throw new FullBatchFailureError(this.errors);
7981
}
8082

packages/batch/src/processPartialResponse.ts

+25
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,31 @@ import type {
4242
* });
4343
* ```
4444
*
45+
* By default, if the entire batch fails, the function will throw an error.
46+
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
47+
*
48+
* @example
49+
* ```typescript
50+
* import {
51+
* BatchProcessor,
52+
* EventType,
53+
* processPartialResponse,
54+
* } from '@aws-lambda-powertools/batch';
55+
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';
56+
*
57+
* const processor = new BatchProcessor(EventType.KinesisDataStreams);
58+
*
59+
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
60+
* const payload = JSON.parse(record.kinesis.data);
61+
* };
62+
*
63+
* export const handler: KinesisStreamHandler = async (event, context) =>
64+
* processPartialResponse(event, recordHandler, processor, {
65+
* context,
66+
* throwOnFullBatchFailure: false
67+
* });
68+
* ```
69+
*
4570
* @param event The event object containing the batch of records
4671
* @param recordHandler Async function to process each record from the batch
4772
* @param processor Batch processor instance to handle the batch processing

packages/batch/src/processPartialResponseSync.ts

+24
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,30 @@ import type {
6868
* });
6969
* ```
7070
*
71+
* By default, if the entire batch fails, the function will throw an error.
72+
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
73+
*
74+
* @example
75+
* ```typescript
76+
* import {
77+
* SqsFifoPartialProcessor,
78+
* processPartialResponseSync,
79+
* } from '@aws-lambda-powertools/batch';
80+
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
81+
*
82+
* const processor = new SqsFifoPartialProcessor();
83+
*
84+
* const recordHandler = async (record: SQSRecord): Promise<void> => {
85+
* const payload = JSON.parse(record.body);
86+
* };
87+
*
88+
* export const handler: SQSHandler = async (event, context) =>
89+
* processPartialResponseSync(event, recordHandler, processor, {
90+
* context,
91+
* throwOnFullBatchFailure: false
92+
* });
93+
* ```
94+
*
7195
* @param event The event object containing the batch of records
7296
* @param recordHandler Sync function to process each record from the batch
7397
* @param processor Batch processor instance to handle the batch processing

packages/batch/src/types.ts

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
1313
* @template T The type of the batch processor, defaults to BasePartialBatchProcessor
1414
* @property context The context object provided by the AWS Lambda runtime
1515
* @property skipGroupOnError The option to group on error during processing
16+
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
1617
*/
1718
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
1819
/**
@@ -25,6 +26,10 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
2526
* If true skip the group on error during processing.
2627
*/
2728
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
29+
/**
30+
* Set this to false to prevent throwing an error if the entire batch fails.
31+
*/
32+
throwOnFullBatchFailure?: boolean;
2833
};
2934

3035
/**

packages/batch/tests/unit/processPartialResponse.test.ts

+123
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
processPartialResponse,
1616
EventType,
1717
UnexpectedBatchTypeError,
18+
FullBatchFailureError,
1819
} from '../../src/index.js';
1920
import type {
2021
BatchProcessingOptions,
@@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => {
9091
// Assess
9192
expect(ret).toStrictEqual({ batchItemFailures: [] });
9293
});
94+
95+
test('Process partial response function call with asynchronous handler for full batch failure', async () => {
96+
// Prepare
97+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
98+
const batch = { Records: records };
99+
const processor = new BatchProcessor(EventType.SQS);
100+
101+
// Act & Assess
102+
await expect(
103+
processPartialResponse(batch, asyncSqsRecordHandler, processor)
104+
).rejects.toThrow(FullBatchFailureError);
105+
});
106+
107+
test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
108+
// Prepare
109+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
110+
const batch = { Records: records };
111+
const processor = new BatchProcessor(EventType.SQS);
112+
113+
// Act & Assess
114+
await expect(
115+
processPartialResponse(batch, asyncSqsRecordHandler, processor, {
116+
...options,
117+
throwOnFullBatchFailure: true,
118+
})
119+
).rejects.toThrow(FullBatchFailureError);
120+
});
121+
122+
test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
123+
// Prepare
124+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
125+
const batch = { Records: records };
126+
const processor = new BatchProcessor(EventType.SQS);
127+
128+
// Act
129+
const response = await processPartialResponse(
130+
batch,
131+
asyncSqsRecordHandler,
132+
processor,
133+
{
134+
...options,
135+
throwOnFullBatchFailure: false,
136+
}
137+
);
138+
139+
// Assess
140+
expect(response).toStrictEqual({
141+
batchItemFailures: [
142+
{ itemIdentifier: records[0].messageId },
143+
{ itemIdentifier: records[1].messageId },
144+
],
145+
});
146+
});
93147
});
94148

95149
describe('Process partial response function call through handler', () => {
@@ -228,5 +282,74 @@ describe('Function: processPartialResponse()', () => {
228282
// Assess
229283
expect(result).toStrictEqual({ batchItemFailures: [] });
230284
});
285+
286+
test('Process partial response through handler for full batch failure', async () => {
287+
// Prepare
288+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
289+
const processor = new BatchProcessor(EventType.SQS);
290+
const event: SQSEvent = { Records: records };
291+
292+
const handler = async (
293+
event: SQSEvent,
294+
_context: Context
295+
): Promise<PartialItemFailureResponse> => {
296+
return processPartialResponse(event, asyncSqsRecordHandler, processor);
297+
};
298+
299+
// Act & Assess
300+
await expect(handler(event, context)).rejects.toThrow(
301+
FullBatchFailureError
302+
);
303+
});
304+
305+
test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
306+
// Prepare
307+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
308+
const processor = new BatchProcessor(EventType.SQS);
309+
const event: SQSEvent = { Records: records };
310+
311+
const handler = async (
312+
event: SQSEvent,
313+
_context: Context
314+
): Promise<PartialItemFailureResponse> => {
315+
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
316+
...options,
317+
throwOnFullBatchFailure: true,
318+
});
319+
};
320+
321+
// Act & Assess
322+
await expect(handler(event, context)).rejects.toThrow(
323+
FullBatchFailureError
324+
);
325+
});
326+
327+
test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
328+
// Prepare
329+
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
330+
const processor = new BatchProcessor(EventType.SQS);
331+
const event: SQSEvent = { Records: records };
332+
333+
const handler = async (
334+
event: SQSEvent,
335+
_context: Context
336+
): Promise<PartialItemFailureResponse> => {
337+
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
338+
...options,
339+
throwOnFullBatchFailure: false,
340+
});
341+
};
342+
343+
// Act
344+
const response = await handler(event, context);
345+
346+
// Assess
347+
expect(response).toStrictEqual({
348+
batchItemFailures: [
349+
{ itemIdentifier: records[0].messageId },
350+
{ itemIdentifier: records[1].messageId },
351+
],
352+
});
353+
});
231354
});
232355
});

0 commit comments

Comments
 (0)