Skip to content

Commit 89f0006

Browse files
authored
fix(parser): allow SQS envelopes to handle non-JSON strings (#3513)
1 parent 09aa287 commit 89f0006

File tree

13 files changed

+934
-489
lines changed

13 files changed

+934
-489
lines changed

Diff for: packages/parser/src/envelopes/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export { KinesisEnvelope } from './kinesis.js';
88
export { KinesisFirehoseEnvelope } from './kinesis-firehose.js';
99
export { LambdaFunctionUrlEnvelope } from './lambda.js';
1010
export { SnsEnvelope } from './sns.js';
11-
export { SqsEnvelope, SnsSqsEnvelope } from './sqs.js';
11+
export { SqsEnvelope } from './sqs.js';
12+
export { SnsSqsEnvelope } from './snssqs.js';
1213
export { VpcLatticeEnvelope } from './vpc-lattice.js';
1314
export { VpcLatticeV2Envelope } from './vpc-latticev2.js';

Diff for: packages/parser/src/envelopes/snssqs.ts

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
2+
import { ParseError } from '../errors.js';
3+
import { SnsSqsNotificationSchema } from '../schemas/sns.js';
4+
import { SqsSchema } from '../schemas/sqs.js';
5+
import type { ParsedResult, SnsSqsNotification } from '../types/index.js';
6+
import { envelopeDiscriminator } from './envelope.js';
7+
8+
const createError = (index: number, issues: ZodIssue[]) => ({
9+
issues: issues.map((issue) => ({
10+
...issue,
11+
path: ['Records', index, 'body', ...issue.path],
12+
})),
13+
});
14+
15+
type ParseStepSuccess<T> = {
16+
success: true;
17+
data: T;
18+
};
19+
20+
type ParseStepError = {
21+
success: false;
22+
error: { issues: ZodIssue[] };
23+
};
24+
25+
type ParseStepResult<T> = ParseStepSuccess<T> | ParseStepError;
26+
27+
const parseStep = <U>(
28+
parser: (data: unknown) => z.SafeParseReturnType<unknown, U>,
29+
data: unknown,
30+
index: number
31+
): ParseStepResult<U> => {
32+
const result = parser(data);
33+
return result.success
34+
? { success: true, data: result.data }
35+
: {
36+
success: false,
37+
error: createError(index, result.error.issues),
38+
};
39+
};
40+
41+
/**
42+
* SNS plus SQS Envelope to extract array of Records
43+
*
44+
* Published messages from SNS to SQS has a slightly different payload structure
45+
* than regular SNS messages, and when sent to SQS, they are stringified into the
46+
* `body` field of each SQS record.
47+
*
48+
* To parse the `Message` field of the SNS notification, we need to:
49+
* 1. Parse SQS schema with incoming data
50+
* 2. `JSON.parse()` the SNS payload and parse against SNS Notification schema
51+
* 3. Finally, parse the payload against the provided schema
52+
*/
53+
export const SnsSqsEnvelope = {
54+
/**
55+
* This is a discriminator to differentiate whether an envelope returns an array or an object
56+
* @hidden
57+
*/
58+
[envelopeDiscriminator]: 'array' as const,
59+
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
60+
let parsedEnvelope: z.infer<typeof SqsSchema>;
61+
try {
62+
parsedEnvelope = SqsSchema.parse(data);
63+
} catch (error) {
64+
throw new ParseError('Failed to parse SQS Envelope', {
65+
cause: error as Error,
66+
});
67+
}
68+
69+
return parsedEnvelope.Records.map((record, recordIndex) => {
70+
try {
71+
return schema.parse(
72+
SnsSqsNotificationSchema.parse(JSON.parse(record.body)).Message
73+
);
74+
} catch (error) {
75+
throw new ParseError(
76+
`Failed to parse SQS Record at index ${recordIndex}`,
77+
{
78+
cause: new ZodError(
79+
error instanceof ZodError
80+
? (error as ZodError).issues.map((issue) => ({
81+
...issue,
82+
path: ['Records', recordIndex, 'body', ...issue.path],
83+
}))
84+
: [
85+
{
86+
code: 'custom',
87+
message: 'Invalid JSON',
88+
path: ['Records', recordIndex, 'body'],
89+
},
90+
]
91+
),
92+
}
93+
);
94+
}
95+
});
96+
},
97+
98+
safeParse<T extends ZodSchema>(
99+
data: unknown,
100+
schema: T
101+
): ParsedResult<unknown, z.infer<T>[]> {
102+
const parsedEnvelope = SqsSchema.safeParse(data);
103+
if (!parsedEnvelope.success) {
104+
return {
105+
success: false,
106+
error: new ParseError('Failed to parse SQS envelope', {
107+
cause: parsedEnvelope.error,
108+
}),
109+
originalEvent: data,
110+
};
111+
}
112+
113+
const parseRecord = (
114+
record: { body: string },
115+
index: number
116+
): ParseStepResult<z.infer<T>> => {
117+
try {
118+
const body = JSON.parse(record.body);
119+
const notification = parseStep<SnsSqsNotification>(
120+
(data) => SnsSqsNotificationSchema.safeParse(data),
121+
body,
122+
index
123+
);
124+
if (!notification.success) return notification;
125+
126+
return parseStep<z.infer<T>>(
127+
(data) => schema.safeParse(data),
128+
notification.data.Message,
129+
index
130+
);
131+
} catch {
132+
return {
133+
success: false,
134+
error: createError(index, [
135+
{
136+
code: 'custom',
137+
message: 'Invalid JSON',
138+
path: [],
139+
},
140+
]),
141+
};
142+
}
143+
};
144+
145+
const result = parsedEnvelope.data.Records.reduce<{
146+
success: boolean;
147+
records: z.infer<T>[];
148+
errors: {
149+
[key: number | string]: { issues: ZodIssue[] };
150+
};
151+
}>(
152+
(acc, record, index) => {
153+
const parsed = parseRecord(record, index);
154+
if (!parsed.success) {
155+
acc.success = false;
156+
acc.errors[index] = parsed.error;
157+
} else {
158+
acc.records.push(parsed.data);
159+
}
160+
return acc;
161+
},
162+
{ success: true, records: [], errors: {} }
163+
);
164+
165+
if (result.success) {
166+
return { success: true, data: result.records };
167+
}
168+
169+
const indexes = Object.keys(result.errors);
170+
const errorMessage =
171+
indexes.length > 1
172+
? `Failed to parse SQS Records at indexes ${indexes.join(', ')}`
173+
: `Failed to parse SQS Record at index ${indexes[0]}`;
174+
175+
return {
176+
success: false,
177+
error: new ParseError(errorMessage, {
178+
cause: new ZodError(
179+
Object.values(result.errors).flatMap((e) => e.issues)
180+
),
181+
}),
182+
originalEvent: data,
183+
};
184+
},
185+
};

0 commit comments

Comments
 (0)