Skip to content

feat(parser): DynamoDBMarshalled helper to parse DynamoDB data structure #3442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ If you want to extend a schema and transform a JSON stringified payload to an ob
--8<-- "examples/snippets/parser/samples/exampleSqsPayload.json"
```

### DynamoDB Stream event parsing

If you want to parse a DynamoDB stream event with unmarshalling, you can use the helper function `DynamoDBMarshalled`:

=== "DynamoDBStreamSchema with DynamoDBMarshalled"
```typescript hl_lines="17"
--8<-- "examples/snippets/parser/extendDynamoDBStreamSchema.ts"
```

=== "DynamoDBStream event payload"

```json hl_lines="13-20 49-56"
--8<-- "examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json"
```

## Envelopes

When trying to parse your payload you might encounter the following situations:
Expand Down
23 changes: 23 additions & 0 deletions examples/snippets/parser/extendDynamoDBStreamSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
DynamoDBStreamRecord,
DynamoDBStreamSchema,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import { z } from 'zod';

const customSchema = z.object({
id: z.string(),
message: z.string(),
});

const extendedSchema = DynamoDBStreamSchema.extend({
Records: z.array(
DynamoDBStreamRecord.extend({
dynamodb: z.object({
NewImage: DynamoDBMarshalled(customSchema).optional(),
}),
})
),
});

type ExtendedDynamoDBStreamEvent = z.infer<typeof extendedSchema>;
66 changes: 66 additions & 0 deletions examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1693997155.0,
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "eventsource_arn",
"eventSource": "aws:dynamodb"
},
{
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"OldImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "222",
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 59,
"NewImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "MODIFY",
"eventSourceARN": "source_arn",
"eventSource": "aws:dynamodb"
}
]
}

7 changes: 6 additions & 1 deletion packages/parser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@
"require": "./lib/cjs/helpers.js",
"import": "./lib/esm/helpers.js"
},
"./helpers/dynamodb": {
"require": "./lib/cjs/helpers/dynamodb.js",
"import": "./lib/esm/helpers/dynamodb.js"
},
"./types": {
"require": "./lib/cjs/types/index.js",
"import": "./lib/esm/types/index.js"
Expand Down Expand Up @@ -363,7 +367,8 @@
],
"peerDependencies": {
"@middy/core": "4.x || 5.x || 6.x",
"zod": ">=3.x"
"zod": ">=3.x",
"@aws-sdk/util-dynamodb": ">=3.x"
},
"peerDependenciesMeta": {
"zod": {
Expand Down
85 changes: 85 additions & 0 deletions packages/parser/src/helpers/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import type { AttributeValue } from '@aws-sdk/client-dynamodb';
import { unmarshall } from '@aws-sdk/util-dynamodb';
import { type ZodTypeAny, z } from 'zod';

/**
* A helper function to unmarshall DynamoDB stream events and validate them against a schema.
*
* @example
* ```typescript
* const mySchema = z.object({
* id: z.string(),
* name: z.string(),
* });
* const eventSchema = DynamoDBStreamSchema.extend({
* Records: z.array(
* DynamoDBStreamRecord.extend({
* dynamodb: z.object({
* NewImage: DynamoDBMarshalled(mySchema).optional(),
* }),
* })
* ),
* });
* type eventSchema = z.infer<typeof extendedSchema>;
* ```
* For example, if you have a DynamoDB stream event like the following:
*
* ```json
* {
* "Records": [
* {
* "dynamodb": {
* "NewImage": {
* "id": {
* "S": "12345"
* },
* "name": {
* "S": "John Doe"
* }
* }
* }
* }
* ]
* }
* ```
* Resulting in:
*
* ```json
* {
* "Records": [
* {
* "dynamodb": {
* "NewImage": {
* "id": "12345",
* "name": "John Doe"
* }
* }
* }
* ]
* }
* ```
*
* @param schema - The schema to validate the JSON string against
*/
const DynamoDBMarshalled = <T extends ZodTypeAny>(schema: T) =>
z
.union([
z.custom<AttributeValue>(),
z.record(z.string(), z.custom<AttributeValue>()),
])
.transform((str, ctx) => {
try {
return unmarshall(str);
} catch (err) {
ctx.addIssue({
code: 'custom',
message: 'Could not unmarshall DynamoDB stream record',
fatal: true,
});

return z.NEVER;
}
})
.pipe(schema);

export { DynamoDBMarshalled };
153 changes: 152 additions & 1 deletion packages/parser/tests/unit/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { describe, expect, it } from 'vitest';
import { z } from 'zod';
import { JSONStringified } from '../../src/helpers.js';
import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js';
import { AlbSchema } from '../../src/schemas/alb.js';
import {
DynamoDBStreamRecord,
DynamoDBStreamSchema,
} from '../../src/schemas/dynamodb';
import {
SnsNotificationSchema,
SnsRecordSchema,
} from '../../src/schemas/sns.js';
import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js';
import type { SnsEvent, SqsEvent } from '../../src/types/schema.js';
import type {
DynamoDBStreamEvent,
SnsEvent,
SqsEvent,
} from '../../src/types/schema.js';
import { getTestEvent } from './schema/utils.js';

const bodySchema = z.object({
Expand Down Expand Up @@ -152,3 +161,145 @@ describe('JSONStringified', () => {
});
});
});

describe('DynamoDBMarshalled', () => {
// Prepare
const schema = z.object({
Message: z.string(),
Id: z.number(),
});

const extendedSchema = DynamoDBStreamSchema.extend({
Records: z.array(
DynamoDBStreamRecord.extend({
dynamodb: z.object({
NewImage: DynamoDBMarshalled(schema).optional(),
}),
})
),
});

it('should correctly unmarshall and validate a valid DynamoDB stream record', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
N: '101',
},
},
{
Message: {
S: 'This item has changed',
},
Id: {
N: '101',
},
},
];
const expectedOutput = [
{
Id: 101,
Message: 'New item!',
},
{
Id: 101,
Message: 'This item has changed',
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(extendedSchema.parse(testEvent)).toStrictEqual({
Records: [
{
...testEvent.Records[0],
dynamodb: {
NewImage: expectedOutput[0],
},
},
{
...testEvent.Records[1],
dynamodb: {
NewImage: expectedOutput[1],
},
},
],
});
});

it('should throw an error if the DynamoDB stream record cannot be unmarshalled', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
NNN: '101', //unknown type
},
},
{
Message: {
S: 'This item has changed',
},
Id: {
N: '101',
},
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(() => extendedSchema.parse(testEvent)).toThrow(
'Could not unmarshall DynamoDB stream record'
);
});

it('should throw a validation error if the unmarshalled record does not match the schema', () => {
// Prepare
const testInput = [
{
Message: {
S: 'New item!',
},
Id: {
N: '101',
},
},
{
Message: {
S: 'This item has changed',
},
// Id is missing
},
];

const testEvent = getTestEvent<DynamoDBStreamEvent>({
eventsPath: '.',
filename: 'dynamoStreamEvent',
});

testEvent.Records[0].dynamodb.NewImage = testInput[0];
testEvent.Records[1].dynamodb.NewImage = testInput[1];

// Act & Assess
expect(() => extendedSchema.parse(testEvent)).toThrow();
});
});
Loading
Loading