From da37777a3bb9e1a098f006b1089055679dede3ef Mon Sep 17 00:00:00 2001 From: erikayao93 Date: Fri, 14 Jul 2023 00:00:49 +0000 Subject: [PATCH 1/3] Added types and parameter for lambda context, added unit tests --- packages/batch/src/BasePartialProcessor.ts | 10 +++- packages/batch/src/BatchProcessor.ts | 8 ++- packages/batch/src/processPartialResponse.ts | 11 +++- packages/batch/src/types.ts | 13 ++++- packages/batch/tests/helpers/handlers.ts | 14 +++++ .../batch/tests/unit/BatchProcessor.test.ts | 48 ++++++++++++++++- .../tests/unit/processPartialResponse.test.ts | 54 +++++++++++++++++++ 7 files changed, 151 insertions(+), 7 deletions(-) diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 00bf1bcdd2..e1200a75f7 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -3,6 +3,7 @@ */ import { BaseRecord, + BatchProcessingOptions, EventSourceDataClassTypes, FailureResponse, ResultType, @@ -16,6 +17,8 @@ abstract class BasePartialProcessor { public handler: CallableFunction; + public options?: BatchProcessingOptions; + public records: BaseRecord[]; public successMessages: EventSourceDataClassTypes[]; @@ -92,11 +95,16 @@ abstract class BasePartialProcessor { */ public register( records: BaseRecord[], - handler: CallableFunction + handler: CallableFunction, + options?: BatchProcessingOptions ): BasePartialProcessor { this.records = records; this.handler = handler; + if (options) { + this.options = options; + } + return this; } diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index c3ddf4d25b..3fd2b72855 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -19,7 +19,13 @@ class BatchProcessor extends BasePartialBatchProcessor { ): Promise { try { const data = this.toBatchType(record, this.eventType); - const result = await this.handler(data); + + let result: unknown; + if (this.options) { + result = await this.handler(data, this.options); + } else { + result = await this.handler(data); + } return this.successHandler(record, result); } catch (e) { diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index a29a01e903..ffa58253b4 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -1,6 +1,7 @@ import { BasePartialBatchProcessor, BaseRecord, + BatchProcessingOptions, EventType, PartialItemFailureResponse, } from '.'; @@ -15,7 +16,8 @@ import { const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor + processor: BasePartialBatchProcessor, + options?: BatchProcessingOptions ): Promise => { if (!event.Records) { const eventTypes: string = Object.values(EventType).toString(); @@ -28,7 +30,12 @@ const processPartialResponse = async ( const records = event['Records']; - processor.register(records, recordHandler); + if (options) { + processor.register(records, recordHandler, options); + } else { + processor.register(records, recordHandler); + } + await processor.process(); return processor.response(); diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 38065a3d66..add07642fa 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -1,9 +1,17 @@ /** * Types for batch processing utility */ -import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; +import { + Context, + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; + +type BatchProcessingOptions = { + context: Context; +}; -// types from base.py type EventSourceDataClassTypes = | SQSRecord | KinesisStreamRecord @@ -21,6 +29,7 @@ type PartialItemFailures = { itemIdentifier: string }; type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; export type { + BatchProcessingOptions, BaseRecord, EventSourceDataClassTypes, ResultType, diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts index 7f4dd933d3..45bac72d21 100644 --- a/packages/batch/tests/helpers/handlers.ts +++ b/packages/batch/tests/helpers/handlers.ts @@ -1,4 +1,5 @@ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; +import { BatchProcessingOptions } from '../../src'; const sqsRecordHandler = (record: SQSRecord): string => { const body = record.body; @@ -58,6 +59,18 @@ const asyncDynamodbRecordHandler = async ( return body; }; +const handlerWithContext = ( + record: SQSRecord, + options: BatchProcessingOptions +): string => { + const context = options.context; + if (context.getRemainingTimeInMillis() == 0) { + throw Error('No time remaining.'); + } + + return record.body; +}; + export { sqsRecordHandler, asyncSqsRecordHandler, @@ -65,4 +78,5 @@ export { asyncKinesisRecordHandler, dynamodbRecordHandler, asyncDynamodbRecordHandler, + handlerWithContext, }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 4080196af5..677dfaf021 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -4,7 +4,12 @@ * @group unit/batch/class/batchprocessor */ -import { BatchProcessingError, BatchProcessor, EventType } from '../../src'; +import { + BatchProcessingError, + BatchProcessingOptions, + BatchProcessor, + EventType, +} from '../../src'; import { sqsRecordFactory, kinesisRecordFactory, @@ -17,10 +22,13 @@ import { asyncKinesisRecordHandler, dynamodbRecordHandler, asyncDynamodbRecordHandler, + handlerWithContext, } from '../../tests/helpers/handlers'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; describe('Class: BatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; + const options: BatchProcessingOptions = { context: dummyContext }; beforeEach(() => { jest.clearAllMocks(); @@ -418,4 +426,42 @@ describe('Class: BatchProcessor', () => { ); }); }); + + describe('Batch processing with Lambda context', () => { + test('Batch processing when context is provided and handler accepts', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, handlerWithContext, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when context is provided and handler does not accept', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, sqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + }); }); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index ea70123b2c..1d3c4fb844 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -11,6 +11,7 @@ import { SQSEvent, } from 'aws-lambda'; import { + BatchProcessingOptions, BatchProcessor, EventType, PartialItemFailureResponse, @@ -24,6 +25,7 @@ import { import { asyncSqsRecordHandler, dynamodbRecordHandler, + handlerWithContext, kinesisRecordHandler, sqsRecordHandler, } from '../../tests/helpers/handlers'; @@ -33,6 +35,7 @@ import { Custom as dummyEvent } from '../../../commons/src/samples/resources/eve describe('Function: processPartialResponse()', () => { const ENVIRONMENT_VARIABLES = process.env; const context = dummyContext; + const options: BatchProcessingOptions = { context: dummyContext }; beforeEach(() => { jest.clearAllMocks(); @@ -84,6 +87,27 @@ describe('Function: processPartialResponse()', () => { // Assess expect(ret).toStrictEqual({ batchItemFailures: [] }); }); + + test('Process partial response function call with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = await processPartialResponse( + batch, + handlerWithContext, + processor, + options + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); }); describe('Process partial response function call through handler', () => { @@ -188,5 +212,35 @@ describe('Function: processPartialResponse()', () => { ) ); }); + + test('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + const options: BatchProcessingOptions = { context: _context }; + + return await processPartialResponse( + event, + handlerWithContext, + processor, + options + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); }); }); From a2d607df3b17eb09c0c3b346504c49c23467deb3 Mon Sep 17 00:00:00 2001 From: erikayao93 Date: Fri, 14 Jul 2023 18:04:00 +0000 Subject: [PATCH 2/3] Refactor parameter checking --- packages/batch/src/BatchProcessor.ts | 7 +------ packages/batch/src/processPartialResponse.ts | 8 +------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 3fd2b72855..40d8a2fcd8 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -20,12 +20,7 @@ class BatchProcessor extends BasePartialBatchProcessor { try { const data = this.toBatchType(record, this.eventType); - let result: unknown; - if (this.options) { - result = await this.handler(data, this.options); - } else { - result = await this.handler(data); - } + const result = await this.handler(data, this.options); return this.successHandler(record, result); } catch (e) { diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index ffa58253b4..15abaa10af 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -28,13 +28,7 @@ const processPartialResponse = async ( ); } - const records = event['Records']; - - if (options) { - processor.register(records, recordHandler, options); - } else { - processor.register(records, recordHandler); - } + processor.register(event.Records, recordHandler, options); await processor.process(); From e32f1c10873950f487073eca64996ddfcae4746f Mon Sep 17 00:00:00 2001 From: erikayao93 Date: Fri, 14 Jul 2023 18:24:18 +0000 Subject: [PATCH 3/3] Added test for malformed context handling --- packages/batch/tests/helpers/handlers.ts | 9 +++++++-- .../batch/tests/unit/BatchProcessor.test.ts | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts index 45bac72d21..e2862730c3 100644 --- a/packages/batch/tests/helpers/handlers.ts +++ b/packages/batch/tests/helpers/handlers.ts @@ -64,8 +64,13 @@ const handlerWithContext = ( options: BatchProcessingOptions ): string => { const context = options.context; - if (context.getRemainingTimeInMillis() == 0) { - throw Error('No time remaining.'); + + try { + if (context.getRemainingTimeInMillis() == 0) { + throw Error('No time remaining.'); + } + } catch (e) { + throw Error('Context possibly malformed. Displaying context:\n' + context); } return record.body; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 677dfaf021..36529ee533 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -25,6 +25,7 @@ import { handlerWithContext, } from '../../tests/helpers/handlers'; import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Context } from 'aws-lambda'; describe('Class: BatchProcessor', () => { const ENVIRONMENT_VARIABLES = process.env; @@ -463,5 +464,21 @@ describe('Class: BatchProcessor', () => { ['success', secondRecord.body, secondRecord], ]); }); + + test('Batch processing when malformed context is provided and handler attempts to use', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + const badContext = { foo: 'bar' }; + const badOptions = { context: badContext as unknown as Context }; + + // Act + processor.register(records, handlerWithContext, badOptions); + await expect(processor.process()).rejects.toThrowError( + BatchProcessingError + ); + }); }); });