diff --git a/packages/parser/src/envelopes/kafka.ts b/packages/parser/src/envelopes/kafka.ts
index f0f0e36844..04fbb678d6 100644
--- a/packages/parser/src/envelopes/kafka.ts
+++ b/packages/parser/src/envelopes/kafka.ts
@@ -1,11 +1,33 @@
-import type { ZodSchema, z } from 'zod';
+import { ZodError, type ZodIssue, type ZodSchema, z } from 'zod';
 import { ParseError } from '../errors.js';
 import {
   KafkaMskEventSchema,
   KafkaSelfManagedEventSchema,
 } from '../schemas/kafka.js';
 import type { KafkaMskEvent, ParsedResult } from '../types/index.js';
-import { Envelope, envelopeDiscriminator } from './envelope.js';
+import { envelopeDiscriminator } from './envelope.js';
+
+/**
+ * Get the event source from the data.
+ *
+ * Before we can access the event source, we need to parse the data with a minimal schema.
+ *
+ * @param data - The data to extract the event source from
+ */
+const extractEventSource = (
+  data: unknown
+): 'aws:kafka' | 'SelfManagedKafka' => {
+  const verifiedData = z
+    .object({
+      eventSource: z.union([
+        z.literal('aws:kafka'),
+        z.literal('SelfManagedKafka'),
+      ]),
+    })
+    .parse(data);
+
+  return verifiedData.eventSource;
+};
 
 /**
  * Kafka event envelope to extract data within body key
@@ -15,7 +37,6 @@ import { Envelope, envelopeDiscriminator } from './envelope.js';
  * Note: Records will be parsed the same way so if model is str,
  * all items in the list will be parsed as str and not as JSON (and vice versa)
  */
-
 export const KafkaEnvelope = {
   /**
    * This is a discriminator to differentiate whether an envelope returns an array or an object
@@ -23,21 +44,21 @@ export const KafkaEnvelope = {
    */
   [envelopeDiscriminator]: 'array' as const,
   parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
-    // manually fetch event source to decide between Msk or SelfManaged
-    const eventSource = (data as KafkaMskEvent).eventSource;
+    const eventSource = extractEventSource(data);
 
-    const parsedEnvelope:
-      | z.infer<typeof KafkaMskEventSchema>
-      | z.infer<typeof KafkaSelfManagedEventSchema> =
+    const parsedEnvelope =
       eventSource === 'aws:kafka'
         ? KafkaMskEventSchema.parse(data)
         : KafkaSelfManagedEventSchema.parse(data);
 
-    return Object.values(parsedEnvelope.records).map((topicRecord) => {
-      return topicRecord.map((record) => {
-        return Envelope.parse(record.value, schema);
-      });
-    });
+    const values: z.infer<T>[] = [];
+    for (const topicRecord of Object.values(parsedEnvelope.records)) {
+      for (const record of topicRecord) {
+        values.push(schema.parse(record.value));
+      }
+    }
+
+    return values;
   },
 
   safeParse<T extends ZodSchema>(
@@ -61,27 +82,37 @@
         originalEvent: data,
       };
     }
-    const parsedRecords: z.infer<T>[] = [];
-    for (const topicRecord of Object.values(parsedEnvelope.data.records)) {
+    const values: z.infer<T>[] = [];
+    const issues: ZodIssue[] = [];
+    for (const [topicKey, topicRecord] of Object.entries(
+      parsedEnvelope.data.records
+    )) {
       for (const record of topicRecord) {
-        const parsedRecord = Envelope.safeParse(record.value, schema);
+        const parsedRecord = schema.safeParse(record.value);
         if (!parsedRecord.success) {
-          return {
-            success: false,
-            error: new ParseError('Failed to parse Kafka record', {
-              cause: parsedRecord.error,
-            }),
-            originalEvent: data,
-          };
+          issues.push(
+            ...(parsedRecord.error as ZodError).issues.map((issue) => ({
+              ...issue,
+              path: ['records', topicKey, ...issue.path],
+            }))
+          );
         }
-        parsedRecords.push(parsedRecord.data);
+        values.push(parsedRecord.data);
      }
    }
 
-    return {
-      success: true,
-      data: parsedRecords,
-    };
+    return issues.length > 0
+      ? {
+          success: false,
+          error: new ParseError('Failed to parse Kafka envelope', {
+            cause: new ZodError(issues),
+          }),
+          originalEvent: data,
+        }
+      : {
+          success: true,
+          data: values,
+        };
   },
 };
diff --git a/packages/parser/src/schemas/kafka.ts b/packages/parser/src/schemas/kafka.ts
index bab669f448..70b5b74edf 100644
--- a/packages/parser/src/schemas/kafka.ts
+++ b/packages/parser/src/schemas/kafka.ts
@@ -28,11 +28,9 @@ const KafkaRecordSchema = z.object({
 const KafkaBaseEventSchema = z.object({
   bootstrapServers: z
     .string()
-    .transform((bootstrapServers) => {
-      return bootstrapServers ? bootstrapServers.split(',') : undefined;
-    })
+    .transform((bootstrapServers) => bootstrapServers.split(','))
     .nullish(),
-  records: z.record(z.string(), z.array(KafkaRecordSchema)),
+  records: z.record(z.string(), z.array(KafkaRecordSchema).min(1)),
 });
 
 /** Zod schema for Kafka event from Self Managed Kafka
diff --git a/packages/parser/tests/events/kafkaEventMsk.json b/packages/parser/tests/events/kafka/base.json
similarity index 100%
rename from packages/parser/tests/events/kafkaEventMsk.json
rename to packages/parser/tests/events/kafka/base.json
diff --git a/packages/parser/tests/events/kafkaEventSelfManaged.json b/packages/parser/tests/events/kafkaEventSelfManaged.json
deleted file mode 100644
index 775f58cd8c..0000000000
--- a/packages/parser/tests/events/kafkaEventSelfManaged.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "eventSource": "SelfManagedKafka",
-  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
-  "records": {
-    "mytopic-0": [
-      {
-        "topic": "mytopic",
-        "partition": 0,
-        "offset": 15,
-        "timestamp": 1545084650987,
-        "timestampType": "CREATE_TIME",
-        "key": "cmVjb3JkS2V5",
-        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
-        "headers": [
-          {
-            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
-          }
-        ]
-      }
-    ]
-  }
-}
diff --git a/packages/parser/tests/unit/envelopes/kafka.test.ts b/packages/parser/tests/unit/envelopes/kafka.test.ts
index 33bb9eae60..3ac48dada7 100644
--- a/packages/parser/tests/unit/envelopes/kafka.test.ts
+++ b/packages/parser/tests/unit/envelopes/kafka.test.ts
@@ -1,95 +1,130 @@
-import { generateMock } from '@anatine/zod-mock';
-import type { MSKEvent, SelfManagedKafkaEvent } from 'aws-lambda';
 import { describe, expect, it } from 'vitest';
-import { ParseError } from '../../../src';
+import { ZodError, z } from 'zod';
 import { KafkaEnvelope } from '../../../src/envelopes/index.js';
-import { TestEvents, TestSchema } from '../schema/utils.js';
-
-describe('Kafka', () => {
-  describe('parse', () => {
-    it('should parse MSK kafka envelope', () => {
-      const mock = generateMock(TestSchema);
-
-      const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
-      kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
-        JSON.stringify(mock)
-      ).toString('base64');
+import { ParseError } from '../../../src/errors.js';
+import { JSONStringified } from '../../../src/helpers.js';
+import { getTestEvent } from '../schema/utils.js';
+
+describe('Envelope: Kafka', () => {
+  const baseEvent = getTestEvent({
+    eventsPath: 'kafka',
+    filename: 'base',
+  });
 
-      const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
+  describe('Method: parse', () => {
+    it('throws if the payload of the value does not match the schema', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
 
-      expect(result).toEqual([[mock]]);
+      // Act & Assess
+      expect(() => KafkaEnvelope.parse(event, z.number())).toThrow();
     });
 
-    it('should parse Self Managed kafka envelope', () => {
-      const mock = generateMock(TestSchema);
-
-      const kafkaEvent =
-        TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
-      kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
-        JSON.stringify(mock)
-      ).toString('base64');
+    it('parses a Kafka event', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
 
-      const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
+      // Act
+      const result = KafkaEnvelope.parse(event, z.string());
 
-      expect(result).toEqual([[mock]]);
+      // Assess
+      expect(result).toEqual(['{"key":"value"}']);
     });
 
-    describe('safeParse', () => {
-      it('should parse MSK kafka envelope', () => {
-        const mock = generateMock(TestSchema);
+    it('parses a Kafka event and applies the schema transformation', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
 
-        const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
-        kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
-          JSON.stringify(mock)
-        ).toString('base64');
+      // Act
+      const result = KafkaEnvelope.parse(
+        event,
+        JSONStringified(z.object({ key: z.string() }))
+      );
 
-        const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
-
-        expect(result).toEqual({
-          success: true,
-          data: [mock],
-        });
-      });
-
-      it('should parse Self Managed kafka envelope', () => {
-        const mock = generateMock(TestSchema);
+      // Assess
+      expect(result).toEqual([{ key: 'value' }]);
+    });
 
-        const kafkaEvent =
-          TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
-        kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
-          JSON.stringify(mock)
-        ).toString('base64');
+    it('parses a self managed Kafka event', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
+      event.eventSource = 'SelfManagedKafka';
 
-        const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
+      // Act
+      const result = KafkaEnvelope.parse(event, z.string());
 
-        expect(result).toEqual({
-          success: true,
-          data: [mock],
-        });
-      });
+      // Assess
+      expect(result).toEqual(['{"key":"value"}']);
+    });
+  });
 
-      it('should return original event on failure', () => {
-        const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
-        kafkaEvent.records['mytopic-0'][0].value = 'not a valid json';
+  describe('Method: safeParse', () => {
+    it('parses a Kafka event', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
 
-        const parseResult = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
+      // Act
+      const result = KafkaEnvelope.safeParse(event, z.string());
 
-        expect(parseResult).toEqual({
-          success: false,
-          error: expect.any(ParseError),
-          originalEvent: kafkaEvent,
-        });
+      // Assess
+      expect(result).toEqual({
+        success: true,
+        data: ['{"key":"value"}'],
+      });
+    });
 
-        if (!parseResult.success && parseResult.error) {
-          expect(parseResult.error.cause).toBeInstanceOf(SyntaxError);
-        }
+    it('returns an error if the event is not a valid Kafka event', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
+      event.eventSource = 'SelfManagedKafka';
+      // @ts-expect-error - Intentionally invalid event
+      event.records['mytopic-0'] = [];
+
+      // Act
+      const result = KafkaEnvelope.safeParse(event, z.string());
+
+      // Assess
+      expect(result).toEqual({
+        success: false,
+        error: new ParseError('Failed to parse Kafka envelope', {
+          cause: new ZodError([
+            {
+              code: 'too_small',
+              minimum: 1,
+              type: 'array',
+              inclusive: true,
+              exact: false,
+              message: 'Array must contain at least 1 element(s)',
+              path: ['records', 'mytopic-0'],
+            },
+          ]),
+        }),
+        originalEvent: event,
       });
 
-      it('should return original event and error if envelope is invalid', () => {
-        expect(KafkaEnvelope.safeParse({ foo: 'bar' }, TestSchema)).toEqual({
-          success: false,
-          error: expect.any(ParseError),
-          originalEvent: { foo: 'bar' },
-        });
+    });
+
+    it('returns the original event and the error if the payload of the value does not match the schema', () => {
+      // Prepare
+      const event = structuredClone(baseEvent);
+
+      // Act
+      const result = KafkaEnvelope.safeParse(event, z.number());
+
+      // Assess
+      expect(result).toEqual({
+        success: false,
+        error: new ParseError('Failed to parse Kafka envelope', {
+          cause: new ZodError([
+            {
+              code: 'invalid_type',
+              expected: 'number',
+              received: 'string',
+              path: ['records', 'mytopic-0'],
+              message: 'Expected number, received string',
+            },
+          ]),
+        }),
+        originalEvent: event,
+      });
     });
   });
 });
diff --git a/packages/parser/tests/unit/schema/kafka.test.ts b/packages/parser/tests/unit/schema/kafka.test.ts
index 0c4fe096ba..ac1c0316f8 100644
--- a/packages/parser/tests/unit/schema/kafka.test.ts
+++ b/packages/parser/tests/unit/schema/kafka.test.ts
@@ -1,69 +1,99 @@
 import { describe, expect, it } from 'vitest';
 import {
   KafkaMskEventSchema,
-  KafkaRecordSchema,
   KafkaSelfManagedEventSchema,
-} from '../../../src/schemas/';
-import type { KafkaSelfManagedEvent } from '../../../src/types';
-import type { KafkaRecord } from '../../../src/types/schema';
-import { TestEvents } from './utils.js';
+} from '../../../src/schemas/kafka.js';
+import type {
+  KafkaMskEvent,
+  KafkaSelfManagedEvent,
+} from '../../../src/types/schema.js';
+import { getTestEvent, omit } from './utils.js';
 
-describe('Kafka ', () => {
-  const expectedTestEvent = {
-    key: 'recordKey',
-    value: JSON.stringify({ key: 'value' }),
-    partition: 0,
-    topic: 'mytopic',
-    offset: 15,
-    timestamp: 1545084650987,
-    timestampType: 'CREATE_TIME',
-    headers: [
-      {
-        headerKey: 'headerValue',
-      },
-    ],
-  };
-  it('should parse kafka MSK event', () => {
-    const kafkaEventMsk = TestEvents.kafkaEventMsk;
-
-    expect(
-      KafkaMskEventSchema.parse(kafkaEventMsk).records['mytopic-0'][0]
-    ).toEqual(expectedTestEvent);
+describe('Schema: Kafka', () => {
+  const baseEvent = getTestEvent({
+    eventsPath: 'kafka',
+    filename: 'base',
   });
 
-  it('should parse kafka self managed event', () => {
-    const kafkaEventSelfManaged = TestEvents.kafkaEventSelfManaged;
-    expect(
-      KafkaSelfManagedEventSchema.parse(kafkaEventSelfManaged).records[
-        'mytopic-0'
-      ][0]
-    ).toEqual(expectedTestEvent);
-  });
-  it('should transform bootstrapServers to array', () => {
-    const kafkaEventSelfManaged = TestEvents.kafkaEventSelfManaged;
+  it('parses a Kafka MSK event', () => {
+    // Prepare
+    const event = structuredClone(baseEvent);
 
-    expect(
-      KafkaSelfManagedEventSchema.parse(kafkaEventSelfManaged).bootstrapServers
-    ).toEqual([
-      'b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
-      'b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
-    ]);
+    // Act
+    const result = KafkaMskEventSchema.parse(event);
+
+    // Assess
+    expect(result).toStrictEqual({
+      eventSource: 'aws:kafka',
+      eventSourceArn:
+        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
+      bootstrapServers: [
+        'b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
+        'b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
+      ],
+      records: {
+        'mytopic-0': [
+          {
+            topic: 'mytopic',
+            partition: 0,
+            offset: 15,
+            timestamp: 1545084650987,
+            timestampType: 'CREATE_TIME',
+            key: 'recordKey',
+            value: `{"key":"value"}`,
+            headers: [
+              {
+                headerKey: 'headerValue',
+              },
+            ],
+          },
+        ],
+      },
+    });
   });
 
-  it('should return undefined if bootstrapServers is not present', () => {
-    const kafkaEventSelfManaged = TestEvents.kafkaEventSelfManaged as {
-      bootstrapServers: string;
-    };
-    kafkaEventSelfManaged.bootstrapServers = '';
-    const parsed = KafkaSelfManagedEventSchema.parse(kafkaEventSelfManaged);
-    expect(parsed.bootstrapServers).toBeUndefined();
+  it('throws if the event is not a Kafka MSK event', () => {
+    // Prepare
+    const event = structuredClone(baseEvent);
+    event.records['mytopic-0'] = [];
+
+    // Act & Assess
+    expect(() => KafkaMskEventSchema.parse(event)).toThrow();
   });
 
-  it('should parse kafka record from kafka event', () => {
-    const kafkaEventMsk: KafkaSelfManagedEvent =
-      TestEvents.kafkaEventSelfManaged as KafkaSelfManagedEvent;
-    const parsedRecord: KafkaRecord = KafkaRecordSchema.parse(
-      kafkaEventMsk.records['mytopic-0'][0]
+  it('parses a Kafka self-managed event', () => {
+    // Prepare
+    const event = omit(
+      ['eventSourceArn', 'bootstrapServers'],
+      structuredClone(baseEvent)
     );
-    expect(parsedRecord.topic).toEqual('mytopic');
+    (event as unknown as KafkaSelfManagedEvent).eventSource =
+      'SelfManagedKafka';
+
+    // Act
+    const result = KafkaSelfManagedEventSchema.parse(event);
+
+    // Assess
+    expect(result).toStrictEqual({
+      eventSource: 'SelfManagedKafka',
+      records: {
+        'mytopic-0': [
+          {
+            topic: 'mytopic',
+            partition: 0,
+            offset: 15,
+            timestamp: 1545084650987,
+            timestampType: 'CREATE_TIME',
+            key: 'recordKey',
+            value: `{"key":"value"}`,
+            headers: [
+              {
+                headerKey: 'headerValue',
+              },
+            ],
+          },
+        ],
+      },
+    });
   });
 });
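A minimal usage sketch (not part of the patch) of the refactored envelope, assuming the package's `envelopes` and `helpers` subpath exports; the `orderSchema` shape and payload fields are hypothetical:

```ts
import { KafkaEnvelope } from '@aws-lambda-powertools/parser/envelopes';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { z } from 'zod';

// Hypothetical schema for the JSON payload carried in each record's `value`.
const orderSchema = z.object({ id: z.number(), sku: z.string() });

export const handler = async (event: unknown) => {
  // `safeParse` now aggregates the issues of every failing record into a
  // single ZodError (surfaced as `error.cause`), each issue path prefixed
  // with ['records', '<topic-partition>'], instead of bailing on the first
  // failure; `parse` still throws on the first invalid record.
  const result = KafkaEnvelope.safeParse(event, JSONStringified(orderSchema));
  if (!result.success) {
    console.error(result.error.cause); // ZodError listing all failing records
    return;
  }
  // Both methods now return a flat array of record payloads rather than one
  // nested array per topic.
  for (const order of result.data) {
    console.log(order.id, order.sku);
  }
};
```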