Skip to content

fix(parser): allow Kinesis envelopes to handle non-JSON strings #3531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 71 additions & 22 deletions packages/parser/src/envelopes/kinesis-firehose.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { KinesisFirehoseSchema } from '../schemas/index.js';
import {
type KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
} from '../schemas/index.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* Kinesis Firehose Envelope to extract array of Records
Expand All @@ -23,10 +26,33 @@ export const KinesisFirehoseEnvelope = {
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = KinesisFirehoseSchema.parse(data);
let parsedEnvelope: z.infer<typeof KinesisFirehoseSchema>;
try {
parsedEnvelope = KinesisFirehoseSchema.parse(data);
} catch (error) {
throw new ParseError('Failed to parse Kinesis Firehose envelope', {
cause: error as Error,
});
}

return parsedEnvelope.records.map((record) => {
return Envelope.parse(record.data, schema);
return parsedEnvelope.records.map((record, recordIndex) => {
let parsedRecord: z.infer<typeof KinesisFirehoseRecordSchema>;
try {
parsedRecord = schema.parse(record.data);
} catch (error) {
throw new ParseError(
`Failed to parse Kinesis Firehose record at index ${recordIndex}`,
{
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: ['records', recordIndex, 'data', ...issue.path],
}))
),
}
);
}
return parsedRecord;
});
},

Expand All @@ -35,7 +61,6 @@ export const KinesisFirehoseEnvelope = {
schema: T
): ParsedResult<unknown, z.infer<T>[]> {
const parsedEnvelope = KinesisFirehoseSchema.safeParse(data);

if (!parsedEnvelope.success) {
return {
success: false,
Expand All @@ -45,25 +70,49 @@ export const KinesisFirehoseEnvelope = {
originalEvent: data,
};
}
const parsedRecords: z.infer<T>[] = [];

for (const record of parsedEnvelope.data.records) {
const parsedData = Envelope.safeParse(record.data, schema);
if (!parsedData.success) {
return {
success: false,
error: new ParseError('Failed to parse Kinesis Firehose record', {
cause: parsedData.error,
}),
originalEvent: data,
};
}
parsedRecords.push(parsedData.data);
const result = parsedEnvelope.data.records.reduce<{
success: boolean;
records: z.infer<T>[];
errors: {
[key: number | string]: { issues: ZodIssue[] };
};
}>(
(acc, record, index) => {
const parsedRecord = schema.safeParse(record.data);

if (!parsedRecord.success) {
const issues = parsedRecord.error.issues.map((issue) => ({
...issue,
path: ['records', index, 'data', ...issue.path],
}));
acc.success = false;
acc.errors[index] = { issues };
return acc;
}

acc.records.push(parsedRecord.data);
return acc;
},
{ success: true, records: [], errors: {} }
);

if (result.success) {
return { success: true, data: result.records };
}

const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse Kinesis Firehose records at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse Kinesis Firehose record at index ${Object.keys(result.errors)[0]}`;
return {
success: true,
data: parsedRecords,
success: false,
error: new ParseError(errorMessage, {
cause: new ZodError(
Object.values(result.errors).flatMap((error) => error.issues)
),
}),
originalEvent: data,
};
},
};
97 changes: 76 additions & 21 deletions packages/parser/src/envelopes/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { KinesisDataStreamSchema } from '../schemas/kinesis.js';
import {
type KinesisDataStreamRecord,
KinesisDataStreamSchema,
} from '../schemas/kinesis.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* Kinesis Data Stream Envelope to extract array of Records
Expand All @@ -21,10 +24,39 @@ export const KinesisEnvelope = {
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = KinesisDataStreamSchema.parse(data);
let parsedEnvelope: z.infer<typeof KinesisDataStreamSchema>;
try {
parsedEnvelope = KinesisDataStreamSchema.parse(data);
} catch (error) {
throw new ParseError('Failed to parse Kinesis Data Stream envelope', {
cause: error as Error,
});
}

return parsedEnvelope.Records.map((record) => {
return Envelope.parse(record.kinesis.data, schema);
return parsedEnvelope.Records.map((record, recordIndex) => {
let parsedRecord: z.infer<typeof KinesisDataStreamRecord>;
try {
parsedRecord = schema.parse(record.kinesis.data);
} catch (error) {
throw new ParseError(
`Failed to parse Kinesis Data Stream record at index ${recordIndex}`,
{
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: [
'Records',
recordIndex,
'kinesis',
'data',
...issue.path,
],
}))
),
}
);
}
return parsedRecord;
});
},

Expand All @@ -43,25 +75,48 @@ export const KinesisEnvelope = {
};
}

const parsedRecords: z.infer<T>[] = [];
const result = parsedEnvelope.data.Records.reduce<{
success: boolean;
records: z.infer<T>[];
errors: { index?: number; issues?: ZodIssue[] };
}>(
(acc, record, index) => {
const parsedRecord = schema.safeParse(record.kinesis.data);

for (const record of parsedEnvelope.data.Records) {
const parsedRecord = Envelope.safeParse(record.kinesis.data, schema);
if (!parsedRecord.success) {
return {
success: false,
error: new ParseError('Failed to parse Kinesis Data Stream record', {
cause: parsedRecord.error,
}),
originalEvent: data,
};
}
parsedRecords.push(parsedRecord.data);
if (!parsedRecord.success) {
const issues = parsedRecord.error.issues.map((issue) => ({
...issue,
path: ['Records', index, 'kinesis', 'data', ...issue.path],
}));
acc.success = false;
// @ts-expect-error - index is assigned
acc.errors[index] = { issues };
return acc;
}

acc.records.push(parsedRecord.data);
return acc;
},
{ success: true, records: [], errors: {} }
);

if (result.success) {
return { success: true, data: result.records };
}

const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse Kinesis Data Stream records at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse Kinesis Data Stream record at index ${Object.keys(result.errors)[0]}`;
return {
success: true,
data: parsedRecords,
success: false,
error: new ParseError(errorMessage, {
cause: new ZodError(
// @ts-expect-error - issues are assigned because success is false
Object.values(result.errors).flatMap((error) => error.issues)
),
}),
originalEvent: data,
};
},
};
Loading
Loading