Skip to content

Commit e73b575

Browse files
arnabrahmanam29d
andauthored
feat(batch): Async Processing of Records for for SQS Fifo (#3160)
Co-authored-by: Alexander Schueren <[email protected]> Co-authored-by: Alexander Schueren <[email protected]>
1 parent 3789076 commit e73b575

9 files changed

+478
-181
lines changed

Diff for: docs/utilities/batch.md

+9-3
Original file line numberDiff line numberDiff line change
@@ -149,21 +149,27 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va
149149

150150
=== "Recommended"
151151

152-
```typescript hl_lines="1-4 8"
152+
```typescript hl_lines="1-4 8 20"
153153
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
154154
```
155155

156156
1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)
157157

158+
=== "Async processing"
159+
160+
```typescript hl_lines="1-4 8 20"
161+
--8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts"
162+
```
163+
158164
=== "Enabling skipGroupOnError flag"
159165

160166
```typescript hl_lines="1-4 13 30"
161167
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
162168
```
163169

164170
!!! Note
165-
Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`.
166-
This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details.
171+
Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`.
172+
If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`.
167173

168174
### Processing messages from Kinesis
169175

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import {
2+
SqsFifoPartialProcessorAsync,
3+
processPartialResponse,
4+
} from '@aws-lambda-powertools/batch';
5+
import { Logger } from '@aws-lambda-powertools/logger';
6+
import type { SQSHandler, SQSRecord } from 'aws-lambda';
7+
8+
const processor = new SqsFifoPartialProcessorAsync();
9+
const logger = new Logger();
10+
11+
const recordHandler = async (record: SQSRecord): Promise<void> => {
12+
const payload = record.body;
13+
if (payload) {
14+
const item = JSON.parse(payload);
15+
logger.info('Processed item', { item });
16+
}
17+
};
18+
19+
export const handler: SQSHandler = async (event, context) =>
20+
processPartialResponse(event, recordHandler, processor, {
21+
context,
22+
});

Diff for: packages/batch/src/SqsFifoPartialProcessor.ts

+12-42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { SQSRecord } from 'aws-lambda';
22
import { BatchProcessorSync } from './BatchProcessorSync.js';
3+
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
34
import { EventType } from './constants.js';
45
import {
56
type BatchProcessingError,
@@ -46,17 +47,13 @@ import type {
4647
*/
4748
class SqsFifoPartialProcessor extends BatchProcessorSync {
4849
/**
49-
* The ID of the current message group being processed.
50+
* Processor for handling SQS FIFO message
5051
*/
51-
#currentGroupId?: string;
52-
/**
53-
* A set of group IDs that have already encountered failures.
54-
*/
55-
#failedGroupIds: Set<string>;
52+
readonly #processor: SqsFifoProcessor;
5653

5754
public constructor() {
5855
super(EventType.SQS);
59-
this.#failedGroupIds = new Set<string>();
56+
this.#processor = new SqsFifoProcessor();
6057
}
6158

6259
/**
@@ -70,9 +67,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
7067
record: EventSourceDataClassTypes,
7168
exception: Error
7269
): FailureResponse {
73-
if (this.options?.skipGroupOnError && this.#currentGroupId) {
74-
this.#addToFailedGroup(this.#currentGroupId);
75-
}
70+
this.#processor.processFailureForCurrentGroup(this.options);
7671

7772
return super.failureHandler(record, exception);
7873
}
@@ -101,24 +96,17 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
10196
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
10297
let currentIndex = 0;
10398
for (const record of this.records) {
104-
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
99+
this.#processor.setCurrentGroup(
100+
(record as SQSRecord).attributes?.MessageGroupId
101+
);
105102

106-
// If we have any failed messages, we should then short circuit the process and
107-
// fail remaining messages unless `skipGroupOnError` is true
108-
const shouldShortCircuit =
109-
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
110-
if (shouldShortCircuit) {
103+
if (
104+
this.#processor.shouldShortCircuit(this.failureMessages, this.options)
105+
) {
111106
return this.shortCircuitProcessing(currentIndex, processedRecords);
112107
}
113108

114-
// If `skipGroupOnError` is true and the current group has previously failed,
115-
// then we should skip processing the current group.
116-
const shouldSkipCurrentGroup =
117-
this.options?.skipGroupOnError &&
118-
this.#currentGroupId &&
119-
this.#failedGroupIds.has(this.#currentGroupId);
120-
121-
const result = shouldSkipCurrentGroup
109+
const result = this.#processor.shouldSkipCurrentGroup(this.options)
122110
? this.#processFailRecord(
123111
record,
124112
new SqsFifoMessageGroupShortCircuitError()
@@ -161,15 +149,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
161149
return processedRecords;
162150
}
163151

164-
/**
165-
* Adds the specified group ID to the set of failed group IDs.
166-
*
167-
* @param group - The group ID to be added to the set of failed group IDs.
168-
*/
169-
#addToFailedGroup(group: string): void {
170-
this.#failedGroupIds.add(group);
171-
}
172-
173152
/**
174153
* Processes a fail record.
175154
*
@@ -184,15 +163,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
184163

185164
return this.failureHandler(data, exception);
186165
}
187-
188-
/**
189-
* Sets the current group ID for the message being processed.
190-
*
191-
* @param group - The group ID of the current message being processed.
192-
*/
193-
#setCurrentGroup(group?: string): void {
194-
this.#currentGroupId = group;
195-
}
196166
}
197167

198168
export { SqsFifoPartialProcessor };

Diff for: packages/batch/src/SqsFifoPartialProcessorAsync.ts

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import type { SQSRecord } from 'aws-lambda';
2+
import { BatchProcessor } from './BatchProcessor.js';
3+
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
4+
import { EventType } from './constants.js';
5+
import {
6+
type BatchProcessingError,
7+
SqsFifoMessageGroupShortCircuitError,
8+
SqsFifoShortCircuitError,
9+
} from './errors.js';
10+
import type {
11+
BaseRecord,
12+
EventSourceDataClassTypes,
13+
FailureResponse,
14+
SuccessResponse,
15+
} from './types.js';
16+
17+
/**
18+
* Batch processor for SQS FIFO queues
19+
*
20+
* This class extends the {@link BatchProcessor} class and provides
21+
* a mechanism to process records from SQS FIFO queues asynchronously.
22+
*
23+
* By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
24+
*
25+
* However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
26+
*
27+
* @example
28+
* ```typescript
29+
* import {
30+
* BatchProcessor,
31+
* SqsFifoPartialProcessorAsync,
32+
* processPartialResponse,
33+
* } from '@aws-lambda-powertools/batch';
34+
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
35+
*
36+
* const processor = new SqsFifoPartialProcessorAsync();
37+
*
38+
* const recordHandler = async (record: SQSRecord): Promise<void> => {
39+
* const payload = JSON.parse(record.body);
40+
* };
41+
*
42+
* export const handler: SQSHandler = async (event, context) =>
43+
* processPartialResponse(event, recordHandler, processor, {
44+
* context,
45+
* });
46+
* ```
47+
*/
48+
class SqsFifoPartialProcessorAsync extends BatchProcessor {
49+
/**
50+
* Processor for handling SQS FIFO message
51+
*/
52+
readonly #processor: SqsFifoProcessor;
53+
54+
public constructor() {
55+
super(EventType.SQS);
56+
this.#processor = new SqsFifoProcessor();
57+
}
58+
59+
/**
60+
* Handles a failure for a given record.
61+
*
62+
* @param record - The record that failed.
63+
* @param exception - The error that occurred.
64+
*/
65+
public failureHandler(
66+
record: EventSourceDataClassTypes,
67+
exception: Error
68+
): FailureResponse {
69+
this.#processor.processFailureForCurrentGroup(this.options);
70+
71+
return super.failureHandler(record, exception);
72+
}
73+
74+
/**
75+
* Process a record with a asynchronous handler
76+
*
77+
* This method orchestrates the processing of a batch of records asynchronously
78+
* for SQS FIFO queues.
79+
*
80+
* The method calls the prepare hook to initialize the processor and then
81+
* iterates over each record in the batch, processing them one by one.
82+
*
83+
* If one of them fails and `skipGroupOnError` is not true, the method short circuits
84+
* the processing and fails the remaining records in the batch.
85+
*
86+
* If one of them fails and `skipGroupOnError` is true, then the method fails the current record
87+
* if the message group has any previous failure, otherwise keeps processing.
88+
*
89+
* Then, it calls the clean hook to clean up the processor and returns the
90+
* processed records.
91+
*/
92+
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
93+
this.prepare();
94+
95+
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
96+
let currentIndex = 0;
97+
for (const record of this.records) {
98+
this.#processor.setCurrentGroup(
99+
(record as SQSRecord).attributes?.MessageGroupId
100+
);
101+
102+
if (
103+
this.#processor.shouldShortCircuit(this.failureMessages, this.options)
104+
) {
105+
return this.shortCircuitProcessing(currentIndex, processedRecords);
106+
}
107+
108+
const result = this.#processor.shouldSkipCurrentGroup(this.options)
109+
? this.#processFailRecord(
110+
record,
111+
new SqsFifoMessageGroupShortCircuitError()
112+
)
113+
: await this.processRecord(record);
114+
115+
processedRecords.push(result);
116+
currentIndex++;
117+
}
118+
119+
this.clean();
120+
121+
return processedRecords;
122+
}
123+
124+
/**
125+
* Starting from the first failure index, fail all remaining messages regardless
126+
* of their group ID.
127+
*
128+
* This short circuit mechanism is used when we detect a failed message in the batch.
129+
*
130+
* Since messages in a FIFO queue are processed in order, we must stop processing any
131+
* remaining messages in the batch to prevent out-of-order processing.
132+
*
133+
* @param firstFailureIndex Index of first message that failed
134+
* @param processedRecords Array of response items that have been processed both successfully and unsuccessfully
135+
*/
136+
protected shortCircuitProcessing(
137+
firstFailureIndex: number,
138+
processedRecords: (SuccessResponse | FailureResponse)[]
139+
): (SuccessResponse | FailureResponse)[] {
140+
const remainingRecords = this.records.slice(firstFailureIndex);
141+
142+
for (const record of remainingRecords) {
143+
this.#processFailRecord(record, new SqsFifoShortCircuitError());
144+
}
145+
146+
this.clean();
147+
148+
return processedRecords;
149+
}
150+
151+
/**
152+
* Processes a fail record.
153+
*
154+
* @param record - The record that failed.
155+
* @param exception - The error that occurred.
156+
*/
157+
#processFailRecord(
158+
record: BaseRecord,
159+
exception: BatchProcessingError
160+
): FailureResponse {
161+
const data = this.toBatchType(record, this.eventType);
162+
163+
return this.failureHandler(data, exception);
164+
}
165+
}
166+
167+
export { SqsFifoPartialProcessorAsync };

0 commit comments

Comments
 (0)