Skip to content

Commit 246f132

Browse files
authored
fix(parser): add aws region to kinesis event (#3260)
1 parent 1ff97cb commit 246f132

File tree

3 files changed

+102
-28
lines changed

3 files changed

+102
-28
lines changed

Diff for: packages/parser/src/schemas/kinesis-firehose.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { z } from 'zod';
22
import { SqsRecordSchema } from './sqs.js';
33

4-
const KinesisRecordMetaData = z.object({
4+
const KinesisRecordMetadata = z.object({
55
shardId: z.string(),
66
partitionKey: z.string(),
77
approximateArrivalTimestamp: z.number().positive(),
@@ -12,7 +12,7 @@ const KinesisRecordMetaData = z.object({
1212
const KinesisFireHoseRecordBase = z.object({
1313
recordId: z.string(),
1414
approximateArrivalTimestamp: z.number().positive(),
15-
kinesisRecordMetaData: KinesisRecordMetaData.optional(),
15+
kinesisRecordMetadata: KinesisRecordMetadata.nullish(),
1616
});
1717

1818
const KinesisFireHoseBaseSchema = z.object({

Diff for: packages/parser/src/schemas/kinesis.ts

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const KinesisDataStreamRecord = z.object({
3131
eventVersion: z.string(),
3232
eventID: z.string(),
3333
eventName: z.literal('aws:kinesis:record'),
34+
awsRegion: z.string(),
3435
invokeIdentityArn: z.string(),
3536
eventSourceARN: z.string(),
3637
kinesis: KinesisDataStreamRecordPayload,

Diff for: packages/parser/tests/unit/schema/kinesis.test.ts

+99-26
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
* @group unit/parser/schema/
55
*/
66

7+
import { gunzipSync } from 'node:zlib';
78
import {
89
KinesisDataStreamRecord,
910
KinesisDataStreamSchema,
1011
KinesisFirehoseRecordSchema,
1112
KinesisFirehoseSchema,
1213
KinesisFirehoseSqsRecordSchema,
1314
KinesisFirehoseSqsSchema,
15+
SqsRecordSchema,
1416
} from '../../../src/schemas/';
1517
import type {
1618
KinesisDataStreamEvent,
@@ -25,61 +27,131 @@ import { TestEvents } from './utils.js';
2527

2628
describe('Kinesis ', () => {
2729
it('should parse kinesis event', () => {
28-
const kinesisStreamEvent = TestEvents.kinesisStreamEvent;
30+
const kinesisStreamEvent =
31+
TestEvents.kinesisStreamEvent as KinesisDataStreamEvent;
2932
const parsed = KinesisDataStreamSchema.parse(kinesisStreamEvent);
3033

31-
expect(parsed.Records[0].kinesis.data).toEqual('Hello, this is a test.');
34+
const transformedInput = {
35+
Records: kinesisStreamEvent.Records.map((record, index) => {
36+
return {
37+
...record,
38+
kinesis: {
39+
...record.kinesis,
40+
data: Buffer.from(record.kinesis.data, 'base64').toString(),
41+
},
42+
};
43+
}),
44+
};
45+
46+
expect(parsed).toStrictEqual(transformedInput);
3247
});
3348
it('should parse single kinesis record', () => {
34-
const kinesisStreamEventOneRecord = TestEvents.kinesisStreamEventOneRecord;
49+
const kinesisStreamEventOneRecord =
50+
TestEvents.kinesisStreamEventOneRecord as KinesisDataStreamEvent;
3551
const parsed = KinesisDataStreamSchema.parse(kinesisStreamEventOneRecord);
3652

37-
expect(parsed.Records[0].kinesis.data).toEqual({
38-
message: 'test message',
39-
username: 'test',
40-
});
53+
const transformedInput = {
54+
Records: kinesisStreamEventOneRecord.Records.map((record, index) => {
55+
return {
56+
...record,
57+
kinesis: {
58+
...record.kinesis,
59+
data: JSON.parse(
60+
Buffer.from(record.kinesis.data, 'base64').toString()
61+
),
62+
},
63+
};
64+
}),
65+
};
66+
67+
expect(parsed).toStrictEqual(transformedInput);
4168
});
4269
it('should parse Firehose event', () => {
43-
const kinesisFirehoseKinesisEvent = TestEvents.kinesisFirehoseKinesisEvent;
70+
const kinesisFirehoseKinesisEvent =
71+
TestEvents.kinesisFirehoseKinesisEvent as KinesisFireHoseEvent;
4472
const parsed = KinesisFirehoseSchema.parse(kinesisFirehoseKinesisEvent);
45-
expect(parsed.records[0].data).toEqual('Hello World');
73+
74+
const transformedInput = {
75+
...kinesisFirehoseKinesisEvent,
76+
records: kinesisFirehoseKinesisEvent.records.map((record) => {
77+
return {
78+
...record,
79+
data: Buffer.from(record.data, 'base64').toString(),
80+
kinesisRecordMetadata: record.kinesisRecordMetadata,
81+
};
82+
}),
83+
};
84+
expect(parsed).toStrictEqual(transformedInput);
4685
});
4786
it('should parse Kinesis Firehose PutEvents event', () => {
48-
const kinesisFirehosePutEvent = TestEvents.kinesisFirehosePutEvent;
87+
const kinesisFirehosePutEvent =
88+
TestEvents.kinesisFirehosePutEvent as KinesisFireHoseEvent;
4989
const parsed = KinesisFirehoseSchema.parse(kinesisFirehosePutEvent);
50-
expect(JSON.parse(parsed.records[1].data)).toEqual({
51-
Hello: 'World',
52-
});
90+
91+
const transformedInput = {
92+
...kinesisFirehosePutEvent,
93+
records: kinesisFirehosePutEvent.records.map((record) => {
94+
return {
95+
...record,
96+
data: Buffer.from(record.data, 'base64').toString(),
97+
};
98+
}),
99+
};
100+
101+
expect(parsed).toStrictEqual(transformedInput);
53102
});
54103
it('should parse Firehose event with SQS event', () => {
55-
const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent;
104+
const kinesisFirehoseSQSEvent =
105+
TestEvents.kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent;
56106
const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent);
57-
expect(parsed.records[0].data).toMatchObject({
58-
messageId: '5ab807d4-5644-4c55-97a3-47396635ac74',
59-
body: 'Test message.',
60-
});
107+
108+
const transformedInput = {
109+
...kinesisFirehoseSQSEvent,
110+
records: kinesisFirehoseSQSEvent.records.map((record) => {
111+
return {
112+
...record,
113+
data: JSON.parse(
114+
Buffer.from(record.data as string, 'base64').toString()
115+
),
116+
};
117+
}),
118+
};
119+
120+
expect(parsed).toStrictEqual(transformedInput);
61121
});
62122
it('should parse Kinesis event with CloudWatch event', () => {
63123
const kinesisStreamCloudWatchLogsEvent =
64-
TestEvents.kinesisStreamCloudWatchLogsEvent;
124+
TestEvents.kinesisStreamCloudWatchLogsEvent as KinesisDataStreamEvent;
65125
const parsed = KinesisDataStreamSchema.parse(
66126
kinesisStreamCloudWatchLogsEvent
67127
);
68128

69-
expect(parsed.Records[0].kinesis.data).toMatchObject({
70-
messageType: 'DATA_MESSAGE',
71-
owner: '231436140809',
72-
logGroup: '/aws/lambda/pt-1488-DummyLogDataFunction-gnWXPvL6jJyG',
73-
logStream: '2022/11/10/[$LATEST]26b6a45d574f442ea28438923cbf7bf7',
74-
});
129+
const transformedInput = {
130+
Records: kinesisStreamCloudWatchLogsEvent.Records.map((record, index) => {
131+
return {
132+
...record,
133+
kinesis: {
134+
...record.kinesis,
135+
data: JSON.parse(
136+
gunzipSync(Buffer.from(record.kinesis.data, 'base64')).toString(
137+
'utf8'
138+
)
139+
),
140+
},
141+
};
142+
}),
143+
};
144+
145+
expect(parsed).toStrictEqual(transformedInput);
75146
});
76147
it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => {
77148
const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent as {
78149
records: { data: string }[];
79150
};
80151
kinesisFirehoseSQSEvent.records[0].data = 'not a valid json';
81152
const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent);
82-
expect(parsed.records[0].data).toEqual('not a valid json');
153+
154+
expect(parsed).toStrictEqual(kinesisFirehoseSQSEvent);
83155
});
84156
it('should parse a kinesis record from a kinesis event', () => {
85157
const kinesisStreamEvent: KinesisDataStreamEvent =
@@ -88,6 +160,7 @@ describe('Kinesis ', () => {
88160
kinesisStreamEvent.Records[0]
89161
);
90162

163+
expect(parsedRecord.eventSource).toEqual('aws:kinesis');
91164
expect(parsedRecord.eventName).toEqual('aws:kinesis:record');
92165
});
93166

0 commit comments

Comments
 (0)