
Feature request: parse DynamoDB Stream events via Kinesis Data Stream #3193


Closed
2 tasks done
dreamorosi opened this issue Oct 11, 2024 · 2 comments · Fixed by #3328
Assignees
Labels
completed This item is complete and has been merged/shipped feature-request This item refers to a feature request for an existing or new utility parser This item relates to the Parser Utility

Comments

@dreamorosi
Contributor

Use case

When working with Amazon DynamoDB Streams and Amazon Kinesis Data Streams, with events consumed by an AWS Lambda function, customers want to parse and validate the events before processing them.

flowchart LR
    A[DynamoDB Table] -->|DynamoDB Stream| B[Kinesis Data Stream]
    B -->|triggers| C[Lambda Function]

In these cases, the change event from the DynamoDB Stream gets encoded and wrapped into a Kinesis Data Stream event that acts as an envelope. The current schemas we have available for both DynamoDB Stream and Kinesis Data Stream are not enough to support this use case.

The current DynamoDBStreamSchema assumes that events come in a shape that looks like this:

{
  "Records": [
     {
        "eventID": "1",
        "eventVersion": "1.0",
        "dynamodb": {
          "ApproximateCreationDateTime": 1693997155.0,
          "Keys": {
            "Id": {
              "N": "101"
            }
          },
          "NewImage": {
            "Message": {
              "S": "New item!"
            },
            "Id": {
              "N": "101"
            }
          },
          "StreamViewType": "NEW_AND_OLD_IMAGES",
          "SequenceNumber": "111",
          "SizeBytes": 26
        },
        "awsRegion": "us-west-2",
        "eventName": "INSERT",
        "eventSourceARN": "eventsource_arn",
        "eventSource": "aws:dynamodb"
      }
  ]
}

On the other hand, when these same events come through a Kinesis Data Stream, they look like this (note that the data field actually arrives base64-encoded; in the example below I show it decoded for easier understanding):

{
  Records: [
    {
      eventSource: 'aws:kinesis',
      eventVersion: '1.0',
      eventID:
        'shardId-000000000000:49656632116218945776331460018176327016585772817654480898',
      eventName: 'aws:kinesis:record',
      invokeIdentityArn:
        'arn:aws:iam::123456789012:role/KinesisddbStack-MyFunctionServiceRole3C357FF2-bxvXci1V8a2G',
      eventSourceARN:
        'arn:aws:kinesis:eu-west-1:123456789012:stream/KinesisddbStack-MyDataStream2006A1E4-BJi822bWFiFV',
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: '508B17441EAB608C8643A4479FCEF4A5',
        sequenceNumber:
          '49656632116218945776331460018176327016585772817654480898',
        approximateArrivalTimestamp: 1728572253.015,
        data: {
          awsRegion: 'eu-west-1',
          eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
          eventName: 'INSERT',
          userIdentity: null,
          recordFormat: 'application/json',
          tableName: 'MyTable',
          dynamodb: {
            ApproximateCreationDateTime: 1728572252034,
            Keys: {
              id: {
                S: 'foo',
              },
            },
            NewImage: {
              id: {
                S: 'foo',
              },
            },
            SizeBytes: 24,
          },
          eventSource: 'aws:dynamodb',
        },
        // data: 'eyJ...ZGIifQ==',
      },
    },
  ],
}

This means it's not possible to use DynamoDBStreamSchema to parse the data attribute, because the two structures are incompatible.

We should work to add support for a new schema specific to this type of integration.
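
In the meantime, the only option is to decode the payload manually before applying a custom schema, roughly like this (a minimal sketch; the InnerDynamoDBRecord schema and decodeRecord helper are illustrative, not part of any existing API):

import { z } from 'zod';

// Illustrative schema for the decoded DynamoDB change record carried in the
// Kinesis data field (only a subset of fields, for the sake of the example)
const InnerDynamoDBRecord = z.object({
  eventID: z.string(),
  eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
  dynamodb: z.object({
    Keys: z.any().optional(),
    NewImage: z.any().optional(),
  }),
});

// The Kinesis data field is base64-encoded JSON text, so it has to be decoded
// and parsed before the schema can be applied
const decodeRecord = (base64Data: string) =>
  InnerDynamoDBRecord.parse(
    JSON.parse(Buffer.from(base64Data, 'base64').toString('utf8'))
  );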

Solution/User Experience

The schema below was successful in earlier tests:

import { parser } from '@aws-lambda-powertools/parser/middleware';
import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes';
import middy from '@middy/core';
import { z } from 'zod';

// Shape of the decoded DynamoDB change record carried in the Kinesis data field
const DynamoDBStreamEvent = z.object({
  awsRegion: z.string(),
  eventID: z.string(),
  eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
  userIdentity: z.null(),
  recordFormat: z.string(),
  tableName: z.string(),
  dynamodb: z.object({
    ApproximateCreationDateTime: z.number(),
    Keys: z.any().optional(),
    NewImage: z.any().optional(),
    SizeBytes: z.number(),
  }),
});

export const handler = middy(async (event: unknown) => {
  return {
    statusCode: 200,
    body: JSON.stringify('Hello, World!'),
  };
}).use(
  parser({
    schema: DynamoDBStreamEvent,
    envelope: KinesisEnvelope,
  }) 
);
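
The same schema should also work outside of Middy with the standalone parse helper, reusing the DynamoDBStreamEvent schema above; a sketch, assuming parse keeps its (data, envelope, schema) argument order:

import { parse } from '@aws-lambda-powertools/parser';
import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes';

export const handler = async (event: unknown) => {
  // the envelope decodes each Kinesis record's data field and validates it
  // against the inner schema, returning one parsed record per Kinesis record
  const records = parse(event, KinesisEnvelope, DynamoDBStreamEvent);
  for (const record of records) {
    console.log(record.eventName, record.dynamodb.Keys);
  }
  return {
    statusCode: 200,
    body: JSON.stringify('Hello, World!'),
  };
};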

Alternative solutions

No response

Acknowledgment

Future readers

Please react with 👍 and your use case to help us understand customer demand.

@dreamorosi dreamorosi added confirmed The scope is clear, ready for implementation feature-request This item refers to a feature request for an existing or new utility help-wanted We would really appreciate some support from community for this one parser This item relates to the Parser Utility labels Oct 11, 2024
@dreamorosi dreamorosi changed the title Feature request: support for DynamoDB Stream events via Kinesis Data Stream Feature request: parse DynamoDB Stream events via Kinesis Data Stream Oct 11, 2024
@dreamorosi dreamorosi moved this from Triage to Backlog in Powertools for AWS Lambda (TypeScript) Oct 11, 2024
@am29d am29d self-assigned this Oct 14, 2024
@dreamorosi
Contributor Author

While working on this, we should also clarify/clean up the transform in the KinesisDataStreamRecordPayload schema.

Right now it's decoding and parsing, and potentially parsing again multiple times, and the comment left on the ternary operator might not be accurate. For example, I'm parsing a CloudWatch log event coming via Kinesis Data Stream and, even though the logic enters the else branch (i.e. the one after the :), the data being returned is not a string but already an object.

Also, the decompresed variable name has a typo.
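
For context, my understanding of the current logic is roughly the following (my own sketch of the decode step, not the actual source of the schema):

import { gunzipSync } from 'node:zlib';

// Base64-decode the Kinesis data payload, try to gunzip it (CloudWatch Logs
// subscription records arrive gzip-compressed), and otherwise fall back to the
// plain decoded text for producers that send uncompressed JSON
const decodeData = (data: string): string => {
  const buffer = Buffer.from(data, 'base64');
  try {
    return gunzipSync(buffer).toString('utf8');
  } catch {
    return buffer.toString('utf8');
  }
};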

@am29d am29d moved this from Backlog to Working on it in Powertools for AWS Lambda (TypeScript) Nov 14, 2024
@github-project-automation github-project-automation bot moved this from Working on it to Coming soon in Powertools for AWS Lambda (TypeScript) Nov 22, 2024
@github-actions github-actions bot added pending-release This item has been merged and will be released soon and removed help-wanted We would really appreciate some support from community for this one confirmed The scope is clear, ready for implementation labels Nov 22, 2024
Contributor

This is now released under v2.12.0!

@github-actions github-actions bot added completed This item is complete and has been merged/shipped and removed pending-release This item has been merged and will be released soon labels Dec 17, 2024
@dreamorosi dreamorosi moved this from Coming soon to Shipped in Powertools for AWS Lambda (TypeScript) Jan 7, 2025