Skip to content

fix(parser): SNS Envelope handles non-JSON #3506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/parser/src/envelopes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export { KafkaEnvelope } from './kafka.js';
export { KinesisEnvelope } from './kinesis.js';
export { KinesisFirehoseEnvelope } from './kinesis-firehose.js';
export { LambdaFunctionUrlEnvelope } from './lambda.js';
export { SnsEnvelope, SnsSqsEnvelope } from './sns.js';
export { SqsEnvelope } from './sqs.js';
export { SnsEnvelope } from './sns.js';
export { SqsEnvelope, SnsSqsEnvelope } from './sqs.js';
export { VpcLatticeEnvelope } from './vpc-lattice.js';
export { VpcLatticeV2Envelope } from './vpc-latticev2.js';
161 changes: 53 additions & 108 deletions packages/parser/src/envelopes/sns.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { SnsSchema, SnsSqsNotificationSchema } from '../schemas/sns.js';
import { SqsSchema } from '../schemas/sqs.js';
import { SnsSchema } from '../schemas/sns.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* SNS Envelope to extract array of Records
Expand All @@ -23,8 +22,19 @@ export const SnsEnvelope = {
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = SnsSchema.parse(data);

return parsedEnvelope.Records.map((record) => {
return Envelope.parse(record.Sns.Message, schema);
return parsedEnvelope.Records.map((record, index) => {
try {
return schema.parse(record.Sns.Message);
} catch (error) {
throw new ParseError(`Failed to parse SNS record at index ${index}`, {
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: ['Records', index, 'Sns', 'Message', ...issue.path],
}))
),
});
}
});
},

Expand All @@ -44,112 +54,47 @@ export const SnsEnvelope = {
};
}

const parsedMessages: z.infer<T>[] = [];
for (const record of parsedEnvelope.data.Records) {
const parsedMessage = Envelope.safeParse(record.Sns.Message, schema);
if (!parsedMessage.success) {
return {
success: false,
error: new ParseError('Failed to parse SNS message', {
cause: parsedMessage.error,
}),
originalEvent: data,
};
}
parsedMessages.push(parsedMessage.data);
}

return {
success: true,
data: parsedMessages,
};
},
};

