Skip to content

feat(batch): Support for Lambda context access in batch processing #1609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
import {
BaseRecord,
BatchProcessingOptions,
EventSourceDataClassTypes,
FailureResponse,
ResultType,
Expand All @@ -16,6 +17,8 @@ abstract class BasePartialProcessor {

public handler: CallableFunction;

public options?: BatchProcessingOptions;

public records: BaseRecord[];

public successMessages: EventSourceDataClassTypes[];
Expand Down Expand Up @@ -92,11 +95,16 @@ abstract class BasePartialProcessor {
*/
public register(
records: BaseRecord[],
handler: CallableFunction
handler: CallableFunction,
options?: BatchProcessingOptions
): BasePartialProcessor {
this.records = records;
this.handler = handler;

if (options) {
this.options = options;
}

return this;
}

Expand Down
3 changes: 2 additions & 1 deletion packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class BatchProcessor extends BasePartialBatchProcessor {
): Promise<SuccessResponse | FailureResponse> {
try {
const data = this.toBatchType(record, this.eventType);
const result = await this.handler(data);

const result = await this.handler(data, this.options);

return this.successHandler(record, result);
} catch (e) {
Expand Down
7 changes: 4 additions & 3 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BasePartialBatchProcessor,
BaseRecord,
BatchProcessingOptions,
EventType,
PartialItemFailureResponse,
} from '.';
Expand All @@ -15,7 +16,8 @@ import {
const processPartialResponse = async (
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
Expand All @@ -26,9 +28,8 @@ const processPartialResponse = async (
);
}

const records = event['Records'];
processor.register(event.Records, recordHandler, options);

processor.register(records, recordHandler);
await processor.process();

return processor.response();
Expand Down
13 changes: 11 additions & 2 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
/**
* Types for batch processing utility
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import {
Context,
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';

type BatchProcessingOptions = {
context: Context;
};

// types from base.py
type EventSourceDataClassTypes =
| SQSRecord
| KinesisStreamRecord
Expand All @@ -21,6 +29,7 @@ type PartialItemFailures = { itemIdentifier: string };
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };

export type {
BatchProcessingOptions,
BaseRecord,
EventSourceDataClassTypes,
ResultType,
Expand Down
19 changes: 19 additions & 0 deletions packages/batch/tests/helpers/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import { BatchProcessingOptions } from '../../src';

const sqsRecordHandler = (record: SQSRecord): string => {
const body = record.body;
Expand Down Expand Up @@ -58,11 +59,29 @@ const asyncDynamodbRecordHandler = async (
return body;
};

const handlerWithContext = (
record: SQSRecord,
options: BatchProcessingOptions
): string => {
const context = options.context;

try {
if (context.getRemainingTimeInMillis() == 0) {
throw Error('No time remaining.');
}
} catch (e) {
throw Error('Context possibly malformed. Displaying context:\n' + context);
}

return record.body;
};

export {
sqsRecordHandler,
asyncSqsRecordHandler,
kinesisRecordHandler,
asyncKinesisRecordHandler,
dynamodbRecordHandler,
asyncDynamodbRecordHandler,
handlerWithContext,
};
65 changes: 64 additions & 1 deletion packages/batch/tests/unit/BatchProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
* @group unit/batch/class/batchprocessor
*/

import { BatchProcessingError, BatchProcessor, EventType } from '../../src';
import {
BatchProcessingError,
BatchProcessingOptions,
BatchProcessor,
EventType,
} from '../../src';
import {
sqsRecordFactory,
kinesisRecordFactory,
Expand All @@ -17,10 +22,14 @@ import {
asyncKinesisRecordHandler,
dynamodbRecordHandler,
asyncDynamodbRecordHandler,
handlerWithContext,
} from '../../tests/helpers/handlers';
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
import { Context } from 'aws-lambda';

describe('Class: BatchProcessor', () => {
const ENVIRONMENT_VARIABLES = process.env;
const options: BatchProcessingOptions = { context: dummyContext };

beforeEach(() => {
jest.clearAllMocks();
Expand Down Expand Up @@ -418,4 +427,58 @@ describe('Class: BatchProcessor', () => {
);
});
});

describe('Batch processing with Lambda context', () => {
test('Batch processing when context is provided and handler accepts', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new BatchProcessor(EventType.SQS);

// Act
processor.register(records, handlerWithContext, options);
const processedMessages = await processor.process();

// Assess
expect(processedMessages).toStrictEqual([
['success', firstRecord.body, firstRecord],
['success', secondRecord.body, secondRecord],
]);
});

test('Batch processing when context is provided and handler does not accept', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new BatchProcessor(EventType.SQS);

// Act
processor.register(records, sqsRecordHandler, options);
const processedMessages = await processor.process();

// Assess
expect(processedMessages).toStrictEqual([
['success', firstRecord.body, firstRecord],
['success', secondRecord.body, secondRecord],
]);
});

test('Batch processing when malformed context is provided and handler attempts to use', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new BatchProcessor(EventType.SQS);
const badContext = { foo: 'bar' };
const badOptions = { context: badContext as unknown as Context };

// Act
processor.register(records, handlerWithContext, badOptions);
await expect(processor.process()).rejects.toThrowError(
BatchProcessingError
);
});
});
});
54 changes: 54 additions & 0 deletions packages/batch/tests/unit/processPartialResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
SQSEvent,
} from 'aws-lambda';
import {
BatchProcessingOptions,
BatchProcessor,
EventType,
PartialItemFailureResponse,
Expand All @@ -24,6 +25,7 @@ import {
import {
asyncSqsRecordHandler,
dynamodbRecordHandler,
handlerWithContext,
kinesisRecordHandler,
sqsRecordHandler,
} from '../../tests/helpers/handlers';
Expand All @@ -33,6 +35,7 @@ import { Custom as dummyEvent } from '../../../commons/src/samples/resources/eve
describe('Function: processPartialResponse()', () => {
const ENVIRONMENT_VARIABLES = process.env;
const context = dummyContext;
const options: BatchProcessingOptions = { context: dummyContext };

beforeEach(() => {
jest.clearAllMocks();
Expand Down Expand Up @@ -84,6 +87,27 @@ describe('Function: processPartialResponse()', () => {
// Assess
expect(ret).toStrictEqual({ batchItemFailures: [] });
});

test('Process partial response function call with context provided', async () => {
// Prepare
const records = [
sqsRecordFactory('success'),
sqsRecordFactory('success'),
];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act
const ret = await processPartialResponse(
batch,
handlerWithContext,
processor,
options
);

// Assess
expect(ret).toStrictEqual({ batchItemFailures: [] });
});
});

describe('Process partial response function call through handler', () => {
Expand Down Expand Up @@ -188,5 +212,35 @@ describe('Function: processPartialResponse()', () => {
)
);
});

test('Process partial response through handler with context provided', async () => {
// Prepare
const records = [
sqsRecordFactory('success'),
sqsRecordFactory('success'),
];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
const options: BatchProcessingOptions = { context: _context };

return await processPartialResponse(
event,
handlerWithContext,
processor,
options
);
};

// Act
const result = await handler(event, context);

// Assess
expect(result).toStrictEqual({ batchItemFailures: [] });
});
});
});