Skip to content

Commit bd6b24a

Browse files
dreamorosi and am29d authored
fix(parser): Kafka Envelope + tests (#3489)
Co-authored-by: Alexander Schueren <[email protected]>
1 parent 2b19658 commit bd6b24a

File tree

6 files changed

+253
-181
lines changed

6 files changed

+253
-181
lines changed

Diff for: packages/parser/src/envelopes/kafka.ts

+59 -28
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,33 @@
1-
import type { ZodSchema, z } from 'zod';
1+
import { ZodError, type ZodIssue, type ZodSchema, z } from 'zod';
22
import { ParseError } from '../errors.js';
33
import {
44
KafkaMskEventSchema,
55
KafkaSelfManagedEventSchema,
66
} from '../schemas/kafka.js';
77
import type { KafkaMskEvent, ParsedResult } from '../types/index.js';
8-
import { Envelope, envelopeDiscriminator } from './envelope.js';
8+
import { envelopeDiscriminator } from './envelope.js';
9+
10+
/**
11+
* Get the event source from the data.
12+
*
13+
* Before we can access the event source, we need to parse the data with a minimal schema.
14+
*
15+
* @param data - The data to extract the event source from
16+
*/
17+
const extractEventSource = (
18+
data: unknown
19+
): 'aws:kafka' | 'SelfManagedKafka' => {
20+
const verifiedData = z
21+
.object({
22+
eventSource: z.union([
23+
z.literal('aws:kafka'),
24+
z.literal('SelfManagedKafka'),
25+
]),
26+
})
27+
.parse(data);
28+
29+
return verifiedData.eventSource;
30+
};
931

1032
/**
1133
* Kafka event envelope to extract data within body key
@@ -15,29 +37,28 @@ import { Envelope, envelopeDiscriminator } from './envelope.js';
1537
* Note: Records will be parsed the same way so if model is str,
1638
* all items in the list will be parsed as str and not as JSON (and vice versa)
1739
*/
18-
1940
export const KafkaEnvelope = {
2041
/**
2142
* This is a discriminator to differentiate whether an envelope returns an array or an object
2243
* @hidden
2344
*/
2445
[envelopeDiscriminator]: 'array' as const,
2546
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
26-
// manually fetch event source to decide between Msk or SelfManaged
27-
const eventSource = (data as KafkaMskEvent).eventSource;
47+
const eventSource = extractEventSource(data);
2848

29-
const parsedEnvelope:
30-
| z.infer<typeof KafkaMskEventSchema>
31-
| z.infer<typeof KafkaSelfManagedEventSchema> =
49+
const parsedEnvelope =
3250
eventSource === 'aws:kafka'
3351
? KafkaMskEventSchema.parse(data)
3452
: KafkaSelfManagedEventSchema.parse(data);
3553

36-
return Object.values(parsedEnvelope.records).map((topicRecord) => {
37-
return topicRecord.map((record) => {
38-
return Envelope.parse(record.value, schema);
39-
});
40-
});
54+
const values: z.infer<T>[] = [];
55+
for (const topicRecord of Object.values(parsedEnvelope.records)) {
56+
for (const record of topicRecord) {
57+
values.push(schema.parse(record.value));
58+
}
59+
}
60+
61+
return values;
4162
},
4263

4364
safeParse<T extends ZodSchema>(
@@ -61,27 +82,37 @@ export const KafkaEnvelope = {
6182
originalEvent: data,
6283
};
6384
}
64-
const parsedRecords: z.infer<T>[] = [];
6585

66-
for (const topicRecord of Object.values(parsedEnvelope.data.records)) {
86+
const values: z.infer<T>[] = [];
87+
const issues: ZodIssue[] = [];
88+
for (const [topicKey, topicRecord] of Object.entries(
89+
parsedEnvelope.data.records
90+
)) {
6791
for (const record of topicRecord) {
68-
const parsedRecord = Envelope.safeParse(record.value, schema);
92+
const parsedRecord = schema.safeParse(record.value);
6993
if (!parsedRecord.success) {
70-
return {
71-
success: false,
72-
error: new ParseError('Failed to parse Kafka record', {
73-
cause: parsedRecord.error,
74-
}),
75-
originalEvent: data,
76-
};
94+
issues.push(
95+
...(parsedRecord.error as ZodError).issues.map((issue) => ({
96+
...issue,
97+
path: ['records', topicKey, ...issue.path],
98+
}))
99+
);
77100
}
78-
parsedRecords.push(parsedRecord.data);
101+
values.push(parsedRecord.data);
79102
}
80103
}
81104

82-
return {
83-
success: true,
84-
data: parsedRecords,
85-
};
105+
return issues.length > 0
106+
? {
107+
success: false,
108+
error: new ParseError('Failed to parse Kafka envelope', {
109+
cause: new ZodError(issues),
110+
}),
111+
originalEvent: data,
112+
}
113+
: {
114+
success: true,
115+
data: values,
116+
};
86117
},
87118
};

