Skip to content

Commit a68db2c

Browse files
erikayao93dreamorosi
authored andcommitted
feat(batch): Batch processing wrapper function (#1605)
* Refactored some types, added function wrapper and base test * Added record check and tests, renamed factories * Refactored type check logic in function * Refactor test to remove error ignore
1 parent cffe3ed commit a68db2c

8 files changed

+299
-75
lines changed

Diff for: packages/batch/src/BasePartialBatchProcessor.ts

+12-12
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ import {
99
DEFAULT_RESPONSE,
1010
EventSourceDataClassTypes,
1111
EventType,
12-
ItemIdentifier,
13-
BatchResponse,
12+
PartialItemFailures,
13+
PartialItemFailureResponse,
1414
} from '.';
1515

1616
abstract class BasePartialBatchProcessor extends BasePartialProcessor {
1717
public COLLECTOR_MAPPING;
1818

19-
public batchResponse: BatchResponse;
19+
public batchResponse: PartialItemFailureResponse;
2020

2121
public eventType: keyof typeof EventType;
2222

@@ -52,16 +52,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
5252
);
5353
}
5454

55-
const messages: ItemIdentifier[] = this.getMessagesToReport();
55+
const messages: PartialItemFailures[] = this.getMessagesToReport();
5656
this.batchResponse = { batchItemFailures: messages };
5757
}
5858

5959
/**
6060
* Collects identifiers of failed items for a DynamoDB stream
6161
* @returns list of identifiers for failed items
6262
*/
63-
public collectDynamoDBFailures(): ItemIdentifier[] {
64-
const failures: ItemIdentifier[] = [];
63+
public collectDynamoDBFailures(): PartialItemFailures[] {
64+
const failures: PartialItemFailures[] = [];
6565

6666
for (const msg of this.failureMessages) {
6767
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber;
@@ -77,8 +77,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
7777
* Collects identifiers of failed items for a Kinesis stream
7878
* @returns list of identifiers for failed items
7979
*/
80-
public collectKinesisFailures(): ItemIdentifier[] {
81-
const failures: ItemIdentifier[] = [];
80+
public collectKinesisFailures(): PartialItemFailures[] {
81+
const failures: PartialItemFailures[] = [];
8282

8383
for (const msg of this.failureMessages) {
8484
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber;
@@ -92,8 +92,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
9292
* Collects identifiers of failed items for an SQS batch
9393
* @returns list of identifiers for failed items
9494
*/
95-
public collectSqsFailures(): ItemIdentifier[] {
96-
const failures: ItemIdentifier[] = [];
95+
public collectSqsFailures(): PartialItemFailures[] {
96+
const failures: PartialItemFailures[] = [];
9797

9898
for (const msg of this.failureMessages) {
9999
const msgId = (msg as SQSRecord).messageId;
@@ -115,7 +115,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
115115
* Collects identifiers for failed batch items
116116
* @returns formatted messages to use in batch deletion
117117
*/
118-
public getMessagesToReport(): ItemIdentifier[] {
118+
public getMessagesToReport(): PartialItemFailures[] {
119119
return this.COLLECTOR_MAPPING[this.eventType]();
120120
}
121121

@@ -146,7 +146,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
146146
/**
147147
* @returns Batch items that failed processing, if any
148148
*/
149-
public response(): BatchResponse {
149+
public response(): PartialItemFailureResponse {
150150
return this.batchResponse;
151151
}
152152

Diff for: packages/batch/src/constants.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
* Constants for batch processor classes
33
*/
44
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
5-
import type { BatchResponse, EventSourceDataClassTypes } from '.';
5+
import type { PartialItemFailureResponse, EventSourceDataClassTypes } from '.';
66

77
const EventType = {
88
SQS: 'SQS',
99
KinesisDataStreams: 'KinesisDataStreams',
1010
DynamoDBStreams: 'DynamoDBStreams',
1111
} as const;
1212

13-
const DEFAULT_RESPONSE: BatchResponse = {
13+
const DEFAULT_RESPONSE: PartialItemFailureResponse = {
1414
batchItemFailures: [],
1515
};
1616

Diff for: packages/batch/src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ export * from './errors';
33
export * from './types';
44
export * from './BasePartialProcessor';
55
export * from './BasePartialBatchProcessor';
6+
export * from './BatchProcessor';
7+
export * from './processPartialResponse';

Diff for: packages/batch/src/processPartialResponse.ts

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import {
2+
BasePartialBatchProcessor,
3+
BaseRecord,
4+
EventType,
5+
PartialItemFailureResponse,
6+
} from '.';
7+
8+
const processPartialResponse = async (
9+
event: { Records: BaseRecord[] },
10+
recordHandler: CallableFunction,
11+
processor: BasePartialBatchProcessor
12+
): Promise<PartialItemFailureResponse> => {
13+
if (!event.Records) {
14+
const eventTypes: string = Object.values(EventType).toString();
15+
throw new Error(
16+
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
17+
eventTypes +
18+
' event.'
19+
);
20+
}
21+
22+
const records = event['Records'];
23+
24+
processor.register(records, recordHandler);
25+
await processor.process();
26+
27+
return processor.response();
28+
};
29+
30+
export { processPartialResponse };

Diff for: packages/batch/src/types.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ type SuccessResponse = [string, ResultType, EventSourceDataClassTypes];
1717

1818
type FailureResponse = [string, string, EventSourceDataClassTypes];
1919

20-
type ItemIdentifier = { [key: string]: string };
21-
type BatchResponse = { [key: string]: ItemIdentifier[] };
20+
type PartialItemFailures = { itemIdentifier: string };
21+
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
2222

2323
export type {
2424
BaseRecord,
2525
EventSourceDataClassTypes,
2626
ResultType,
2727
SuccessResponse,
2828
FailureResponse,
29-
ItemIdentifier,
30-
BatchResponse,
29+
PartialItemFailures,
30+
PartialItemFailureResponse,
3131
};

Diff for: packages/batch/tests/helpers/factories.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
22
import { randomInt } from 'crypto';
33
import { v4 } from 'uuid';
44

5-
const sqsEventFactory = (body: string): SQSRecord => {
5+
const sqsRecordFactory = (body: string): SQSRecord => {
66
return {
77
messageId: v4(),
88
receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a',
@@ -21,7 +21,7 @@ const sqsEventFactory = (body: string): SQSRecord => {
2121
};
2222
};
2323

24-
const kinesisEventFactory = (body: string): KinesisStreamRecord => {
24+
const kinesisRecordFactory = (body: string): KinesisStreamRecord => {
2525
let seq = '';
2626
for (let i = 0; i < 52; i++) {
2727
seq = seq + randomInt(10);
@@ -46,7 +46,7 @@ const kinesisEventFactory = (body: string): KinesisStreamRecord => {
4646
};
4747
};
4848

49-
const dynamodbEventFactory = (body: string): DynamoDBRecord => {
49+
const dynamodbRecordFactory = (body: string): DynamoDBRecord => {
5050
let seq = '';
5151
for (let i = 0; i < 10; i++) {
5252
seq = seq + randomInt(10);
@@ -69,4 +69,4 @@ const dynamodbEventFactory = (body: string): DynamoDBRecord => {
6969
};
7070
};
7171

72-
export { sqsEventFactory, kinesisEventFactory, dynamodbEventFactory };
72+
export { sqsRecordFactory, kinesisRecordFactory, dynamodbRecordFactory };

0 commit comments

Comments
 (0)