Skip to content

feat(parser): add schema for DynamoDB - Kinesis Stream event #3328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Parser comes with the following built-in schemas:
| **KafkaSelfManagedEventSchema** | Lambda Event Source payload for self managed Kafka payload |
| **KinesisDataStreamSchema** | Lambda Event Source payload for Amazon Kinesis Data Streams |
| **KinesisFirehoseSchema** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisDynamoDBStreamSchema** | Lambda Event Source payload for DynamodbStream record wrapped in Kinesis Data stream |
| **KinesisFirehoseSqsSchema** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlSchema** | Lambda Event Source payload for Lambda Function URL payload |
| **S3EventNotificationEventBridgeSchema** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
Expand Down
14 changes: 14 additions & 0 deletions packages/parser/src/schemas/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ const DynamoDBStreamRecord = z.object({
userIdentity: UserIdentity.optional(),
});

const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({
recordFormat: z.literal('application/json'),
tableName: z.string(),
userIdentity: UserIdentity.nullish(),
dynamodb: DynamoDBStreamChangeRecord.omit({
SequenceNumber: true,
StreamViewType: true,
}),
}).omit({
eventVersion: true,
eventSourceARN: true,
});

/**
* Zod schema for Amazon DynamoDB Stream event.
*
Expand Down Expand Up @@ -111,6 +124,7 @@ const DynamoDBStreamSchema = z.object({
});

export {
DynamoDBStreamToKinesisRecord,
DynamoDBStreamSchema,
DynamoDBStreamRecord,
DynamoDBStreamChangeRecord,
Expand Down
6 changes: 5 additions & 1 deletion packages/parser/src/schemas/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export {
KafkaSelfManagedEventSchema,
KafkaRecordSchema,
} from './kafka.js';
export { KinesisDataStreamSchema, KinesisDataStreamRecord } from './kinesis.js';
export {
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisDataStreamRecord,
} from './kinesis.js';
export {
KinesisFirehoseSchema,
KinesisFirehoseSqsSchema,
Expand Down
23 changes: 20 additions & 3 deletions packages/parser/src/schemas/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { gunzipSync } from 'node:zlib';
import { z } from 'zod';
import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';

const KinesisDataStreamRecordPayload = z.object({
kinesisSchemaVersion: z.string(),
partitionKey: z.string(),
sequenceNumber: z.string(),
approximateArrivalTimestamp: z.number(),
data: z.string().transform((data) => {
const decompresed = decompress(data);
const decompressed = decompress(data);
const decoded = Buffer.from(data, 'base64').toString('utf-8');
try {
// If data was not compressed, try to parse it as JSON otherwise it must be string
return decompresed === data ? JSON.parse(decoded) : decompresed;
return decompressed === data ? JSON.parse(decoded) : decompressed;
} catch (e) {
return decoded;
}
Expand All @@ -37,6 +38,21 @@ const KinesisDataStreamRecord = z.object({
kinesis: KinesisDataStreamRecordPayload,
});

const KinesisDynamoDBStreamSchema = z.object({
Records: z.array(
KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({
data: z
.string()
.transform((data) => {
return JSON.parse(Buffer.from(data, 'base64').toString('utf8'));
})
.pipe(DynamoDBStreamToKinesisRecord),
}),
})
),
});

/**
* Zod schema for Kinesis Data Stream event
*
Expand Down Expand Up @@ -88,7 +104,8 @@ const KinesisDataStreamSchema = z.object({
});

export {
KinesisDataStreamSchema,
KinesisDataStreamRecord,
KinesisDataStreamRecordPayload,
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
};
4 changes: 4 additions & 0 deletions packages/parser/src/types/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
KafkaRecordSchema,
KafkaSelfManagedEventSchema,
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
KinesisFirehoseSqsRecordSchema,
Expand Down Expand Up @@ -79,6 +80,8 @@ type KafkaMskEvent = z.infer<typeof KafkaMskEventSchema>;

type KinesisDataStreamEvent = z.infer<typeof KinesisDataStreamSchema>;

type KinesisDynamoDBStreamEvent = z.infer<typeof KinesisDynamoDBStreamSchema>;

type KinesisFireHoseEvent = z.infer<typeof KinesisFirehoseSchema>;

type KinesisFirehoseRecord = z.infer<typeof KinesisFirehoseRecordSchema>;
Expand Down Expand Up @@ -136,6 +139,7 @@ export type {
KafkaMskEvent,
KafkaRecord,
KinesisDataStreamEvent,
KinesisDynamoDBStreamEvent,
KinesisFireHoseEvent,
KinesisFirehoseRecord,
KinesisFireHoseSqsEvent,
Expand Down
36 changes: 36 additions & 0 deletions packages/parser/tests/events/kinesis/dynamodb-stream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "859F7C064A4818874FA67ABEC9BF2AF1",
"sequenceNumber": "49657828409187701520019995242508390119953358325192589314",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiZDk0MjgwMjktMGY2My00MDU2LTg2ZGEtY2UxMGQ1NDViMWI5IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9LCJkYXRhIjp7IlMiOiJkYXRhLXg2YXE3Y2tkcGdrIn19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.932
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242508390119953358325192589314",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "6037E47B707479B67E577C989D96E9F8",
"sequenceNumber": "49657828409187701520019995242509599045772972954367295490",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiYWE1NmNhZDQtMzExYS00NmM4LWFiNWYtYzdhMTNhN2E2Mjk4IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9LCJkYXRhIjp7IlMiOiJkYXRhLTRlb21wanM4OW41In19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.935
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242509599045772972954367295490",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
}
]
}
73 changes: 43 additions & 30 deletions packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import { generateMock } from '@anatine/zod-mock';
import { describe, expect, it } from 'vitest';
import { ZodError, type z } from 'zod';
import { ZodError } from 'zod';
import { ParseError } from '../../../src';
import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js';
import type { KinesisFirehoseSchema } from '../../../src/schemas/';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type {
KinesisFireHoseEvent,
KinesisFireHoseSqsEvent,
} from '../../../src/types';
import { TestSchema, getTestEvent } from '../schema/utils.js';

describe('Kinesis Firehose Envelope', () => {
const eventsPath = 'kinesis';
describe('parse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -24,9 +29,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({
eventsPath,
filename: 'firehose-sqs',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -38,9 +44,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -50,9 +57,10 @@ describe('Kinesis Firehose Envelope', () => {
expect(resp).toEqual([mock, mock]);
});
it('should throw if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand All @@ -68,9 +76,10 @@ describe('Kinesis Firehose Envelope', () => {
}).toThrow();
});
it('should throw when schema does not match record', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from('not a valid json').toString('base64');
Expand All @@ -84,9 +93,10 @@ describe('Kinesis Firehose Envelope', () => {
describe('safeParse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -98,9 +108,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({
eventsPath,
filename: 'firehose-sqs',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -112,9 +123,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -139,9 +151,10 @@ describe('Kinesis Firehose Envelope', () => {
}
});
it('should return original event if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand Down
25 changes: 19 additions & 6 deletions packages/parser/tests/unit/envelopes/kinesis.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { generateMock } from '@anatine/zod-mock';
import type { KinesisStreamEvent } from 'aws-lambda';
import { describe, expect, it } from 'vitest';
import { KinesisEnvelope } from '../../../src/envelopes/index.js';
import { ParseError } from '../../../src/errors.js';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type { KinesisDataStreamEvent } from '../../../src/types/schema.js';
import { TestSchema, getTestEvent } from '../schema/utils.js';

describe('KinesisEnvelope', () => {
const eventsPath = 'kinesis';
describe('parse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -24,7 +28,10 @@ describe('KinesisEnvelope', () => {
expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow();
});
it('should throw if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});
testEvent.Records[0].kinesis.data = 'invalid';
expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow();
});
Expand All @@ -33,7 +40,10 @@ describe('KinesisEnvelope', () => {
describe('safeParse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -54,7 +64,10 @@ describe('KinesisEnvelope', () => {
});
});
it('should return original event if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});
testEvent.Records[0].kinesis.data = 'invalid';
const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema);
expect(parseResult).toEqual({
Expand Down
Loading