Skip to content

Commit 4d7f05f

Browse files
authored
fix(parser): SNS Envelope handles non-JSON (#3506)
1 parent 41bb08e commit 4d7f05f

File tree

10 files changed

+437
-294
lines changed

10 files changed

+437
-294
lines changed

Diff for: packages/parser/src/envelopes/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export { KafkaEnvelope } from './kafka.js';
77
export { KinesisEnvelope } from './kinesis.js';
88
export { KinesisFirehoseEnvelope } from './kinesis-firehose.js';
99
export { LambdaFunctionUrlEnvelope } from './lambda.js';
10-
export { SnsEnvelope, SnsSqsEnvelope } from './sns.js';
11-
export { SqsEnvelope } from './sqs.js';
10+
export { SnsEnvelope } from './sns.js';
11+
export { SqsEnvelope, SnsSqsEnvelope } from './sqs.js';
1212
export { VpcLatticeEnvelope } from './vpc-lattice.js';
1313
export { VpcLatticeV2Envelope } from './vpc-latticev2.js';

Diff for: packages/parser/src/envelopes/sns.ts

+53-108
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import type { ZodSchema, z } from 'zod';
1+
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
22
import { ParseError } from '../errors.js';
3-
import { SnsSchema, SnsSqsNotificationSchema } from '../schemas/sns.js';
4-
import { SqsSchema } from '../schemas/sqs.js';
3+
import { SnsSchema } from '../schemas/sns.js';
54
import type { ParsedResult } from '../types/index.js';
6-
import { Envelope, envelopeDiscriminator } from './envelope.js';
5+
import { envelopeDiscriminator } from './envelope.js';
76

87
/**
98
* SNS Envelope to extract array of Records
@@ -23,8 +22,19 @@ export const SnsEnvelope = {
2322
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
2423
const parsedEnvelope = SnsSchema.parse(data);
2524

26-
return parsedEnvelope.Records.map((record) => {
27-
return Envelope.parse(record.Sns.Message, schema);
25+
return parsedEnvelope.Records.map((record, index) => {
26+
try {
27+
return schema.parse(record.Sns.Message);
28+
} catch (error) {
29+
throw new ParseError(`Failed to parse SNS record at index ${index}`, {
30+
cause: new ZodError(
31+
(error as ZodError).issues.map((issue) => ({
32+
...issue,
33+
path: ['Records', index, 'Sns', 'Message', ...issue.path],
34+
}))
35+
),
36+
});
37+
}
2838
});
2939
},
3040

@@ -44,112 +54,47 @@ export const SnsEnvelope = {
4454
};
4555
}
4656

47-
const parsedMessages: z.infer<T>[] = [];
48-
for (const record of parsedEnvelope.data.Records) {
49-
const parsedMessage = Envelope.safeParse(record.Sns.Message, schema);
50-
if (!parsedMessage.success) {
51-
return {
52-
success: false,
53-
error: new ParseError('Failed to parse SNS message', {
54-
cause: parsedMessage.error,
55-
}),
56-
originalEvent: data,
57-
};
58-
}
59-
parsedMessages.push(parsedMessage.data);
60-
}
61-
62-
return {
63-
success: true,
64-
data: parsedMessages,
65-
};
66-
},
67-
};
68-
69-
/**
70-
* SNS plus SQS Envelope to extract array of Records
71-
*
72-
* Published messages from SNS to SQS has a slightly different payload.
73-
* Since SNS payload is marshalled into `Record` key in SQS, we have to:
74-
*
75-
* 1. Parse SQS schema with incoming data
76-
* 2. Unmarshall SNS payload and parse against SNS Notification schema not SNS/SNS Record
77-
* 3. Finally, parse provided model against payload extracted
78-
*
79-
*/
80-
export const SnsSqsEnvelope = {
81-
/**
82-
* This is a discriminator to differentiate whether an envelope returns an array or an object
83-
* @hidden
84-
*/
85-
[envelopeDiscriminator]: 'array' as const,
86-
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
87-
const parsedEnvelope = SqsSchema.parse(data);
88-
89-
return parsedEnvelope.Records.map((record) => {
90-
const snsNotification = SnsSqsNotificationSchema.parse(
91-
JSON.parse(record.body)
92-
);
57+
const result = parsedEnvelope.data.Records.reduce<{
58+
success: boolean;
59+
messages: z.infer<T>[];
60+
errors: { index?: number; issues?: ZodIssue[] };
61+
}>(
62+
(acc, message, index) => {
63+
const parsedMessage = schema.safeParse(message.Sns.Message);
64+
if (!parsedMessage.success) {
65+
acc.success = false;
66+
const issues = parsedMessage.error.issues.map((issue) => ({
67+
...issue,
68+
path: ['Records', index, 'Sns', 'Message', ...issue.path],
69+
}));
70+
// @ts-expect-error - index is assigned
71+
acc.errors[index] = { issues };
72+
return acc;
73+
}
9374

94-
return Envelope.parse(snsNotification.Message, schema);
95-
});
96-
},
75+
acc.messages.push(parsedMessage.data);
76+
return acc;
77+
},
78+
{ success: true, messages: [], errors: {} }
79+
);
9780

98-
safeParse<T extends ZodSchema>(
99-
data: unknown,
100-
schema: T
101-
): ParsedResult<unknown, z.infer<T>[]> {
102-
const parsedEnvelope = SqsSchema.safeParse(data);
103-
if (!parsedEnvelope.success) {
104-
return {
105-
success: false,
106-
error: new ParseError('Failed to parse SQS envelope', {
107-
cause: parsedEnvelope.error,
108-
}),
109-
originalEvent: data,
110-
};
81+
if (result.success) {
82+
return { success: true, data: result.messages };
11183
}
11284

113-
const parsedMessages: z.infer<T>[] = [];
85+
const errorMessage =
86+
Object.keys(result.errors).length > 1
87+
? `Failed to parse SNS messages at indexes ${Object.keys(result.errors).join(', ')}`
88+
: `Failed to parse SNS message at index ${Object.keys(result.errors)[0]}`;
89+
const errorCause = new ZodError(
90+
// @ts-expect-error - issues are assigned because success is false
91+
Object.values(result.errors).flatMap((error) => error.issues)
92+
);
11493

115-
// JSON.parse can throw an error, thus we catch it and return ParsedErrorResult
116-
try {
117-
for (const record of parsedEnvelope.data.Records) {
118-
const snsNotification = SnsSqsNotificationSchema.safeParse(
119-
JSON.parse(record.body)
120-
);
121-
if (!snsNotification.success) {
122-
return {
123-
success: false,
124-
error: new ParseError('Failed to parse SNS notification', {
125-
cause: snsNotification.error,
126-
}),
127-
originalEvent: data,
128-
};
129-
}
130-
const parsedMessage = Envelope.safeParse(
131-
snsNotification.data.Message,
132-
schema
133-
);
134-
if (!parsedMessage.success) {
135-
return {
136-
success: false,
137-
error: new ParseError('Failed to parse SNS message', {
138-
cause: parsedMessage.error,
139-
}),
140-
originalEvent: data,
141-
};
142-
}
143-
parsedMessages.push(parsedMessage.data);
144-
}
145-
} catch (e) {
146-
return {
147-
success: false,
148-
error: e as Error,
149-
originalEvent: data,
150-
};
151-
}
152-
153-
return { success: true, data: parsedMessages };
94+
return {
95+
success: false,
96+
error: new ParseError(errorMessage, { cause: errorCause }),
97+
originalEvent: data,
98+
};
15499
},
155100
};

Diff for: packages/parser/src/envelopes/sqs.ts

+89
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ZodSchema, z } from 'zod';
22
import { ParseError } from '../errors.js';
3+
import { SnsSqsNotificationSchema } from '../schemas/sns.js';
34
import { SqsSchema } from '../schemas/sqs.js';
45
import type { ParsedResult } from '../types/index.js';
56
import { Envelope, envelopeDiscriminator } from './envelope.js';
@@ -60,3 +61,91 @@ export const SqsEnvelope = {
6061
return { success: true, data: parsedRecords };
6162
},
6263
};
64+
65+
/**
66+
* SNS plus SQS Envelope to extract array of Records
67+
*
68+
* Published messages from SNS to SQS has a slightly different payload.
69+
* Since SNS payload is marshalled into `Record` key in SQS, we have to:
70+
*
71+
* 1. Parse SQS schema with incoming data
72+
* 2. Unmarshall SNS payload and parse against SNS Notification schema not SNS/SNS Record
73+
* 3. Finally, parse provided model against payload extracted
74+
*
75+
*/
76+
export const SnsSqsEnvelope = {
77+
/**
78+
* This is a discriminator to differentiate whether an envelope returns an array or an object
79+
* @hidden
80+
*/
81+
[envelopeDiscriminator]: 'array' as const,
82+
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
83+
const parsedEnvelope = SqsSchema.parse(data);
84+
85+
return parsedEnvelope.Records.map((record) => {
86+
const snsNotification = SnsSqsNotificationSchema.parse(
87+
JSON.parse(record.body)
88+
);
89+
90+
return Envelope.parse(snsNotification.Message, schema);
91+
});
92+
},
93+
94+
safeParse<T extends ZodSchema>(
95+
data: unknown,
96+
schema: T
97+
): ParsedResult<unknown, z.infer<T>[]> {
98+
const parsedEnvelope = SqsSchema.safeParse(data);
99+
if (!parsedEnvelope.success) {
100+
return {
101+
success: false,
102+
error: new ParseError('Failed to parse SQS envelope', {
103+
cause: parsedEnvelope.error,
104+
}),
105+
originalEvent: data,
106+
};
107+
}
108+
109+
const parsedMessages: z.infer<T>[] = [];
110+
111+
// JSON.parse can throw an error, thus we catch it and return ParsedErrorResult
112+
try {
113+
for (const record of parsedEnvelope.data.Records) {
114+
const snsNotification = SnsSqsNotificationSchema.safeParse(
115+
JSON.parse(record.body)
116+
);
117+
if (!snsNotification.success) {
118+
return {
119+
success: false,
120+
error: new ParseError('Failed to parse SNS notification', {
121+
cause: snsNotification.error,
122+
}),
123+
originalEvent: data,
124+
};
125+
}
126+
const parsedMessage = Envelope.safeParse(
127+
snsNotification.data.Message,
128+
schema
129+
);
130+
if (!parsedMessage.success) {
131+
return {
132+
success: false,
133+
error: new ParseError('Failed to parse SNS message', {
134+
cause: parsedMessage.error,
135+
}),
136+
originalEvent: data,
137+
};
138+
}
139+
parsedMessages.push(parsedMessage.data);
140+
}
141+
} catch (e) {
142+
return {
143+
success: false,
144+
error: e as Error,
145+
originalEvent: data,
146+
};
147+
}
148+
149+
return { success: true, data: parsedMessages };
150+
},
151+
};

Diff for: packages/parser/src/schemas/sns.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ const SnsRecordSchema = z.object({
113113
* @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html#sns-sample-event}
114114
*/
115115
const SnsSchema = z.object({
116-
Records: z.array(SnsRecordSchema),
116+
Records: z.array(SnsRecordSchema).min(1),
117117
});
118118

119119
export {

Diff for: packages/parser/tests/events/snsEvent.json renamed to packages/parser/tests/events/sns/base.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"Records": [
33
{
44
"EventVersion": "1.0",
5-
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-la ...",
5+
"EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:ExampleTopic",
66
"EventSource": "aws:sns",
77
"Sns": {
88
"SignatureVersion": "1",
@@ -23,9 +23,9 @@
2323
},
2424
"Type": "Notification",
2525
"UnsubscribeUrl": "https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe",
26-
"TopicArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda",
26+
"TopicArn": "arn:aws:sns:us-east-2:123456789012:ExampleTopic",
2727
"Subject": "TestInvoke"
2828
}
2929
}
3030
]
31-
}
31+
}

0 commit comments

Comments
 (0)