/**
* SNS plus SQS Envelope to extract array of Records
*
* Published messages from SNS to SQS has a slightly different payload.
* Since SNS payload is marshalled into `Record` key in SQS, we have to:
*
* 1. Parse SQS schema with incoming data
* 2. Unmarshall SNS payload and parse against SNS Notification schema not SNS/SNS Record
* 3. Finally, parse provided model against payload extracted
*
*/
export const SnsSqsEnvelope = {
/**
* This is a discriminator to differentiate whether an envelope returns an array or an object
* @hidden
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = SqsSchema.parse(data);

return parsedEnvelope.Records.map((record) => {
const snsNotification = SnsSqsNotificationSchema.parse(
JSON.parse(record.body)
);
const result = parsedEnvelope.data.Records.reduce<{
success: boolean;
messages: z.infer<T>[];
errors: { index?: number; issues?: ZodIssue[] };
}>(
(acc, message, index) => {
const parsedMessage = schema.safeParse(message.Sns.Message);
if (!parsedMessage.success) {
acc.success = false;
const issues = parsedMessage.error.issues.map((issue) => ({
...issue,
path: ['Records', index, 'Sns', 'Message', ...issue.path],
}));
// @ts-expect-error - index is assigned
acc.errors[index] = { issues };
return acc;
}

return Envelope.parse(snsNotification.Message, schema);
});
},
acc.messages.push(parsedMessage.data);
return acc;
},
{ success: true, messages: [], errors: {} }
);

safeParse<T extends ZodSchema>(
data: unknown,
schema: T
): ParsedResult<unknown, z.infer<T>[]> {
const parsedEnvelope = SqsSchema.safeParse(data);
if (!parsedEnvelope.success) {
return {
success: false,
error: new ParseError('Failed to parse SQS envelope', {
cause: parsedEnvelope.error,
}),
originalEvent: data,
};
if (result.success) {
return { success: true, data: result.messages };
}

const parsedMessages: z.infer<T>[] = [];
const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse SNS messages at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse SNS message at index ${Object.keys(result.errors)[0]}`;
const errorCause = new ZodError(
// @ts-expect-error - issues are assigned because success is false
Object.values(result.errors).flatMap((error) => error.issues)
);

// JSON.parse can throw an error, thus we catch it and return ParsedErrorResult
try {
for (const record of parsedEnvelope.data.Records) {
const snsNotification = SnsSqsNotificationSchema.safeParse(
JSON.parse(record.body)
);
if (!snsNotification.success) {
return {
success: false,
error: new ParseError('Failed to parse SNS notification', {
cause: snsNotification.error,
}),
originalEvent: data,
};
}
const parsedMessage = Envelope.safeParse(
snsNotification.data.Message,
schema
);
if (!parsedMessage.success) {
return {
success: false,
error: new ParseError('Failed to parse SNS message', {
cause: parsedMessage.error,
}),
originalEvent: data,
};
}
parsedMessages.push(parsedMessage.data);
}
} catch (e) {
return {
success: false,
error: e as Error,
originalEvent: data,
};
}

return { success: true, data: parsedMessages };
return {
success: false,
error: new ParseError(errorMessage, { cause: errorCause }),
originalEvent: data,
};
},
};
89 changes: 89 additions & 0 deletions packages/parser/src/envelopes/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ZodSchema, z } from 'zod';
import { ParseError } from '../errors.js';
import { SnsSqsNotificationSchema } from '../schemas/sns.js';
import { SqsSchema } from '../schemas/sqs.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
Expand Down Expand Up @@ -60,3 +61,91 @@ export const SqsEnvelope = {
return { success: true, data: parsedRecords };
},
};

/**
* SNS plus SQS Envelope to extract array of Records
*
* Published messages from SNS to SQS has a slightly different payload.
* Since SNS payload is marshalled into `Record` key in SQS, we have to:
*
* 1. Parse SQS schema with incoming data
* 2. Unmarshall SNS payload and parse against SNS Notification schema not SNS/SNS Record
* 3. Finally, parse provided model against payload extracted
*
*/
export const SnsSqsEnvelope = {
/**
* This is a discriminator to differentiate whether an envelope returns an array or an object
* @hidden
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = SqsSchema.parse(data);

return parsedEnvelope.Records.map((record) => {
const snsNotification = SnsSqsNotificationSchema.parse(
JSON.parse(record.body)
);

return Envelope.parse(snsNotification.Message, schema);
});
},

safeParse<T extends ZodSchema>(
data: unknown,
schema: T
): ParsedResult<unknown, z.infer<T>[]> {
const parsedEnvelope = SqsSchema.safeParse(data);
if (!parsedEnvelope.success) {
return {
success: false,
error: new ParseError('Failed to parse SQS envelope', {
cause: parsedEnvelope.error,
}),
originalEvent: data,
};
}

const parsedMessages: z.infer<T>[] = [];

// JSON.parse can throw an error, thus we catch it and return ParsedErrorResult
try {
for (const record of parsedEnvelope.data.Records) {
const snsNotification = SnsSqsNotificationSchema.safeParse(
JSON.parse(record.body)
);
if (!snsNotification.success) {
return {
success: false,
error: new ParseError('Failed to parse SNS notification', {
cause: snsNotification.error,
}),
originalEvent: data,
};
}
const parsedMessage = Envelope.safeParse(
snsNotification.data.Message,
schema
);
if (!parsedMessage.success) {
return {
success: false,
error: new ParseError('Failed to parse SNS message', {
cause: parsedMessage.error,
}),
originalEvent: data,
};
}
parsedMessages.push(parsedMessage.data);
}
} catch (e) {
return {
success: false,
error: e as Error,
originalEvent: data,
};
}

return { success: true, data: parsedMessages };
},
};
2 changes: 1 addition & 1 deletion packages/parser/src/schemas/sns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const SnsRecordSchema = z.object({
* @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html#sns-sample-event}
*/
const SnsSchema = z.object({
Records: z.array(SnsRecordSchema),
Records: z.array(SnsRecordSchema).min(1),
});

export {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-la ...",
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:ExampleTopic",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
Expand All @@ -23,9 +23,9 @@
},
"Type": "Notification",
"UnsubscribeUrl": "https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe",
"TopicArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda",
"TopicArn": "arn:aws:sns:us-east-2:123456789012:ExampleTopic",
"Subject": "TestInvoke"
}
}
]
}
}
Loading
Loading