From 3935a47164978e75fb006efb8eac019909285b1c Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Fri, 24 Jan 2025 12:18:28 +0100 Subject: [PATCH] fix(parser): KinesisFirehose SQS shema should fail for invalid SQS message --- .../parser/src/schemas/kinesis-firehose.ts | 10 ++- .../events/kinesis/firehose-sqs-invalid.json | 12 +++ .../parser/tests/unit/schema/kinesis.test.ts | 77 ++++++++++++++----- 3 files changed, 78 insertions(+), 21 deletions(-) create mode 100644 packages/parser/tests/events/kinesis/firehose-sqs-invalid.json diff --git a/packages/parser/src/schemas/kinesis-firehose.ts b/packages/parser/src/schemas/kinesis-firehose.ts index a983a7b017..529eab2cf5 100644 --- a/packages/parser/src/schemas/kinesis-firehose.ts +++ b/packages/parser/src/schemas/kinesis-firehose.ts @@ -35,13 +35,19 @@ const KinesisFirehoseRecordSchema = KinesisFireHoseRecordBase.extend({ * Zod schema for a SQS record from an Kinesis Firehose event. */ const KinesisFirehoseSqsRecordSchema = KinesisFireHoseRecordBase.extend({ - data: z.string().transform((data) => { + data: z.string().transform((data, ctx) => { try { return SqsRecordSchema.parse( JSON.parse(Buffer.from(data, 'base64').toString('utf8')) ); } catch (e) { - return data; + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Failed to parse SQS record', + fatal: true, + }); + + return z.NEVER; } }), }); diff --git a/packages/parser/tests/events/kinesis/firehose-sqs-invalid.json b/packages/parser/tests/events/kinesis/firehose-sqs-invalid.json new file mode 100644 index 0000000000..757b8afbad --- /dev/null +++ b/packages/parser/tests/events/kinesis/firehose-sqs-invalid.json @@ -0,0 +1,12 @@ +{ + "invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9", + "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE", + "region": "us-east-1", + "records": [ + { + "recordId": "49640912821178817833517986466168945147170627572855734274000000", + "approximateArrivalTimestamp": 1684864917398, + "data": "bm90IGEgdmFsaWQgamFzb24=" + } + ] +} diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index f2c638770a..5948e1fc45 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -21,7 +21,7 @@ import type { } from '../../../src/types/schema'; import { getTestEvent } from './utils.js'; -describe('Kinesis ', () => { +describe('Schema: Kinesis', () => { const eventsPath = 'kinesis'; const kinesisStreamEvent = getTestEvent({ @@ -56,8 +56,11 @@ describe('Kinesis ', () => { } ); - it('should parse kinesis event', () => { + it('parses kinesis event', () => { + // Prepare const testEvent = structuredClone(kinesisStreamEvent); + + // Act const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { @@ -72,10 +75,14 @@ describe('Kinesis ', () => { }), }; + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should parse single kinesis record', () => { + it('parses single kinesis record', () => { + // Prepare const testEvent = structuredClone(kinesisStreamEventOneRecord); + + // Act const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { @@ -92,10 +99,14 @@ describe('Kinesis ', () => { }), }; + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should parse Firehose event', () => { + it('parses Firehose event', () => { + // Prepare const testEvent = structuredClone(kinesisFirehoseEvent); + + // Act const parsed = KinesisFirehoseSchema.parse(testEvent); const transformedInput = { @@ -108,11 +119,15 @@ describe('Kinesis ', () => { }; }), }; + + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should parse Kinesis Firehose PutEvents event', () => { + it('parses Kinesis Firehose PutEvents event', () => { + // Prepare const testEvent = structuredClone(kinesisFirehosePutEvent); + // Act const parsed = KinesisFirehoseSchema.parse(testEvent); const transformedInput = { @@ -125,11 +140,14 @@ describe('Kinesis ', () => { }), }; + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should parse Firehose event with SQS event', () => { + it('parses Firehose event with SQS event', () => { + // Prepare const testEvent = structuredClone(kinesisFirehoseSQSEvent); + // Act const parsed = KinesisFirehoseSqsSchema.parse(testEvent); const transformedInput = { @@ -138,21 +156,24 @@ describe('Kinesis ', () => { return { ...record, data: JSON.parse( - Buffer.from(record.data as string, 'base64').toString() + Buffer.from(record.data as unknown as string, 'base64').toString() ), }; }), }; + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should parse Kinesis event with CloudWatch event', () => { + it('parses Kinesis event with CloudWatch event', () => { + // Prepare const testEvent = structuredClone(kinesisStreamCloudWatchLogsEvent); + // Act const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { - Records: testEvent.Records.map((record, index) => { + Records: testEvent.Records.map((record) => { return { ...record, kinesis: { @@ -167,31 +188,39 @@ describe('Kinesis ', () => { }), }; + // Assess expect(parsed).toStrictEqual(transformedInput); }); - it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => { - const testEvent = structuredClone(kinesisFirehoseSQSEvent); - testEvent.records[0].data = 'not a valid json'; - - const parsed = KinesisFirehoseSqsSchema.parse(testEvent); + it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => { + // Prepare + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs-invalid', + }); - expect(parsed).toStrictEqual(testEvent); + // Act & Assess + expect(() => KinesisFirehoseSqsSchema.parse(testEvent)).toThrow(); }); - it('should parse a kinesis record from a kinesis event', () => { + it('parses a kinesis record from a kinesis event', () => { + // Prepare const testEvent: KinesisDataStreamEvent = structuredClone(kinesisStreamEvent); + // Act const parsedRecord = KinesisDataStreamRecord.parse(testEvent.Records[0]); + // Assess expect(parsedRecord.eventSource).toEqual('aws:kinesis'); expect(parsedRecord.eventName).toEqual('aws:kinesis:record'); }); - it('should parse a kinesis record from dynamodb stream event', () => { + it('parses a kinesis record from dynamodb stream event', () => { + // Prepare const testEvent = getTestEvent({ eventsPath, filename: 'dynamodb-stream', }); + const expectedRecords = [ { awsRegion: 'eu-west-1', @@ -231,26 +260,36 @@ describe('Kinesis ', () => { }, ]; + // Act const parsedRecord = KinesisDynamoDBStreamSchema.parse(testEvent); + // Assess expect(parsedRecord.Records.map((record) => record.kinesis.data)).toEqual( expectedRecords ); }); - it('should parse a kinesis firehose record from a kinesis firehose event', () => { + it('parses a kinesis firehose record from a kinesis firehose event', () => { + // Prepare const testEvent = structuredClone(kinesisFirehoseEvent); + + // Act const parsedRecord: KinesisFirehoseRecord = KinesisFirehoseRecordSchema.parse(testEvent.records[0]); + // Assess expect(parsedRecord.data).toEqual('Hello World'); }); - it('should parse a sqs record from a kinesis firehose event', () => { + it('parses a sqs record from a kinesis firehose event', () => { + // Prepare const kinesisFireHoseSqsEvent = structuredClone(kinesisFirehoseSQSEvent); + + // Act const parsed: KinesisFirehoseSqsRecord = KinesisFirehoseSqsRecordSchema.parse(kinesisFireHoseSqsEvent.records[0]); + // Assess expect(parsed.recordId).toEqual( '49640912821178817833517986466168945147170627572855734274000000' );