
Commit a8dfa74

am29d and dreamorosi authored
feat(parser): add schema for DynamoDB - Kinesis Stream event (#3328)
Co-authored-by: Andrea Amorosi <[email protected]>
1 parent 6156587 commit a8dfa74

16 files changed: +261 -88 lines changed

Diff for: docs/utilities/parser.md

+1

@@ -78,6 +78,7 @@ Parser comes with the following built-in schemas:
 | **KafkaSelfManagedEventSchema** | Lambda Event Source payload for self managed Kafka payload |
 | **KinesisDataStreamSchema** | Lambda Event Source payload for Amazon Kinesis Data Streams |
 | **KinesisFirehoseSchema** | Lambda Event Source payload for Amazon Kinesis Firehose |
+| **KinesisDynamoDBStreamSchema** | Lambda Event Source payload for DynamoDB Stream records wrapped in a Kinesis Data Stream |
 | **KinesisFirehoseSqsSchema** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
 | **LambdaFunctionUrlSchema** | Lambda Event Source payload for Lambda Function URL payload |
 | **S3EventNotificationEventBridgeSchema** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
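
For context, the new schema can be used like any other built-in Parser schema by calling it directly on the incoming event. A minimal sketch (the handler body and logging are illustrative, not part of this commit):

import { KinesisDynamoDBStreamSchema } from '@aws-lambda-powertools/parser/schemas';

export const handler = async (event: unknown): Promise<void> => {
  // Throws a ZodError if the payload does not match; each record's base64
  // `data` field is decoded and validated as a DynamoDB change record.
  const parsedEvent = KinesisDynamoDBStreamSchema.parse(event);

  for (const record of parsedEvent.Records) {
    // `data` is already the decoded DynamoDB Stream record at this point.
    console.log(record.kinesis.data.tableName, record.kinesis.data.eventName);
  }
};
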

Diff for: packages/parser/src/schemas/dynamodb.ts

+14

@@ -31,6 +31,19 @@ const DynamoDBStreamRecord = z.object({
   userIdentity: UserIdentity.optional(),
 });
 
+const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
+  recordFormat: z.literal('application/json'),
+  tableName: z.string(),
+  userIdentity: UserIdentity.nullish(),
+  dynamodb: DynamoDBStreamChangeRecord.omit({
+    SequenceNumber: true,
+    StreamViewType: true,
+  }),
+}).omit({
+  eventVersion: true,
+  eventSourceARN: true,
+});
+
 /**
  * Zod schema for Amazon DynamoDB Stream event.
  *
@@ -111,6 +124,7 @@ const DynamoDBStreamSchema = z.object({
 });
 
 export {
+  DynamoDBStreamToKinesisRecord,
   DynamoDBStreamSchema,
   DynamoDBStreamRecord,
   DynamoDBStreamChangeRecord,
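
The extend/omit chain above is easiest to read at the type level: the Kinesis-wrapped record gains recordFormat and tableName, allows userIdentity to be null or missing, and drops the fields Kinesis does not deliver. A small type-only sketch of that outcome (the relative import assumes the file layout shown in this diff):

import type { z } from 'zod';
import type { DynamoDBStreamToKinesisRecord } from './dynamodb.js';

type KinesisWrappedRecord = z.infer<typeof DynamoDBStreamToKinesisRecord>;

// Present and narrowed by the `extend` call:
type Format = KinesisWrappedRecord['recordFormat']; // 'application/json'
type Table = KinesisWrappedRecord['tableName'];     // string

// Removed by the `omit` calls, so these would be compile-time errors:
// type Version = KinesisWrappedRecord['eventVersion'];
// type Arn = KinesisWrappedRecord['eventSourceARN'];
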

Diff for: packages/parser/src/schemas/index.ts

+9 -2

@@ -20,14 +20,21 @@ export {
   CloudWatchLogsDecodeSchema,
   CloudWatchLogsSchema,
 } from './cloudwatch.js';
-export { DynamoDBStreamSchema } from './dynamodb.js';
+export {
+  DynamoDBStreamSchema,
+  DynamoDBStreamToKinesisRecord,
+} from './dynamodb.js';
 export { EventBridgeSchema } from './eventbridge.js';
 export {
   KafkaMskEventSchema,
   KafkaSelfManagedEventSchema,
   KafkaRecordSchema,
 } from './kafka.js';
-export { KinesisDataStreamSchema, KinesisDataStreamRecord } from './kinesis.js';
+export {
+  KinesisDataStreamSchema,
+  KinesisDynamoDBStreamSchema,
+  KinesisDataStreamRecord,
+} from './kinesis.js';
 export {
   KinesisFirehoseSchema,
   KinesisFirehoseSqsSchema,

Diff for: packages/parser/src/schemas/kinesis.ts

+20 -3

@@ -1,17 +1,18 @@
 import { gunzipSync } from 'node:zlib';
 import { z } from 'zod';
+import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';
 
 const KinesisDataStreamRecordPayload = z.object({
   kinesisSchemaVersion: z.string(),
   partitionKey: z.string(),
   sequenceNumber: z.string(),
   approximateArrivalTimestamp: z.number(),
   data: z.string().transform((data) => {
-    const decompresed = decompress(data);
+    const decompressed = decompress(data);
     const decoded = Buffer.from(data, 'base64').toString('utf-8');
     try {
       // If data was not compressed, try to parse it as JSON otherwise it must be string
-      return decompresed === data ? JSON.parse(decoded) : decompresed;
+      return decompressed === data ? JSON.parse(decoded) : decompressed;
     } catch (e) {
       return decoded;
     }
@@ -37,6 +38,21 @@ const KinesisDataStreamRecord = z.object({
   kinesis: KinesisDataStreamRecordPayload,
 });
 
+const KinesisDynamoDBStreamSchema = z.object({
+  Records: z.array(
+    KinesisDataStreamRecord.extend({
+      kinesis: KinesisDataStreamRecordPayload.extend({
+        data: z
+          .string()
+          .transform((data) => {
+            return JSON.parse(Buffer.from(data, 'base64').toString('utf8'));
+          })
+          .pipe(DynamoDBStreamToKinesisRecord),
+      }),
+    })
+  ),
+});
+
 /**
  * Zod schema for Kinesis Data Stream event
  *
@@ -88,7 +104,8 @@ const KinesisDataStreamSchema = z.object({
 });
 
 export {
-  KinesisDataStreamSchema,
   KinesisDataStreamRecord,
   KinesisDataStreamRecordPayload,
+  KinesisDataStreamSchema,
+  KinesisDynamoDBStreamSchema,
 };
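
The interesting part of KinesisDynamoDBStreamSchema is the transform/pipe chain: the string is base64-decoded and JSON-parsed first, and the resulting object is then validated against DynamoDBStreamToKinesisRecord. A standalone sketch of the same pattern with a stand-in inner schema (Base64JsonOf and InnerRecord are illustrative names, not part of this commit):

import { z } from 'zod';

// Stand-in for DynamoDBStreamToKinesisRecord; only the pattern matters here.
const InnerRecord = z.object({ tableName: z.string(), eventName: z.string() });

// Decode base64, JSON.parse, then validate the result against the given schema.
const Base64JsonOf = <T extends z.ZodTypeAny>(schema: T) =>
  z
    .string()
    .transform((data) => JSON.parse(Buffer.from(data, 'base64').toString('utf8')))
    .pipe(schema);

const payload = Buffer.from(
  JSON.stringify({ tableName: 'my-table', eventName: 'INSERT' })
).toString('base64');

// Succeeds: decoding and validation happen in a single parse call.
console.log(Base64JsonOf(InnerRecord).parse(payload));

// Fails validation: '{}' decodes to an object missing the required fields.
console.log(Base64JsonOf(InnerRecord).safeParse(Buffer.from('{}').toString('base64')).success);
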

Diff for: packages/parser/src/types/index.ts

+2

@@ -19,6 +19,7 @@ export type {
   SnsEvent,
   SqsEvent,
   DynamoDBStreamEvent,
+  DynamoDBStreamToKinesisRecordEvent,
   CloudWatchLogsEvent,
   CloudFormationCustomResourceCreateEvent,
   CloudFormationCustomResourceDeleteEvent,
@@ -27,6 +28,7 @@
   KafkaSelfManagedEvent,
   KafkaMskEvent,
   KinesisDataStreamEvent,
+  KinesisDynamoDBStreamEvent,
   KinesisFireHoseEvent,
   KinesisFireHoseSqsEvent,
   LambdaFunctionUrlEvent,

Diff for: packages/parser/src/types/schema.ts

+10

@@ -11,11 +11,13 @@ import type {
   CloudFormationCustomResourceUpdateSchema,
   CloudWatchLogsSchema,
   DynamoDBStreamSchema,
+  DynamoDBStreamToKinesisRecord,
   EventBridgeSchema,
   KafkaMskEventSchema,
   KafkaRecordSchema,
   KafkaSelfManagedEventSchema,
   KinesisDataStreamSchema,
+  KinesisDynamoDBStreamSchema,
   KinesisFirehoseRecordSchema,
   KinesisFirehoseSchema,
   KinesisFirehoseSqsRecordSchema,
@@ -69,6 +71,10 @@ type CloudWatchLogsEvent = z.infer<typeof CloudWatchLogsSchema>;
 
 type DynamoDBStreamEvent = z.infer<typeof DynamoDBStreamSchema>;
 
+type DynamoDBStreamToKinesisRecordEvent = z.infer<
+  typeof DynamoDBStreamToKinesisRecord
+>;
+
 type EventBridgeEvent = z.infer<typeof EventBridgeSchema>;
 
 type KafkaSelfManagedEvent = z.infer<typeof KafkaSelfManagedEventSchema>;
@@ -79,6 +85,8 @@ type KafkaMskEvent = z.infer<typeof KafkaMskEventSchema>;
 
 type KinesisDataStreamEvent = z.infer<typeof KinesisDataStreamSchema>;
 
+type KinesisDynamoDBStreamEvent = z.infer<typeof KinesisDynamoDBStreamSchema>;
+
 type KinesisFireHoseEvent = z.infer<typeof KinesisFirehoseSchema>;
 
 type KinesisFirehoseRecord = z.infer<typeof KinesisFirehoseRecordSchema>;
@@ -131,11 +139,13 @@ export type {
   CloudFormationCustomResourceUpdateEvent,
   CloudWatchLogsEvent,
   DynamoDBStreamEvent,
+  DynamoDBStreamToKinesisRecordEvent,
   EventBridgeEvent,
   KafkaSelfManagedEvent,
   KafkaMskEvent,
   KafkaRecord,
   KinesisDataStreamEvent,
+  KinesisDynamoDBStreamEvent,
   KinesisFireHoseEvent,
   KinesisFirehoseRecord,
   KinesisFireHoseSqsEvent,
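
With these types exported, a handler can annotate the already-parsed event. A minimal sketch, assuming the types remain re-exported from the package's types entry point as the types/index.ts diff above suggests (the helper function is illustrative):

import { KinesisDynamoDBStreamSchema } from '@aws-lambda-powertools/parser/schemas';
import type { KinesisDynamoDBStreamEvent } from '@aws-lambda-powertools/parser/types';

const listChangedTables = (event: KinesisDynamoDBStreamEvent): string[] =>
  // Each record's data has already been decoded into a DynamoDB change record.
  event.Records.map((record) => record.kinesis.data.tableName);

export const handler = async (rawEvent: unknown): Promise<string[]> => {
  const event = KinesisDynamoDBStreamSchema.parse(rawEvent);
  return listChangedTables(event);
};
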
+36

@@ -0,0 +1,36 @@
+{
+  "Records": [
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "859F7C064A4818874FA67ABEC9BF2AF1",
+        "sequenceNumber": "49657828409187701520019995242508390119953358325192589314",
+        "data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiZDk0MjgwMjktMGY2My00MDU2LTg2ZGEtY2UxMGQ1NDViMWI5IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9LCJkYXRhIjp7IlMiOiJkYXRhLXg2YXE3Y2tkcGdrIn19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
+        "approximateArrivalTimestamp": 1731924555.932
+      },
+      "eventSource": "aws:kinesis",
+      "eventVersion": "1.0",
+      "eventID": "shardId-000000000000:49657828409187701520019995242508390119953358325192589314",
+      "eventName": "aws:kinesis:record",
+      "invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
+      "awsRegion": "eu-west-1",
+      "eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
+    },
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "6037E47B707479B67E577C989D96E9F8",
+        "sequenceNumber": "49657828409187701520019995242509599045772972954367295490",
+        "data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiYWE1NmNhZDQtMzExYS00NmM4LWFiNWYtYzdhMTNhN2E2Mjk4IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9LCJkYXRhIjp7IlMiOiJkYXRhLTRlb21wanM4OW41In19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
+        "approximateArrivalTimestamp": 1731924555.935
+      },
+      "eventSource": "aws:kinesis",
+      "eventVersion": "1.0",
+      "eventID": "shardId-000000000000:49657828409187701520019995242509599045772972954367295490",
+      "eventName": "aws:kinesis:record",
+      "invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
+      "awsRegion": "eu-west-1",
+      "eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
+    }
+  ]
+}
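
To see what the fixture's records actually carry, the data field can be decoded the same way the schema does it. A small sketch (the file path is a placeholder, since the fixture's exact location is not shown in this diff):

import { readFileSync } from 'node:fs';

// Placeholder path; point it at wherever the fixture above is stored.
const event = JSON.parse(readFileSync('./kinesis-dynamodb-stream-event.json', 'utf8'));

for (const record of event.Records) {
  // Mirrors the schema's transform: base64 decode, then JSON.parse.
  const change = JSON.parse(
    Buffer.from(record.kinesis.data, 'base64').toString('utf8')
  );
  // Each decoded object is a DynamoDB change record (tableName, eventName, dynamodb, ...).
  console.log(change.tableName, change.eventName);
}
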

Diff for: packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts

+31 -30

@@ -1,18 +1,35 @@
 import { generateMock } from '@anatine/zod-mock';
 import { describe, expect, it } from 'vitest';
-import { ZodError, type z } from 'zod';
+import { ZodError } from 'zod';
 import { ParseError } from '../../../src';
 import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js';
-import type { KinesisFirehoseSchema } from '../../../src/schemas/';
-import { TestEvents, TestSchema } from '../schema/utils.js';
+import type {
+  KinesisFireHoseEvent,
+  KinesisFireHoseSqsEvent,
+} from '../../../src/types';
+import { TestSchema, getTestEvent } from '../schema/utils.js';
 
 describe('Kinesis Firehose Envelope', () => {
+  const eventsPath = 'kinesis';
+  const kinesisFirehosePutEvent = getTestEvent<KinesisFireHoseEvent>({
+    eventsPath,
+    filename: 'firehose-put',
+  });
+
+  const kinesisFirehoseSQSEvent = getTestEvent<KinesisFireHoseSqsEvent>({
+    eventsPath,
+    filename: 'firehose-sqs',
+  });
+
+  const kinesisFirehoseEvent = getTestEvent<KinesisFireHoseEvent>({
+    eventsPath,
+    filename: 'firehose',
+  });
+
   describe('parse', () => {
     it('should parse records for PutEvent', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehosePutEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -24,9 +41,7 @@ describe('Kinesis Firehose Envelope', () => {
 
     it('should parse a single record for SQS event', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehoseSQSEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -38,9 +53,7 @@ describe('Kinesis Firehose Envelope', () => {
 
     it('should parse records for kinesis event', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehoseEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -50,9 +63,7 @@ describe('Kinesis Firehose Envelope', () => {
       expect(resp).toEqual([mock, mock]);
     });
     it('should throw if record is not base64 encoded', () => {
-      const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehosePutEvent);
 
       testEvent.records.map((record) => {
         record.data = 'not base64 encoded';
@@ -68,9 +79,7 @@ describe('Kinesis Firehose Envelope', () => {
       }).toThrow();
     });
     it('should throw when schema does not match record', () => {
-      const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehosePutEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from('not a valid json').toString('base64');
@@ -84,9 +93,7 @@ describe('Kinesis Firehose Envelope', () => {
   describe('safeParse', () => {
     it('should parse records for PutEvent', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehosePutEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -98,9 +105,7 @@ describe('Kinesis Firehose Envelope', () => {
 
     it('should parse a single record for SQS event', () => {
      const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehoseSQSEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -112,9 +117,7 @@ describe('Kinesis Firehose Envelope', () => {
 
     it('should parse records for kinesis event', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehoseEvent);
 
       testEvent.records.map((record) => {
         record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
@@ -139,9 +142,7 @@ describe('Kinesis Firehose Envelope', () => {
       }
     });
     it('should return original event if record is not base64 encoded', () => {
-      const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
-        typeof KinesisFirehoseSchema
-      >;
+      const testEvent = structuredClone(kinesisFirehosePutEvent);
 
       testEvent.records.map((record) => {
         record.data = 'not base64 encoded';
Diff for: packages/parser/tests/unit/envelopes/kinesis.test.ts

+14 -6

@@ -1,15 +1,22 @@
 import { generateMock } from '@anatine/zod-mock';
-import type { KinesisStreamEvent } from 'aws-lambda';
 import { describe, expect, it } from 'vitest';
 import { KinesisEnvelope } from '../../../src/envelopes/index.js';
 import { ParseError } from '../../../src/errors.js';
-import { TestEvents, TestSchema } from '../schema/utils.js';
+import type { KinesisDataStreamEvent } from '../../../src/types/schema.js';
+import { TestSchema, getTestEvent } from '../schema/utils.js';
 
 describe('KinesisEnvelope', () => {
+  const eventsPath = 'kinesis';
+
+  const kinesisStreamEvent = getTestEvent<KinesisDataStreamEvent>({
+    eventsPath,
+    filename: 'stream',
+  });
+
   describe('parse', () => {
     it('should parse Kinesis Stream event', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
+      const testEvent = structuredClone(kinesisStreamEvent);
 
       testEvent.Records.map((record) => {
         record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
@@ -24,16 +31,17 @@ describe('KinesisEnvelope', () => {
 
       expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow();
     });
     it('should throw if record is invalid', () => {
-      const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
+      const testEvent = structuredClone(kinesisStreamEvent);
       testEvent.Records[0].kinesis.data = 'invalid';
+
       expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow();
     });
   });
 
   describe('safeParse', () => {
     it('should parse Kinesis Stream event', () => {
       const mock = generateMock(TestSchema);
-      const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
+      const testEvent = structuredClone(kinesisStreamEvent);
 
       testEvent.Records.map((record) => {
         record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
@@ -54,7 +62,7 @@ describe('KinesisEnvelope', () => {
       });
     });
     it('should return original event if record is invalid', () => {
-      const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
+      const testEvent = structuredClone(kinesisStreamEvent);
       testEvent.Records[0].kinesis.data = 'invalid';
       const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema);
       expect(parseResult).toEqual({
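
Both test files above now load fixtures through getTestEvent and clone them with structuredClone before mutating record data, so one test cannot contaminate another. The helper itself is not part of this diff; a plausible sketch of what it does, where the directory layout and body are assumptions rather than the repository's actual implementation:

import { readFileSync } from 'node:fs';
import { join } from 'node:path';

// Hypothetical helper: loads a JSON fixture from tests/events/<eventsPath>/<filename>.json.
const getTestEvent = <T>({ eventsPath, filename }: { eventsPath: string; filename: string }): T =>
  JSON.parse(
    readFileSync(join('tests', 'events', eventsPath, `${filename}.json`), 'utf8')
  ) as T;

// Usage mirrors the tests above: clone before mutating so the shared fixture stays pristine.
// const base = getTestEvent<KinesisDataStreamEvent>({ eventsPath: 'kinesis', filename: 'stream' });
// const testEvent = structuredClone(base);
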
