feat(batch): add option to continue processing other group IDs on failure in SqsFifoPartialProcessor #2590

Merged
28 commits
c2339ed
feat: generic type for BatchProcessingOptions
arnabrahman May 26, 2024
c080b8d
refactor: update processPartialResponseSync function for new generic …
arnabrahman May 26, 2024
b9197ce
feat: SqsFifoMessageGroupShortCircuitError exception
arnabrahman May 27, 2024
f9a5ab0
feat: stick to the default behavior of short-circuit
arnabrahman May 28, 2024
2405629
feat: skip processing record of group if it has already failed before
arnabrahman May 28, 2024
c5b5041
test: skipGroupOnError flag for SqsFifoPartialProcessor
arnabrahman May 28, 2024
6a5477b
doc: describe skipGroupOnError with example/diagram
arnabrahman May 28, 2024
ca18ea7
test: SQS Fifo for different scenarios by `skipGroupOnError` flag
arnabrahman May 29, 2024
249ca8b
doc: fix SQS FIFO queues aws documentation link
arnabrahman May 29, 2024
a5203e2
doc: update comments of SqsFifoPartialProcessor
arnabrahman May 29, 2024
8713413
test: change the last record to success
arnabrahman May 29, 2024
e3224f0
fix: merge conflict with main branch
arnabrahman May 30, 2024
ffe9366
doc: update comments for `skipGroupOnError` flag
arnabrahman May 30, 2024
adf5d3e
test: FIFO Batch processor processes everything on success despite th…
arnabrahman May 31, 2024
4cf9839
style: space after example
arnabrahman May 31, 2024
4f50be8
Merge branch 'main' into 2561-improve-sqs-fifo-processing
dreamorosi Jun 3, 2024
082f8b7
fix: check if messageGroupId exists
arnabrahman Jun 4, 2024
9a0ac3b
doc: styling for processFailRecord docString
arnabrahman Jun 4, 2024
c6d8c74
doc: enhance processPartialResponseSync docstring
arnabrahman Jun 4, 2024
ff76ead
fix: code highlight numbers
arnabrahman Jun 4, 2024
c7dfdd3
fix: indentation of batch readme
arnabrahman Jun 4, 2024
a2d4e5b
doc: move annotation to fix rendering
arnabrahman Jun 4, 2024
03b471c
refactor: make `context` optional
arnabrahman Jun 4, 2024
a1ab420
fix: remove `context` from `skipGroupOnError` tests
arnabrahman Jun 4, 2024
f442956
refactor: change TS privates to JS private fields
arnabrahman Jun 4, 2024
b7cd35b
Merge branch 'main' into 2561-improve-sqs-fifo-processing
dreamorosi Jun 4, 2024
0354524
style: fix docstring spacing
arnabrahman Jun 4, 2024
984fd53
Merge branch '2561-improve-sqs-fifo-processing' of github.com:arnabra…
arnabrahman Jun 4, 2024
50 changes: 43 additions & 7 deletions docs/utilities/batch.md
@@ -141,14 +141,25 @@ Processing batches from SQS works in three stages:

#### FIFO queues

When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
This helps preserve the ordering of messages in your queue.
When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html){target="_blank"}, a batch may include messages from different group IDs.

```typescript hl_lines="1-4 8 20-22"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.

Enable the `skipGroupOnError` option to keep processing messages from other group IDs after a failure. With this option, a failed message and the remaining messages from the same group ID are reported as failed and sent back to SQS, while messages from other group IDs in the batch are still processed.

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)
=== "Recommended"

```typescript hl_lines="1-4 8"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Enabling skipGroupOnError flag"

```typescript hl_lines="1-4 13 30"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
```

!!! Note
Note that `SqsFifoPartialProcessor` is synchronous, so use it with `processPartialResponseSync`.
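
To make the difference between the two modes concrete, here is a minimal, self-contained sketch of the control flow described above. It does not use the Powertools classes: `FifoRecord`, `Outcome`, `simulateFifoBatch`, and `failedIds` are hypothetical names introduced only for illustration, and the real processor wraps results and errors in its own response types.

```typescript
// Hypothetical, simplified model of a FIFO record: an ID, a group, and a flag
// that tells the simulated handler to fail on this record.
type FifoRecord = { messageId: string; groupId: string; shouldFail?: boolean };
type Outcome = { messageId: string; status: 'success' | 'fail' };

const simulateFifoBatch = (
  records: FifoRecord[],
  skipGroupOnError = false
): Outcome[] => {
  const outcomes: Outcome[] = [];
  const failedGroups = new Set<string>();
  let anyFailure = false;

  for (const record of records) {
    // Default mode: after the first failure, every remaining record fails.
    // skipGroupOnError mode: only records from an already-failed group fail.
    const skip = skipGroupOnError
      ? failedGroups.has(record.groupId)
      : anyFailure;

    if (skip || record.shouldFail) {
      anyFailure = true;
      failedGroups.add(record.groupId);
      outcomes.push({ messageId: record.messageId, status: 'fail' });
      continue;
    }
    outcomes.push({ messageId: record.messageId, status: 'success' });
  }

  return outcomes;
};

const failedIds = (outcomes: Outcome[]): string[] =>
  outcomes.filter((o) => o.status === 'fail').map((o) => o.messageId);

const batch: FifoRecord[] = [
  { messageId: 'a1', groupId: 'A' },
  { messageId: 'a2', groupId: 'A', shouldFail: true },
  { messageId: 'a3', groupId: 'A' },
  { messageId: 'b1', groupId: 'B' },
];

console.log(failedIds(simulateFifoBatch(batch))); // [ 'a2', 'a3', 'b1' ]
console.log(failedIds(simulateFifoBatch(batch, true))); // [ 'a2', 'a3' ]
```

In the default mode the message from group `B` is returned to the queue even though it never ran; with `skipGroupOnError` it is processed, and only group `A` is retried.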
@@ -283,7 +294,7 @@ sequenceDiagram

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skipGroupOnError` flag.

<center>
```mermaid
@@ -307,6 +318,31 @@ sequenceDiagram
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skipGroupOnError` flag.

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Process messages from another MessageGroupID
Lambda function->>Lambda service: Report 3rd batch item and all subsequent messages within the same MessageGroupID as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages processed
SQS queue-->>SQS queue: Failed messages return
deactivate SQS queue
```
<i>SQS FIFO mechanism with Batch Item Failures and skipGroupOnError enabled</i>
</center>
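
The reporting step in the diagram corresponds to the partial batch response that Lambda expects from an SQS-triggered function: an object with a `batchItemFailures` array of `{ itemIdentifier }` entries, one per failed message ID. The sketch below builds that shape from a list of failed message IDs. `toBatchResponse` is a hypothetical helper for illustration, and the local `BatchResponse` type stands in for `SQSBatchResponse` from `aws-lambda` so the example stays self-contained. When you use the utility, it assembles this response for you.

```typescript
// Local stand-in for `SQSBatchResponse` from the `aws-lambda` types package.
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Hypothetical helper: maps failed message IDs to the partial batch response.
const toBatchResponse = (failedMessageIds: string[]): BatchResponse => ({
  batchItemFailures: failedMessageIds.map((itemIdentifier) => ({
    itemIdentifier,
  })),
});

// Messages 3 and 4 share a failed group; everything else succeeded.
const response = toBatchResponse(['msg-3', 'msg-4']);
console.log(JSON.stringify(response));
// {"batchItemFailures":[{"itemIdentifier":"msg-3"},{"itemIdentifier":"msg-4"}]}
```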

#### Kinesis and DynamoDB Streams

> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
32 changes: 32 additions & 0 deletions examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts
@@ -0,0 +1,32 @@
import {
SqsFifoPartialProcessor,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponseSync(event, recordHandler, processor, {
context,
skipGroupOnError: true,
});
};
113 changes: 101 additions & 12 deletions packages/batch/src/SqsFifoPartialProcessor.ts
@@ -1,7 +1,17 @@
import { SQSRecord } from 'aws-lambda';
import { BatchProcessorSync } from './BatchProcessorSync.js';
import { EventType } from './constants.js';
import { SqsFifoShortCircuitError } from './errors.js';
import type { FailureResponse, SuccessResponse } from './types.js';
import {
BatchProcessingError,
SqsFifoMessageGroupShortCircuitError,
SqsFifoShortCircuitError,
} from './errors.js';
import type {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
SuccessResponse,
} from './types.js';

/**
* Batch processor for SQS FIFO queues
@@ -35,8 +45,36 @@ import type { FailureResponse, SuccessResponse } from './types.js';
* ```
*/
class SqsFifoPartialProcessor extends BatchProcessorSync {
/**
* The ID of the current message group being processed.
*/
#currentGroupId?: string;
/**
* A set of group IDs that have already encountered failures.
*/
#failedGroupIds: Set<string>;

public constructor() {
super(EventType.SQS);
this.#failedGroupIds = new Set<string>();
}

/**
* Handles a failure for a given record.
* Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true.
* @param record - The record that failed.
* @param exception - The error that occurred.
* @returns The failure response.
*/
public failureHandler(
record: EventSourceDataClassTypes,
exception: Error
): FailureResponse {
if (this.options?.skipGroupOnError && this.#currentGroupId) {
this.#addToFailedGroup(this.#currentGroupId);
}

return super.failureHandler(record, exception);
}

/**
@@ -48,8 +86,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
* The method calls the prepare hook to initialize the processor and then
* iterates over each record in the batch, processing them one by one.
*
* If one of them fails, the method short circuits the processing and fails
* the remaining records in the batch.
* If one of them fails and `skipGroupOnError` is not true, the method short circuits
* the processing and fails the remaining records in the batch.
*
* If `skipGroupOnError` is true, a failure does not stop the batch. Instead, the method fails any
* later record whose message group has already failed, and keeps processing the other records.
*
* Then, it calls the clean hook to clean up the processor and returns the
* processed records.
@@ -60,13 +101,31 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
// If we have any failed messages, it means the last message failed
// We should then short circuit the process and fail remaining messages
if (this.failureMessages.length != 0) {
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);

// If we have any failed messages, we should then short circuit the process and
// fail remaining messages unless `skipGroupOnError` is true
const shouldShortCircuit =
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
if (shouldShortCircuit) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(this.processRecordSync(record));
// If `skipGroupOnError` is true and the current group has previously failed,
// then we should skip processing the current group.
const shouldSkipCurrentGroup =
this.options?.skipGroupOnError &&
this.#currentGroupId &&
this.#failedGroupIds.has(this.#currentGroupId);

const result = shouldSkipCurrentGroup
? this.#processFailRecord(
record,
new SqsFifoMessageGroupShortCircuitError()
)
: this.processRecordSync(record);

processedRecords.push(result);
currentIndex++;
}

@@ -94,16 +153,46 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
const data = this.toBatchType(record, this.eventType);
processedRecords.push(
this.failureHandler(data, new SqsFifoShortCircuitError())
);
this.#processFailRecord(record, new SqsFifoShortCircuitError());
}

this.clean();

return processedRecords;
}

/**
* Adds the specified group ID to the set of failed group IDs.
*
* @param group - The group ID to be added to the set of failed group IDs.
*/
#addToFailedGroup(group: string): void {
this.#failedGroupIds.add(group);
}

/**
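* Marks a record as failed by converting it to the batch type and passing it to the failure handler.
*
* @param record - The record that failed.
* @param exception - The error that occurred.
* @returns The failure response.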
*/
#processFailRecord(
record: BaseRecord,
exception: BatchProcessingError
): FailureResponse {
const data = this.toBatchType(record, this.eventType);

return this.failureHandler(data, exception);
}

/**
* Sets the current group ID for the message being processed.
*
* @param group - The group ID of the current message being processed.
*/
#setCurrentGroup(group?: string): void {
this.#currentGroupId = group;
}
}

export { SqsFifoPartialProcessor };
12 changes: 12 additions & 0 deletions packages/batch/src/errors.ts
@@ -37,6 +37,17 @@ class SqsFifoShortCircuitError extends BatchProcessingError {
}
}

/**
* Error thrown by the Batch Processing utility when a previous record from
* the same SQS FIFO message group has failed processing.
*/
class SqsFifoMessageGroupShortCircuitError extends BatchProcessingError {
public constructor() {
super('A previous record from this message group failed processing');
this.name = 'SqsFifoMessageGroupShortCircuitError';
}
}

/**
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected
* batch type.
@@ -56,5 +67,6 @@ export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
SqsFifoMessageGroupShortCircuitError,
UnexpectedBatchTypeError,
};
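
With two short-circuit errors now exported, code that inspects failures may want to tell them apart. The following is a rough sketch of one way to do that by checking the `name` each class sets in its constructor. The classes here are local, hypothetical mirrors so the example runs without the package installed, the message passed to `SqsFifoShortCircuitError` is a placeholder rather than the library's text, and `describeFailure` is an illustrative helper, not part of the utility.

```typescript
// Local mirrors of the exported error classes (assumption: same `name` values).
class BatchProcessingError extends Error {
  public constructor(message: string) {
    super(message);
    this.name = 'BatchProcessingError';
  }
}

class SqsFifoShortCircuitError extends BatchProcessingError {
  public constructor() {
    super('placeholder message for the batch-level short circuit');
    this.name = 'SqsFifoShortCircuitError';
  }
}

class SqsFifoMessageGroupShortCircuitError extends BatchProcessingError {
  public constructor() {
    super('A previous record from this message group failed processing');
    this.name = 'SqsFifoMessageGroupShortCircuitError';
  }
}

// Hypothetical helper: classifies why a record ended up in the failure list.
const describeFailure = (error: Error): string => {
  switch (error.name) {
    case 'SqsFifoShortCircuitError':
      return 'not processed: an earlier record in the batch failed';
    case 'SqsFifoMessageGroupShortCircuitError':
      return 'not processed: an earlier record in the same group failed';
    default:
      return 'processed and failed in the record handler';
  }
};

console.log(describeFailure(new SqsFifoShortCircuitError()));
console.log(describeFailure(new SqsFifoMessageGroupShortCircuitError()));
console.log(describeFailure(new Error('boom')));
```

Comparing `name` rather than using `instanceof` is a deliberate choice in this sketch, since `instanceof` checks on `Error` subclasses can misbehave when code is compiled to older targets.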
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
@@ -3,6 +3,7 @@ export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
SqsFifoMessageGroupShortCircuitError,
UnexpectedBatchTypeError,
} from './errors.js';
export { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
34 changes: 30 additions & 4 deletions packages/batch/src/processPartialResponseSync.ts
@@ -42,16 +42,42 @@ import type {
* });
* ```
*
* When working with SQS FIFO queues, we will stop processing at the first failure
* and mark unprocessed messages as failed to preserve ordering. If you want to
* continue processing messages from other group IDs, enable the `skipGroupOnError`
* option; only messages from the failed group ID are then marked as failed.
*
* @example
* ```typescript
* import {
* SqsFifoPartialProcessor,
* processPartialResponseSync,
* } from '@aws-lambda-powertools/batch';
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
*
* const processor = new SqsFifoPartialProcessor();
*
* const recordHandler = (record: SQSRecord): void => {
* const payload = JSON.parse(record.body);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponseSync(event, recordHandler, processor, {
* context,
* skipGroupOnError: true
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Sync function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
* @param options Batch processing options
* @param options Batch processing options, which can vary with chosen batch processor implementation
*/
const processPartialResponseSync = (
const processPartialResponseSync = <T extends BasePartialBatchProcessor>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
processor: T,
options?: BatchProcessingOptions<T>
): PartialItemFailureResponse => {
if (!event.Records || !Array.isArray(event.Records)) {
throw new UnexpectedBatchTypeError();
13 changes: 11 additions & 2 deletions packages/batch/src/types.ts
@@ -4,18 +4,27 @@ import type {
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';

/**
* Options for batch processing
*
* @template T The type of the batch processor, defaults to BasePartialBatchProcessor
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError Whether to skip the remaining records of a message group after one of its records fails (SQS FIFO only)
*/
type BatchProcessingOptions = {
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
* The context object provided by the AWS Lambda runtime. When provided,
* it's made available to the handler function you specify
*/
context: Context;
context?: Context;
/**
* This option is only available for SqsFifoPartialProcessor.
* If true, skips the remaining records of a message group after one of its records fails.
*/
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
};
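
The conditional type on `skipGroupOnError` is what restricts the option to FIFO processors at compile time. Below is a small standalone sketch of the same pattern, assuming hypothetical stand-in classes (`BaseProcessor`, `FifoProcessor`), a reduced `Options` type, and an illustrative `describeOptions` function; none of these names exist in the package. TypeScript compares classes structurally, so the stand-in FIFO class carries an extra member to make it distinguishable from its base.

```typescript
// Hypothetical stand-ins for the processor classes. The `fifo` member keeps
// the two classes structurally distinct so the conditional type can tell
// them apart.
class BaseProcessor {
  public readonly name: string;
  public constructor(name = 'base') {
    this.name = name;
  }
}

class FifoProcessor extends BaseProcessor {
  public readonly fifo = true;
  public constructor() {
    super('fifo');
  }
}

type Options<T = BaseProcessor> = {
  // `boolean` for FIFO processors, `never` (effectively unusable) otherwise.
  skipGroupOnError?: T extends FifoProcessor ? boolean : never;
};

const describeOptions = <T extends BaseProcessor>(
  processor: T,
  options?: Options<T>
): string =>
  `${processor.name}: skipGroupOnError=${Boolean(options?.skipGroupOnError)}`;

console.log(describeOptions(new FifoProcessor(), { skipGroupOnError: true }));
// fifo: skipGroupOnError=true
console.log(describeOptions(new BaseProcessor()));
// base: skipGroupOnError=false

// The next call is rejected by the compiler, because for `BaseProcessor` the
// option resolves to `never`:
// describeOptions(new BaseProcessor(), { skipGroupOnError: true });
```

Because `T` is inferred from the `processor` argument, callers do not need to spell out the generic parameter to get the narrowed options type.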

/**
3 changes: 2 additions & 1 deletion packages/batch/tests/helpers/factories.ts
@@ -5,7 +5,7 @@ import type {
} from 'aws-lambda';
import { randomInt, randomUUID } from 'node:crypto';

const sqsRecordFactory = (body: string): SQSRecord => {
const sqsRecordFactory = (body: string, messageGroupId?: string): SQSRecord => {
return {
messageId: randomUUID(),
receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a',
@@ -15,6 +15,7 @@ const sqsRecordFactory = (body: string): SQSRecord => {
SentTimestamp: '1545082649183',
SenderId: 'AIDAIENQZJOLO23YVJ4VO',
ApproximateFirstReceiveTimestamp: '1545082649185',
...(messageGroupId ? { MessageGroupId: messageGroupId } : {}),
},
messageAttributes: {},
md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3',