Skip to content

Commit 1bb39c8

Browse files
erikayao93dreamorosi
authored andcommitted
feat(batch): Support for Lambda context access in batch processing (#1609)
* Added types and parameter for lambda context, added unit tests * Refactor parameter checking * Added test for malformed context handling
1 parent f7dc25d commit 1bb39c8

7 files changed

+163
-8
lines changed

Diff for: packages/batch/src/BasePartialProcessor.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
import {
55
BaseRecord,
6+
BatchProcessingOptions,
67
EventSourceDataClassTypes,
78
FailureResponse,
89
ResultType,
@@ -16,6 +17,8 @@ abstract class BasePartialProcessor {
1617

1718
public handler: CallableFunction;
1819

20+
public options?: BatchProcessingOptions;
21+
1922
public records: BaseRecord[];
2023

2124
public successMessages: EventSourceDataClassTypes[];
@@ -92,11 +95,16 @@ abstract class BasePartialProcessor {
9295
*/
9396
public register(
9497
records: BaseRecord[],
95-
handler: CallableFunction
98+
handler: CallableFunction,
99+
options?: BatchProcessingOptions
96100
): BasePartialProcessor {
97101
this.records = records;
98102
this.handler = handler;
99103

104+
if (options) {
105+
this.options = options;
106+
}
107+
100108
return this;
101109
}
102110

Diff for: packages/batch/src/BatchProcessor.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ class BatchProcessor extends BasePartialBatchProcessor {
1919
): Promise<SuccessResponse | FailureResponse> {
2020
try {
2121
const data = this.toBatchType(record, this.eventType);
22-
const result = await this.handler(data);
22+
23+
const result = await this.handler(data, this.options);
2324

2425
return this.successHandler(record, result);
2526
} catch (e) {

Diff for: packages/batch/src/processPartialResponse.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
BasePartialBatchProcessor,
33
BaseRecord,
4+
BatchProcessingOptions,
45
EventType,
56
PartialItemFailureResponse,
67
} from '.';
@@ -15,7 +16,8 @@ import {
1516
const processPartialResponse = async (
1617
event: { Records: BaseRecord[] },
1718
recordHandler: CallableFunction,
18-
processor: BasePartialBatchProcessor
19+
processor: BasePartialBatchProcessor,
20+
options?: BatchProcessingOptions
1921
): Promise<PartialItemFailureResponse> => {
2022
if (!event.Records) {
2123
const eventTypes: string = Object.values(EventType).toString();
@@ -26,9 +28,8 @@ const processPartialResponse = async (
2628
);
2729
}
2830

29-
const records = event['Records'];
31+
processor.register(event.Records, recordHandler, options);
3032

31-
processor.register(records, recordHandler);
3233
await processor.process();
3334

3435
return processor.response();

Diff for: packages/batch/src/types.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
/**
22
* Types for batch processing utility
33
*/
4-
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
4+
import {
5+
Context,
6+
DynamoDBRecord,
7+
KinesisStreamRecord,
8+
SQSRecord,
9+
} from 'aws-lambda';
10+
11+
type BatchProcessingOptions = {
12+
context: Context;
13+
};
514

6-
// types from base.py
715
type EventSourceDataClassTypes =
816
| SQSRecord
917
| KinesisStreamRecord
@@ -21,6 +29,7 @@ type PartialItemFailures = { itemIdentifier: string };
2129
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
2230

2331
export type {
32+
BatchProcessingOptions,
2433
BaseRecord,
2534
EventSourceDataClassTypes,
2635
ResultType,

Diff for: packages/batch/tests/helpers/handlers.ts

+19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
2+
import { BatchProcessingOptions } from '../../src';
23

34
const sqsRecordHandler = (record: SQSRecord): string => {
45
const body = record.body;
@@ -58,11 +59,29 @@ const asyncDynamodbRecordHandler = async (
5859
return body;
5960
};
6061

62+
const handlerWithContext = (
63+
record: SQSRecord,
64+
options: BatchProcessingOptions
65+
): string => {
66+
const context = options.context;
67+
68+
try {
69+
if (context.getRemainingTimeInMillis() == 0) {
70+
throw Error('No time remaining.');
71+
}
72+
} catch (e) {
73+
throw Error('Context possibly malformed. Displaying context:\n' + context);
74+
}
75+
76+
return record.body;
77+
};
78+
6179
export {
6280
sqsRecordHandler,
6381
asyncSqsRecordHandler,
6482
kinesisRecordHandler,
6583
asyncKinesisRecordHandler,
6684
dynamodbRecordHandler,
6785
asyncDynamodbRecordHandler,
86+
handlerWithContext,
6887
};

Diff for: packages/batch/tests/unit/BatchProcessor.test.ts

+64-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
* @group unit/batch/class/batchprocessor
55
*/
66

7-
import { BatchProcessingError, BatchProcessor, EventType } from '../../src';
7+
import {
8+
BatchProcessingError,
9+
BatchProcessingOptions,
10+
BatchProcessor,
11+
EventType,
12+
} from '../../src';
813
import {
914
sqsRecordFactory,
1015
kinesisRecordFactory,
@@ -17,10 +22,14 @@ import {
1722
asyncKinesisRecordHandler,
1823
dynamodbRecordHandler,
1924
asyncDynamodbRecordHandler,
25+
handlerWithContext,
2026
} from '../../tests/helpers/handlers';
27+
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
28+
import { Context } from 'aws-lambda';
2129

2230
describe('Class: BatchProcessor', () => {
2331
const ENVIRONMENT_VARIABLES = process.env;
32+
const options: BatchProcessingOptions = { context: dummyContext };
2433

2534
beforeEach(() => {
2635
jest.clearAllMocks();
@@ -418,4 +427,58 @@ describe('Class: BatchProcessor', () => {
418427
);
419428
});
420429
});
430+
431+
describe('Batch processing with Lambda context', () => {
432+
test('Batch processing when context is provided and handler accepts', async () => {
433+
// Prepare
434+
const firstRecord = sqsRecordFactory('success');
435+
const secondRecord = sqsRecordFactory('success');
436+
const records = [firstRecord, secondRecord];
437+
const processor = new BatchProcessor(EventType.SQS);
438+
439+
// Act
440+
processor.register(records, handlerWithContext, options);
441+
const processedMessages = await processor.process();
442+
443+
// Assess
444+
expect(processedMessages).toStrictEqual([
445+
['success', firstRecord.body, firstRecord],
446+
['success', secondRecord.body, secondRecord],
447+
]);
448+
});
449+
450+
test('Batch processing when context is provided and handler does not accept', async () => {
451+
// Prepare
452+
const firstRecord = sqsRecordFactory('success');
453+
const secondRecord = sqsRecordFactory('success');
454+
const records = [firstRecord, secondRecord];
455+
const processor = new BatchProcessor(EventType.SQS);
456+
457+
// Act
458+
processor.register(records, sqsRecordHandler, options);
459+
const processedMessages = await processor.process();
460+
461+
// Assess
462+
expect(processedMessages).toStrictEqual([
463+
['success', firstRecord.body, firstRecord],
464+
['success', secondRecord.body, secondRecord],
465+
]);
466+
});
467+
468+
test('Batch processing when malformed context is provided and handler attempts to use', async () => {
469+
// Prepare
470+
const firstRecord = sqsRecordFactory('success');
471+
const secondRecord = sqsRecordFactory('success');
472+
const records = [firstRecord, secondRecord];
473+
const processor = new BatchProcessor(EventType.SQS);
474+
const badContext = { foo: 'bar' };
475+
const badOptions = { context: badContext as unknown as Context };
476+
477+
// Act
478+
processor.register(records, handlerWithContext, badOptions);
479+
await expect(processor.process()).rejects.toThrowError(
480+
BatchProcessingError
481+
);
482+
});
483+
});
421484
});

Diff for: packages/batch/tests/unit/processPartialResponse.test.ts

+54
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
SQSEvent,
1212
} from 'aws-lambda';
1313
import {
14+
BatchProcessingOptions,
1415
BatchProcessor,
1516
EventType,
1617
PartialItemFailureResponse,
@@ -24,6 +25,7 @@ import {
2425
import {
2526
asyncSqsRecordHandler,
2627
dynamodbRecordHandler,
28+
handlerWithContext,
2729
kinesisRecordHandler,
2830
sqsRecordHandler,
2931
} from '../../tests/helpers/handlers';
@@ -33,6 +35,7 @@ import { Custom as dummyEvent } from '../../../commons/src/samples/resources/eve
3335
describe('Function: processPartialResponse()', () => {
3436
const ENVIRONMENT_VARIABLES = process.env;
3537
const context = dummyContext;
38+
const options: BatchProcessingOptions = { context: dummyContext };
3639

3740
beforeEach(() => {
3841
jest.clearAllMocks();
@@ -84,6 +87,27 @@ describe('Function: processPartialResponse()', () => {
8487
// Assess
8588
expect(ret).toStrictEqual({ batchItemFailures: [] });
8689
});
90+
91+
test('Process partial response function call with context provided', async () => {
92+
// Prepare
93+
const records = [
94+
sqsRecordFactory('success'),
95+
sqsRecordFactory('success'),
96+
];
97+
const batch = { Records: records };
98+
const processor = new BatchProcessor(EventType.SQS);
99+
100+
// Act
101+
const ret = await processPartialResponse(
102+
batch,
103+
handlerWithContext,
104+
processor,
105+
options
106+
);
107+
108+
// Assess
109+
expect(ret).toStrictEqual({ batchItemFailures: [] });
110+
});
87111
});
88112

89113
describe('Process partial response function call through handler', () => {
@@ -188,5 +212,35 @@ describe('Function: processPartialResponse()', () => {
188212
)
189213
);
190214
});
215+
216+
test('Process partial response through handler with context provided', async () => {
217+
// Prepare
218+
const records = [
219+
sqsRecordFactory('success'),
220+
sqsRecordFactory('success'),
221+
];
222+
const processor = new BatchProcessor(EventType.SQS);
223+
const event: SQSEvent = { Records: records };
224+
225+
const handler = async (
226+
event: SQSEvent,
227+
_context: Context
228+
): Promise<PartialItemFailureResponse> => {
229+
const options: BatchProcessingOptions = { context: _context };
230+
231+
return await processPartialResponse(
232+
event,
233+
handlerWithContext,
234+
processor,
235+
options
236+
);
237+
};
238+
239+
// Act
240+
const result = await handler(event, context);
241+
242+
// Assess
243+
expect(result).toStrictEqual({ batchItemFailures: [] });
244+
});
191245
});
192246
});

0 commit comments

Comments
 (0)