From 8c3b7dcd39bd88d979ab353364333fa1b3e2a80f Mon Sep 17 00:00:00 2001 From: Zsombor Balogh Date: Thu, 26 May 2022 17:19:22 +0100 Subject: [PATCH] Add support for tumbling windows events serialization --- .../events/LambdaEventSerializers.java | 17 +++ .../mixins/DynamodbTimeWindowEventMixin.java | 14 +++ .../mixins/KinesisTimeWindowEventMixin.java | 14 +++ .../events/LambdaEventSerializersTest.java | 2 + .../dynamodb_time_window_event.json | 101 ++++++++++++++++++ .../kinesis_time_window_event.json | 31 ++++++ 6 files changed, 179 insertions(+) create mode 100644 aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/DynamodbTimeWindowEventMixin.java create mode 100644 aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/KinesisTimeWindowEventMixin.java create mode 100644 aws-lambda-java-serialization/src/test/resources/event_models/dynamodb_time_window_event.json create mode 100644 aws-lambda-java-serialization/src/test/resources/event_models/kinesis_time_window_event.json diff --git a/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializers.java b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializers.java index 4cb4c431..b24b4060 100644 --- a/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializers.java +++ b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializers.java @@ -8,7 +8,9 @@ import com.amazonaws.services.lambda.runtime.serialization.events.mixins.CodeCommitEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.ConnectEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.DynamodbEventMixin; +import com.amazonaws.services.lambda.runtime.serialization.events.mixins.DynamodbTimeWindowEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.KinesisEventMixin; +import com.amazonaws.services.lambda.runtime.serialization.events.mixins.KinesisTimeWindowEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.SNSEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.SQSEventMixin; import com.amazonaws.services.lambda.runtime.serialization.events.mixins.ScheduledEventMixin; @@ -63,8 +65,10 @@ public class LambdaEventSerializers { "com.amazonaws.services.lambda.runtime.events.ConfigEvent", "com.amazonaws.services.lambda.runtime.events.ConnectEvent", "com.amazonaws.services.lambda.runtime.events.DynamodbEvent", + "com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent", "com.amazonaws.services.lambda.runtime.events.IoTButtonEvent", "com.amazonaws.services.lambda.runtime.events.KinesisEvent", + "com.amazonaws.services.lambda.runtime.events.KinesisTimeWindowEvent", "com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent", "com.amazonaws.services.lambda.runtime.events.LambdaDestinationEvent", "com.amazonaws.services.lambda.runtime.events.LexEvent", @@ -128,10 +132,14 @@ public class LambdaEventSerializers { DynamodbEventMixin.AttributeValueMixin.class), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue", DynamodbEventMixin.AttributeValueMixin.class), + new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent", + DynamodbTimeWindowEventMixin.class), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent", KinesisEventMixin.class), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record", KinesisEventMixin.RecordMixin.class), + new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisTimeWindowEvent", + KinesisTimeWindowEventMixin.class), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.ScheduledEvent", ScheduledEventMixin.class), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.SecretsManagerRotationEvent", @@ -172,6 +180,15 @@ public class LambdaEventSerializers { "com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord", "com.amazonaws.services.dynamodbv2.model.StreamRecord"), new NestedClass("com.amazonaws.services.lambda.runtime.events.DynamodbEvent$DynamodbStreamRecord"))), + new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent", + Arrays.asList( + new AlternateNestedClass( + "com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue", + "com.amazonaws.services.dynamodbv2.model.AttributeValue"), + new AlternateNestedClass( + "com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord", + "com.amazonaws.services.dynamodbv2.model.StreamRecord"), + new NestedClass("com.amazonaws.services.lambda.runtime.events.DynamodbEvent$DynamodbStreamRecord"))), new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent", Arrays.asList( new NestedClass("com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record"))), diff --git a/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/DynamodbTimeWindowEventMixin.java b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/DynamodbTimeWindowEventMixin.java new file mode 100644 index 00000000..93d87a32 --- /dev/null +++ b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/DynamodbTimeWindowEventMixin.java @@ -0,0 +1,14 @@ +/* + * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + */ + +package com.amazonaws.services.lambda.runtime.serialization.events.mixins; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public abstract class DynamodbTimeWindowEventMixin extends DynamodbEventMixin { + + // needed because Jackson expects "eventSourceArn" instead of "eventSourceARN" + @JsonProperty("eventSourceARN") abstract String getEventSourceArn(); + @JsonProperty("eventSourceARN") abstract void setEventSourceArn(String eventSourceArn); +} diff --git a/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/KinesisTimeWindowEventMixin.java b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/KinesisTimeWindowEventMixin.java new file mode 100644 index 00000000..374a23bc --- /dev/null +++ b/aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/mixins/KinesisTimeWindowEventMixin.java @@ -0,0 +1,14 @@ +/* + * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. + */ + +package com.amazonaws.services.lambda.runtime.serialization.events.mixins; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public abstract class KinesisTimeWindowEventMixin extends KinesisEventMixin { + + // needed because Jackson expects "eventSourceArn" instead of "eventSourceARN" + @JsonProperty("eventSourceARN") abstract String getEventSourceArn(); + @JsonProperty("eventSourceARN") abstract void setEventSourceArn(String eventSourceArn); +} diff --git a/aws-lambda-java-serialization/src/test/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializersTest.java b/aws-lambda-java-serialization/src/test/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializersTest.java index c1d9fd0b..8f907654 100644 --- a/aws-lambda-java-serialization/src/test/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializersTest.java +++ b/aws-lambda-java-serialization/src/test/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializersTest.java @@ -36,6 +36,7 @@ private static Stream serdeArguments() { Arguments.of("cognito_event.json", CognitoEvent.class), Arguments.of("config_event.json", ConfigEvent.class), Arguments.of("dynamodb_event.json", DynamodbEvent.class), + Arguments.of("dynamodb_time_window_event.json", DynamodbTimeWindowEvent.class), Arguments.of("iot_button_event.json", IoTButtonEvent.class), Arguments.of("kinesis_analytics_firehose_input_preprocessing_event.json", KinesisAnalyticsFirehoseInputPreprocessingEvent.class), Arguments.of("kinesis_analytics_input_preprocessing_response_event.json", KinesisAnalyticsInputPreprocessingResponse.class), @@ -43,6 +44,7 @@ private static Stream serdeArguments() { Arguments.of("kinesis_analytics_output_delivery_response_event.json", KinesisAnalyticsOutputDeliveryResponse.class), Arguments.of("kinesis_analytics_streams_input_preprocessing_event.json", KinesisAnalyticsStreamsInputPreprocessingEvent.class), Arguments.of("kinesis_event.json", KinesisEvent.class), + Arguments.of("kinesis_time_window_event.json", KinesisTimeWindowEvent.class), Arguments.of("kinesis_firehose_event.json", KinesisFirehoseEvent.class), Arguments.of("lex_event.json", LexEvent.class), Arguments.of("s3_event.json", S3Event.class), diff --git a/aws-lambda-java-serialization/src/test/resources/event_models/dynamodb_time_window_event.json b/aws-lambda-java-serialization/src/test/resources/event_models/dynamodb_time_window_event.json new file mode 100644 index 00000000..237b7bd2 --- /dev/null +++ b/aws-lambda-java-serialization/src/test/resources/event_models/dynamodb_time_window_event.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2021-10-26T17:00:00Z", + "end": "2021-10-26T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/aws-lambda-java-serialization/src/test/resources/event_models/kinesis_time_window_event.json b/aws-lambda-java-serialization/src/test/resources/event_models/kinesis_time_window_event.json new file mode 100644 index 00000000..02499226 --- /dev/null +++ b/aws-lambda-java-serialization/src/test/resources/event_models/kinesis_time_window_event.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2021-10-26T17:00:00Z", + "end": "2021-10-26T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +}