Skip to content

Commit 781a14e

Browse files
dreamorosiam29d
andauthored
fix(parser): CloudWatch Log Envelope handles non-JSON (#3505)
Co-authored-by: Alexander Schueren <[email protected]>
1 parent 4d7f05f commit 781a14e

33 files changed

+408
-839
lines changed

Diff for: packages/parser/src/envelopes/cloudwatch.ts

+88-27
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
import type { ZodSchema, z } from 'zod';
1+
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
22
import { ParseError } from '../errors.js';
33
import { CloudWatchLogsSchema } from '../schemas/index.js';
44
import type { ParsedResult } from '../types/index.js';
5-
import { Envelope, envelopeDiscriminator } from './envelope.js';
5+
import { envelopeDiscriminator } from './envelope.js';
66

77
/**
8-
* CloudWatch Envelope to extract a List of log records.
9-
*
10-
* The record's body parameter is a string (after being base64 decoded and gzipped),
11-
* though it can also be a JSON encoded string.
12-
* Regardless of its type it'll be parsed into a BaseModel object.
13-
*
14-
* Note: The record will be parsed the same way so if model is str
8+
* CloudWatch Envelope to extract messages from the `awslogs.data.logEvents` key.
159
*/
1610
export const CloudWatchEnvelope = {
1711
/**
@@ -22,45 +16,112 @@ export const CloudWatchEnvelope = {
2216
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
2317
const parsedEnvelope = CloudWatchLogsSchema.parse(data);
2418

25-
return parsedEnvelope.awslogs.data.logEvents.map((record) => {
26-
return Envelope.parse(record.message, schema);
19+
return parsedEnvelope.awslogs.data.logEvents.map((record, index) => {
20+
try {
21+
return schema.parse(record.message);
22+
} catch (error) {
23+
throw new ParseError(
24+
`Failed to parse CloudWatch log event at index ${index}`,
25+
{
26+
cause: new ZodError(
27+
(error as ZodError).issues.map((issue) => ({
28+
...issue,
29+
path: [
30+
'awslogs',
31+
'data',
32+
'logEvents',
33+
index,
34+
'message',
35+
...issue.path,
36+
],
37+
}))
38+
),
39+
}
40+
);
41+
}
2742
});
2843
},
2944

3045
safeParse<T extends ZodSchema>(
3146
data: unknown,
3247
schema: T
3348
): ParsedResult<unknown, z.infer<T>[]> {
34-
const parsedEnvelope = CloudWatchLogsSchema.safeParse(data);
49+
let parsedEnvelope: ParsedResult<unknown, z.infer<T>>;
50+
try {
51+
parsedEnvelope = CloudWatchLogsSchema.safeParse(data);
52+
} catch (error) {
53+
parsedEnvelope = {
54+
success: false,
55+
error: error as Error,
56+
};
57+
}
3558

3659
if (!parsedEnvelope.success) {
3760
return {
3861
success: false,
39-
error: new ParseError('Failed to parse CloudWatch envelope', {
62+
error: new ParseError('Failed to parse CloudWatch Log envelope', {
4063
cause: parsedEnvelope.error,
4164
}),
4265
originalEvent: data,
4366
};
4467
}
45-
const parsedLogEvents: z.infer<T>[] = [];
4668

47-
for (const record of parsedEnvelope.data.awslogs.data.logEvents) {
48-
const parsedMessage = Envelope.safeParse(record.message, schema);
49-
if (!parsedMessage.success) {
50-
return {
51-
success: false,
52-
error: new ParseError('Failed to parse CloudWatch log event', {
53-
cause: parsedMessage.error,
54-
}),
55-
originalEvent: data,
56-
};
69+
const result = parsedEnvelope.data.awslogs.data.logEvents.reduce(
70+
(
71+
acc: {
72+
success: boolean;
73+
messages: z.infer<T>;
74+
errors: { [key: number]: { issues: ZodIssue[] } };
75+
},
76+
record: { message: string },
77+
index: number
78+
) => {
79+
const result = schema.safeParse(record.message);
80+
if (!result.success) {
81+
const issues = result.error.issues.map((issue) => ({
82+
...issue,
83+
path: [
84+
'awslogs',
85+
'data',
86+
'logEvents',
87+
index,
88+
'message',
89+
...issue.path,
90+
],
91+
}));
92+
93+
acc.success = false;
94+
acc.errors[index] = { issues };
95+
return acc;
96+
}
97+
98+
acc.messages.push(result.data);
99+
return acc;
100+
},
101+
{
102+
success: true,
103+
messages: [],
104+
errors: {},
57105
}
58-
parsedLogEvents.push(parsedMessage.data);
106+
);
107+
108+
if (result.success) {
109+
return { success: true, data: result.messages };
59110
}
60111

112+
const errorMessage =
113+
Object.keys(result.errors).length > 1
114+
? `Failed to parse CloudWatch Log messages at indexes ${Object.keys(result.errors).join(', ')}`
115+
: `Failed to parse CloudWatch Log message at index ${Object.keys(result.errors)[0]}`;
116+
const errorCause = new ZodError(
117+
// @ts-expect-error - issues are assigned because success is false
118+
Object.values(result.errors).flatMap((error) => error.issues)
119+
);
120+
61121
return {
62-
success: true,
63-
data: parsedLogEvents,
122+
success: false,
123+
error: new ParseError(errorMessage, { cause: errorCause }),
124+
originalEvent: data,
64125
};
65126
},
66127
};

Diff for: packages/parser/src/errors.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,14 @@ class ParseError extends Error {
1212
}
1313
}
1414

15-
export { ParseError };
15+
/**
16+
* Custom error thrown when decompression fails.
17+
*/
18+
class DecompressError extends ParseError {
19+
constructor(message: string, options?: { cause?: Error }) {
20+
super(message, options);
21+
this.name = 'DecompressError';
22+
}
23+
}
24+
25+
export { ParseError, DecompressError };

Diff for: packages/parser/src/schemas/cloudwatch.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { gunzipSync } from 'node:zlib';
22
import { z } from 'zod';
3+
import { DecompressError } from '../errors.js';
34

45
const CloudWatchLogEventSchema = z.object({
56
id: z.string(),
@@ -13,15 +14,21 @@ const CloudWatchLogsDecodeSchema = z.object({
1314
logGroup: z.string(),
1415
logStream: z.string(),
1516
subscriptionFilters: z.array(z.string()),
16-
logEvents: z.array(CloudWatchLogEventSchema),
17+
logEvents: z.array(CloudWatchLogEventSchema).min(1),
1718
});
1819

1920
const decompressRecordToJSON = (
2021
data: string
2122
): z.infer<typeof CloudWatchLogsDecodeSchema> => {
22-
const uncompressed = gunzipSync(Buffer.from(data, 'base64')).toString('utf8');
23+
try {
24+
const uncompressed = gunzipSync(Buffer.from(data, 'base64')).toString(
25+
'utf8'
26+
);
2327

24-
return CloudWatchLogsDecodeSchema.parse(JSON.parse(uncompressed));
28+
return CloudWatchLogsDecodeSchema.parse(JSON.parse(uncompressed));
29+
} catch (error) {
30+
throw new DecompressError('Failed to decompress CloudWatch log data');
31+
}
2532
};
2633

2734
/**

Diff for: packages/parser/tests/events/activeMQEvent.json

-54
This file was deleted.

Diff for: packages/parser/tests/events/awsConfigRuleConfigurationChanged.json

-13
This file was deleted.

0 commit comments

Comments
 (0)