Skip to content

feat(batch): Implementing SQS FIFO processor class #1606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { BatchProcessor, EventType, FailureResponse, SuccessResponse } from '.';

/**
* Process native partial responses from SQS FIFO queues
* Stops processing records when the first record fails
* The remaining records are reported as failed items
*/
class SqsFifoPartialProcessor extends BatchProcessor {
public constructor() {
super(EventType.SQS);
}

/**
* Call instance's handler for each record.
* When the first failed message is detected, the process is short-circuited
* And the remaining messages are reported as failed items
* TODO: change to synchronous execution if possible
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
// If we have any failed messages, it means the last message failed
// We should then short circuit the process and fail remaining messages
if (this.failureMessages.length != 0) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(await this.processRecord(record));
currentIndex++;
}

this.clean();

return processedRecords;
}

/**
* Starting from the first failure index, fail all remaining messages and append them to the result list
* @param firstFailureIndex Index of first message that failed
* @param result List of success and failure responses with remaining messages failed
*/
public shortCircuitProcessing(
firstFailureIndex: number,
processedRecords: (SuccessResponse | FailureResponse)[]
): (SuccessResponse | FailureResponse)[] {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
const data = this.toBatchType(record, this.eventType);
processedRecords.push(
this.failureHandler(
data,
new Error('A previous record failed processing')
)
);
}

this.clean();

return processedRecords;
}
}

export { SqsFifoPartialProcessor };
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './BasePartialProcessor';
export * from './BasePartialBatchProcessor';
export * from './BatchProcessor';
export * from './processPartialResponse';
export * from './SqsFifoPartialProcessor';
7 changes: 7 additions & 0 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import {
PartialItemFailureResponse,
} from '.';

/**
* Higher level function to handle batch event processing
* @param event Lambda's original event
* @param recordHandler Callable function to process each record from the batch
* @param processor Batch processor to handle partial failure cases
* @returns Lambda Partial Batch Response
*/
const processPartialResponse = async (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
Expand Down
116 changes: 116 additions & 0 deletions packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Test SqsFifoBatchProcessor class
*
* @group unit/batch/class/sqsfifobatchprocessor
*/

import { SqsFifoPartialProcessor, processPartialResponse } from '../../src';
import { sqsRecordFactory } from '../../tests/helpers/factories';
import {
asyncSqsRecordHandler,
sqsRecordHandler,
} from '../../tests/helpers/handlers';

describe('Class: SqsFifoBatchProcessor', () => {
const ENVIRONMENT_VARIABLES = process.env;

beforeEach(() => {
jest.clearAllMocks();
jest.resetModules();
process.env = { ...ENVIRONMENT_VARIABLES };
});

afterAll(() => {
process.env = ENVIRONMENT_VARIABLES;
});

describe('Synchronous SQS FIFO batch processing', () => {
test('SQS FIFO Batch processor with no failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
sqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures']).toStrictEqual([]);
});

test('SQS FIFO Batch processor with failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('fail');
const thirdRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord, thirdRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
sqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures'].length).toBe(2);
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe(
secondRecord.messageId
);
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
});
});

describe('Asynchronous SQS FIFO batch processing', () => {
test('SQS FIFO Batch processor with no failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
asyncSqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures']).toStrictEqual([]);
});

test('SQS FIFO Batch processor with failures', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('fail');
const thirdRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord, thirdRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = await processPartialResponse(
event,
asyncSqsRecordHandler,
processor
);

// Assess
expect(result['batchItemFailures'].length).toBe(2);
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe(
secondRecord.messageId
);
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
});
});
});