Skip to content

Commit e154e58

Browse files
feat(parser): DynamoDBMarshalled helper to parse DynamoDB data structure (#3442)
Co-authored-by: Andrea Amorosi <[email protected]>
1 parent 149e17f commit e154e58

File tree

7 files changed

+355
-2
lines changed

7 files changed

+355
-2
lines changed

Diff for: docs/utilities/parser.md

+15
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,21 @@ If you want to extend a schema and transform a JSON stringified payload to an ob
151151
--8<-- "examples/snippets/parser/samples/exampleSqsPayload.json"
152152
```
153153

154+
### DynamoDB Stream event parsing
155+
156+
If you want to parse a DynamoDB stream event with unmarshalling, you can use the helper function `DynamoDBMarshalled`:
157+
158+
=== "DynamoDBStreamSchema with DynamoDBMarshalled"
159+
```typescript hl_lines="17"
160+
--8<-- "examples/snippets/parser/extendDynamoDBStreamSchema.ts"
161+
```
162+
163+
=== "DynamoDBStream event payload"
164+
165+
```json hl_lines="13-20 49-56"
166+
--8<-- "examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json"
167+
```
168+
154169
## Envelopes
155170

156171
When trying to parse your payload you might encounter the following situations:
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
2+
import {
3+
DynamoDBStreamRecord,
4+
DynamoDBStreamSchema,
5+
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
6+
import { z } from 'zod';
7+
8+
const customSchema = z.object({
9+
id: z.string(),
10+
message: z.string(),
11+
});
12+
13+
const extendedSchema = DynamoDBStreamSchema.extend({
14+
Records: z.array(
15+
DynamoDBStreamRecord.extend({
16+
dynamodb: z.object({
17+
NewImage: DynamoDBMarshalled(customSchema).optional(),
18+
}),
19+
})
20+
),
21+
});
22+
23+
type ExtendedDynamoDBStreamEvent = z.infer<typeof extendedSchema>;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
{
2+
"Records": [
3+
{
4+
"eventID": "1",
5+
"eventVersion": "1.0",
6+
"dynamodb": {
7+
"ApproximateCreationDateTime": 1693997155.0,
8+
"Keys": {
9+
"Id": {
10+
"N": "101"
11+
}
12+
},
13+
"NewImage": {
14+
"Message": {
15+
"S": "New item!"
16+
},
17+
"Id": {
18+
"N": "101"
19+
}
20+
},
21+
"StreamViewType": "NEW_AND_OLD_IMAGES",
22+
"SequenceNumber": "111",
23+
"SizeBytes": 26
24+
},
25+
"awsRegion": "us-west-2",
26+
"eventName": "INSERT",
27+
"eventSourceARN": "eventsource_arn",
28+
"eventSource": "aws:dynamodb"
29+
},
30+
{
31+
"eventID": "2",
32+
"eventVersion": "1.0",
33+
"dynamodb": {
34+
"OldImage": {
35+
"Message": {
36+
"S": "New item!"
37+
},
38+
"Id": {
39+
"N": "101"
40+
}
41+
},
42+
"SequenceNumber": "222",
43+
"Keys": {
44+
"Id": {
45+
"N": "101"
46+
}
47+
},
48+
"SizeBytes": 59,
49+
"NewImage": {
50+
"Message": {
51+
"S": "This item has changed"
52+
},
53+
"Id": {
54+
"N": "101"
55+
}
56+
},
57+
"StreamViewType": "NEW_AND_OLD_IMAGES"
58+
},
59+
"awsRegion": "us-west-2",
60+
"eventName": "MODIFY",
61+
"eventSourceARN": "source_arn",
62+
"eventSource": "aws:dynamodb"
63+
}
64+
]
65+
}
66+

Diff for: packages/parser/package.json

+6-1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@
180180
"require": "./lib/cjs/helpers.js",
181181
"import": "./lib/esm/helpers.js"
182182
},
183+
"./helpers/dynamodb": {
184+
"require": "./lib/cjs/helpers/dynamodb.js",
185+
"import": "./lib/esm/helpers/dynamodb.js"
186+
},
183187
"./types": {
184188
"require": "./lib/cjs/types/index.js",
185189
"import": "./lib/esm/types/index.js"
@@ -363,7 +367,8 @@
363367
],
364368
"peerDependencies": {
365369
"@middy/core": "4.x || 5.x || 6.x",
366-
"zod": ">=3.x"
370+
"zod": ">=3.x",
371+
"@aws-sdk/util-dynamodb": ">=3.x"
367372
},
368373
"peerDependenciesMeta": {
369374
"zod": {

Diff for: packages/parser/src/helpers/dynamodb.ts

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import type { AttributeValue } from '@aws-sdk/client-dynamodb';
2+
import { unmarshall } from '@aws-sdk/util-dynamodb';
3+
import { type ZodTypeAny, z } from 'zod';
4+
5+
/**
6+
* A helper function to unmarshall DynamoDB stream events and validate them against a schema.
7+
*
8+
* @example
9+
* ```typescript
10+
* const mySchema = z.object({
11+
* id: z.string(),
12+
* name: z.string(),
13+
* });
14+
* const eventSchema = DynamoDBStreamSchema.extend({
15+
* Records: z.array(
16+
* DynamoDBStreamRecord.extend({
17+
* dynamodb: z.object({
18+
* NewImage: DynamoDBMarshalled(mySchema).optional(),
19+
* }),
20+
* })
21+
* ),
22+
* });
23+
* type eventSchema = z.infer<typeof extendedSchema>;
24+
* ```
25+
* For example, if you have a DynamoDB stream event like the following:
26+
*
27+
* ```json
28+
* {
29+
* "Records": [
30+
* {
31+
* "dynamodb": {
32+
* "NewImage": {
33+
* "id": {
34+
* "S": "12345"
35+
* },
36+
* "name": {
37+
* "S": "John Doe"
38+
* }
39+
* }
40+
* }
41+
* }
42+
* ]
43+
* }
44+
* ```
45+
* Resulting in:
46+
*
47+
* ```json
48+
* {
49+
* "Records": [
50+
* {
51+
* "dynamodb": {
52+
* "NewImage": {
53+
* "id": "12345",
54+
* "name": "John Doe"
55+
* }
56+
* }
57+
* }
58+
* ]
59+
* }
60+
* ```
61+
*
62+
* @param schema - The schema to validate the JSON string against
63+
*/
64+
const DynamoDBMarshalled = <T extends ZodTypeAny>(schema: T) =>
65+
z
66+
.union([
67+
z.custom<AttributeValue>(),
68+
z.record(z.string(), z.custom<AttributeValue>()),
69+
])
70+
.transform((str, ctx) => {
71+
try {
72+
return unmarshall(str);
73+
} catch (err) {
74+
ctx.addIssue({
75+
code: 'custom',
76+
message: 'Could not unmarshall DynamoDB stream record',
77+
fatal: true,
78+
});
79+
80+
return z.NEVER;
81+
}
82+
})
83+
.pipe(schema);
84+
85+
export { DynamoDBMarshalled };

Diff for: packages/parser/tests/unit/helpers.test.ts

+152-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
import { describe, expect, it } from 'vitest';
22
import { z } from 'zod';
33
import { JSONStringified } from '../../src/helpers.js';
4+
import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js';
45
import { AlbSchema } from '../../src/schemas/alb.js';
6+
import {
7+
DynamoDBStreamRecord,
8+
DynamoDBStreamSchema,
9+
} from '../../src/schemas/dynamodb';
510
import {
611
SnsNotificationSchema,
712
SnsRecordSchema,
813
} from '../../src/schemas/sns.js';
914
import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js';
10-
import type { SnsEvent, SqsEvent } from '../../src/types/schema.js';
15+
import type {
16+
DynamoDBStreamEvent,
17+
SnsEvent,
18+
SqsEvent,
19+
} from '../../src/types/schema.js';
1120
import { getTestEvent } from './schema/utils.js';
1221

1322
const bodySchema = z.object({
@@ -152,3 +161,145 @@ describe('JSONStringified', () => {
152161
});
153162
});
154163
});
164+
165+
describe('DynamoDBMarshalled', () => {
166+
// Prepare
167+
const schema = z.object({
168+
Message: z.string(),
169+
Id: z.number(),
170+
});
171+
172+
const extendedSchema = DynamoDBStreamSchema.extend({
173+
Records: z.array(
174+
DynamoDBStreamRecord.extend({
175+
dynamodb: z.object({
176+
NewImage: DynamoDBMarshalled(schema).optional(),
177+
}),
178+
})
179+
),
180+
});
181+
182+
it('should correctly unmarshall and validate a valid DynamoDB stream record', () => {
183+
// Prepare
184+
const testInput = [
185+
{
186+
Message: {
187+
S: 'New item!',
188+
},
189+
Id: {
190+
N: '101',
191+
},
192+
},
193+
{
194+
Message: {
195+
S: 'This item has changed',
196+
},
197+
Id: {
198+
N: '101',
199+
},
200+
},
201+
];
202+
const expectedOutput = [
203+
{
204+
Id: 101,
205+
Message: 'New item!',
206+
},
207+
{
208+
Id: 101,
209+
Message: 'This item has changed',
210+
},
211+
];
212+
213+
const testEvent = getTestEvent<DynamoDBStreamEvent>({
214+
eventsPath: '.',
215+
filename: 'dynamoStreamEvent',
216+
});
217+
218+
testEvent.Records[0].dynamodb.NewImage = testInput[0];
219+
testEvent.Records[1].dynamodb.NewImage = testInput[1];
220+
221+
// Act & Assess
222+
expect(extendedSchema.parse(testEvent)).toStrictEqual({
223+
Records: [
224+
{
225+
...testEvent.Records[0],
226+
dynamodb: {
227+
NewImage: expectedOutput[0],
228+
},
229+
},
230+
{
231+
...testEvent.Records[1],
232+
dynamodb: {
233+
NewImage: expectedOutput[1],
234+
},
235+
},
236+
],
237+
});
238+
});
239+
240+
it('should throw an error if the DynamoDB stream record cannot be unmarshalled', () => {
241+
// Prepare
242+
const testInput = [
243+
{
244+
Message: {
245+
S: 'New item!',
246+
},
247+
Id: {
248+
NNN: '101', //unknown type
249+
},
250+
},
251+
{
252+
Message: {
253+
S: 'This item has changed',
254+
},
255+
Id: {
256+
N: '101',
257+
},
258+
},
259+
];
260+
261+
const testEvent = getTestEvent<DynamoDBStreamEvent>({
262+
eventsPath: '.',
263+
filename: 'dynamoStreamEvent',
264+
});
265+
266+
testEvent.Records[0].dynamodb.NewImage = testInput[0];
267+
testEvent.Records[1].dynamodb.NewImage = testInput[1];
268+
269+
// Act & Assess
270+
expect(() => extendedSchema.parse(testEvent)).toThrow(
271+
'Could not unmarshall DynamoDB stream record'
272+
);
273+
});
274+
275+
it('should throw a validation error if the unmarshalled record does not match the schema', () => {
276+
// Prepare
277+
const testInput = [
278+
{
279+
Message: {
280+
S: 'New item!',
281+
},
282+
Id: {
283+
N: '101',
284+
},
285+
},
286+
{
287+
Message: {
288+
S: 'This item has changed',
289+
},
290+
// Id is missing
291+
},
292+
];
293+
294+
const testEvent = getTestEvent<DynamoDBStreamEvent>({
295+
eventsPath: '.',
296+
filename: 'dynamoStreamEvent',
297+
});
298+
299+
testEvent.Records[0].dynamodb.NewImage = testInput[0];
300+
testEvent.Records[1].dynamodb.NewImage = testInput[1];
301+
302+
// Act & Assess
303+
expect(() => extendedSchema.parse(testEvent)).toThrow();
304+
});
305+
});

0 commit comments

Comments
 (0)