Diff for: packages/parser/src/schemas/kafka.ts

+2 -4
Original file line number | Diff line number | Diff line change
@@ -28,11 +28,9 @@ const KafkaRecordSchema = z.object({
2828
const KafkaBaseEventSchema = z.object({
2929
bootstrapServers: z
3030
.string()
31-
.transform((bootstrapServers) => {
32-
return bootstrapServers ? bootstrapServers.split(',') : undefined;
33-
})
31+
.transform((bootstrapServers) => bootstrapServers.split(','))
3432
.nullish(),
35-
records: z.record(z.string(), z.array(KafkaRecordSchema)),
33+
records: z.record(z.string(), z.array(KafkaRecordSchema).min(1)),
3634
});
3735

3836
/** Zod schema for Kafka event from Self Managed Kafka

Diff for: packages/parser/tests/events/kafkaEventSelfManaged.json

-22
This file was deleted.

Diff for: packages/parser/tests/unit/envelopes/kafka.test.ts

+107 -72
Original file line number | Diff line number | Diff line change
@@ -1,95 +1,130 @@
1-
import { generateMock } from '@anatine/zod-mock';
2-
import type { MSKEvent, SelfManagedKafkaEvent } from 'aws-lambda';
31
import { describe, expect, it } from 'vitest';
4-
import { ParseError } from '../../../src';
2+
import { ZodError, z } from 'zod';
53
import { KafkaEnvelope } from '../../../src/envelopes/index.js';
6-
import { TestEvents, TestSchema } from '../schema/utils.js';
7-
8-
describe('Kafka', () => {
9-
describe('parse', () => {
10-
it('should parse MSK kafka envelope', () => {
11-
const mock = generateMock(TestSchema);
12-
13-
const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
14-
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
15-
JSON.stringify(mock)
16-
).toString('base64');
4+
import { ParseError } from '../../../src/errors.js';
5+
import { JSONStringified } from '../../../src/helpers.js';
6+
import { getTestEvent } from '../schema/utils.js';
7+
8+
describe('Envelope: Kafka', () => {
9+
const baseEvent = getTestEvent({
10+
eventsPath: 'kafka',
11+
filename: 'base',
12+
});
1713

18-
const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
14+
describe('Method: parse', () => {
15+
it('throws if the payload of the value does not match the schema', () => {
16+
// Prepare
17+
const event = structuredClone(baseEvent);
1918

20-
expect(result).toEqual([[mock]]);
19+
// Act & Assess
20+
expect(() => KafkaEnvelope.parse(event, z.number())).toThrow();
2121
});
2222

23-
it('should parse Self Managed kafka envelope', () => {
24-
const mock = generateMock(TestSchema);
25-
26-
const kafkaEvent =
27-
TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
28-
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
29-
JSON.stringify(mock)
30-
).toString('base64');
23+
it('parses a Kafka event', () => {
24+
// Prepare
25+
const event = structuredClone(baseEvent);
3126

32-
const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
27+
// Act
28+
const result = KafkaEnvelope.parse(event, z.string());
3329

34-
expect(result).toEqual([[mock]]);
30+
// Assess
31+
expect(result).toEqual(['{"key":"value"}']);
3532
});
3633

37-
describe('safeParse', () => {
38-
it('should parse MSK kafka envelope', () => {
39-
const mock = generateMock(TestSchema);
34+
it('parses a Kafka event and applies the schema transformation', () => {
35+
// Prepare
36+
const event = structuredClone(baseEvent);
4037

41-
const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
42-
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
43-
JSON.stringify(mock)
44-
).toString('base64');
38+
// Act
39+
const result = KafkaEnvelope.parse(
40+
event,
41+
JSONStringified(z.object({ key: z.string() }))
42+
);
4543

46-
const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
47-
48-
expect(result).toEqual({
49-
success: true,
50-
data: [mock],
51-
});
52-
});
53-
54-
it('should parse Self Managed kafka envelope', () => {
55-
const mock = generateMock(TestSchema);
44+
// Assess
45+
expect(result).toEqual([{ key: 'value' }]);
46+
});
5647

57-
const kafkaEvent =
58-
TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
59-
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
60-
JSON.stringify(mock)
61-
).toString('base64');
48+
it('parses a self managed Kafka event', () => {
49+
// Prepare
50+
const event = structuredClone(baseEvent);
51+
event.eventSource = 'SelfManagedKafka';
6252

63-
const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
53+
// Act
54+
const result = KafkaEnvelope.parse(event, z.string());
6455

65-
expect(result).toEqual({
66-
success: true,
67-
data: [mock],
68-
});
69-
});
56+
// Assess
57+
expect(result).toEqual(['{"key":"value"}']);
58+
});
59+
});
7060

71-
it('should return original event on failure', () => {
72-
const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
73-
kafkaEvent.records['mytopic-0'][0].value = 'not a valid json';
61+
describe('Method: safeParse', () => {
62+
it('parses a Kafka event', () => {
63+
// Prepare
64+
const event = structuredClone(baseEvent);
7465

75-
const parseResult = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
66+
// Act
67+
const result = KafkaEnvelope.safeParse(event, z.string());
7668

77-
expect(parseResult).toEqual({
78-
success: false,
79-
error: expect.any(ParseError),
80-
originalEvent: kafkaEvent,
81-
});
69+
// Assess
70+
expect(result).toEqual({
71+
success: true,
72+
data: ['{"key":"value"}'],
73+
});
74+
});
8275

83-
if (!parseResult.success && parseResult.error) {
84-
expect(parseResult.error.cause).toBeInstanceOf(SyntaxError);
85-
}
76+
it('returns an error if the event is not a valid Kafka event', () => {
77+
// Prepare
78+
const event = structuredClone(baseEvent);
79+
event.eventSource = 'SelfManagedKafka';
80+
// @ts-expect-error - Intentionally invalid event
81+
event.records['mytopic-0'] = [];
82+
83+
// Act
84+
const result = KafkaEnvelope.safeParse(event, z.string());
85+
86+
// Assess
87+
expect(result).toEqual({
88+
success: false,
89+
error: new ParseError('Failed to parse Kafka envelope', {
90+
cause: new ZodError([
91+
{
92+
code: 'too_small',
93+
minimum: 1,
94+
type: 'array',
95+
inclusive: true,
96+
exact: false,
97+
message: 'Array must contain at least 1 element(s)',
98+
path: ['records', 'mytopic-0'],
99+
},
100+
]),
101+
}),
102+
originalEvent: event,
86103
});
87-
it('should return original event and error if envelope is invalid', () => {
88-
expect(KafkaEnvelope.safeParse({ foo: 'bar' }, TestSchema)).toEqual({
89-
success: false,
90-
error: expect.any(ParseError),
91-
originalEvent: { foo: 'bar' },
92-
});
104+
});
105+
106+
it('returns the original event and the error if the payload of the value does not match the schema', () => {
107+
// Prepare
108+
const event = structuredClone(baseEvent);
109+
110+
// Act
111+
const result = KafkaEnvelope.safeParse(event, z.number());
112+
113+
// Assess
114+
expect(result).toEqual({
115+
success: false,
116+
error: new ParseError('Failed to parse Kafka envelope', {
117+
cause: new ZodError([
118+
{
119+
code: 'invalid_type',
120+
expected: 'number',
121+
received: 'string',
122+
path: ['records', 'mytopic-0'],
123+
message: 'Expected number, received string',
124+
},
125+
]),
126+
}),
127+
originalEvent: event,
93128
});
94129
});
95130
});

0 commit comments

Comments
 (0)