// Forked from aws-powertools/powertools-lambda-typescript
// File: processPartialResponseSync.ts
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
import { UnexpectedBatchTypeError } from './errors.js';
import type {
BaseRecord,
BatchProcessingOptions,
PartialItemFailureResponse,
} from './types.js';
/**
 * Higher level function to process a batch of records synchronously
 * and handle partial failure cases.
 *
 * This function is intended to be used together with a batch processor
 * that implements the {@link BasePartialBatchProcessor} interface.
 *
 * It accepts a batch of records, a record handler function, a batch processor,
 * and an optional set of options to configure the batch processing.
 *
 * By default, the function will process the batch of records synchronously
 * and in sequence. If you need to process the records asynchronously, you can
 * use the {@link processPartialResponse} function instead.
 *
 * The record handler must be synchronous: this function returns the processor
 * response without awaiting anything, so the outcome of a promise returned by
 * an async handler cannot be reflected in that response.
 *
 * @example
 * ```typescript
 * import {
 *   BatchProcessor,
 *   EventType,
 *   processPartialResponseSync,
 * } from '@aws-lambda-powertools/batch';
 * import type { SQSRecord, SQSHandler } from 'aws-lambda';
 *
 * const processor = new BatchProcessor(EventType.SQS);
 *
 * const recordHandler = (record: SQSRecord): void => {
 *   const payload = JSON.parse(record.body);
 * };
 *
 * export const handler: SQSHandler = async (event, context) =>
 *   processPartialResponseSync(event, recordHandler, processor, {
 *     context,
 *   });
 * ```
 *
 * When working with SQS FIFO queues, we will stop processing at the first failure
 * and mark unprocessed messages as failed to preserve ordering. However, if you want to
 * continue processing messages from different group IDs, you can enable the `skipGroupOnError`
 * option for seamless processing of messages from various group IDs.
 *
 * @example
 * ```typescript
 * import {
 *   SqsFifoPartialProcessor,
 *   processPartialResponseSync,
 * } from '@aws-lambda-powertools/batch';
 * import type { SQSRecord, SQSHandler } from 'aws-lambda';
 *
 * const processor = new SqsFifoPartialProcessor();
 *
 * const recordHandler = (record: SQSRecord): void => {
 *   const payload = JSON.parse(record.body);
 * };
 *
 * export const handler: SQSHandler = async (event, context) =>
 *   processPartialResponseSync(event, recordHandler, processor, {
 *     context,
 *     skipGroupOnError: true,
 *   });
 * ```
 *
 * By default, if the entire batch fails, the function will throw an error.
 * If you want to prevent this behavior, you can set the `throwOnFullBatchFailure`
 * option to `false`.
 *
 * @example
 * ```typescript
 * import {
 *   SqsFifoPartialProcessor,
 *   processPartialResponseSync,
 * } from '@aws-lambda-powertools/batch';
 * import type { SQSRecord, SQSHandler } from 'aws-lambda';
 *
 * const processor = new SqsFifoPartialProcessor();
 *
 * const recordHandler = (record: SQSRecord): void => {
 *   const payload = JSON.parse(record.body);
 * };
 *
 * export const handler: SQSHandler = async (event, context) =>
 *   processPartialResponseSync(event, recordHandler, processor, {
 *     context,
 *     throwOnFullBatchFailure: false,
 *   });
 * ```
 *
 * @param event The event object containing the batch of records
 * @param recordHandler Sync function to process each record from the batch
 * @param processor Batch processor instance to handle the batch processing
 * @param options Batch processing options, which can vary with chosen batch processor implementation
 * @returns The value returned by `processor.response()` once the batch has been processed
 * @throws {UnexpectedBatchTypeError} When `event` is nullish or `event.Records` is not an array
 */
const processPartialResponseSync = <T extends BasePartialBatchProcessor>(
  event: { Records: BaseRecord[] },
  recordHandler: CallableFunction,
  processor: T,
  options?: BatchProcessingOptions<T>
): PartialItemFailureResponse => {
  // Optional chaining keeps a null/undefined event on the same error path as
  // an event whose `Records` is missing or not an array, instead of letting a
  // raw TypeError escape from the property access.
  if (!Array.isArray(event?.Records)) {
    throw new UnexpectedBatchTypeError();
  }

  processor.register(event.Records, recordHandler, options);
  processor.processSync();

  return processor.response();
};
export { processPartialResponseSync };