From 9ccb42551bb29bc51ade762dd8c51d1590b878b4 Mon Sep 17 00:00:00 2001 From: Prajesh Ravindran Date: Tue, 11 Aug 2020 08:17:06 -0700 Subject: [PATCH 1/2] Add Kafka Event --- .../lambda/runtime/events/KafkaEvent.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java new file mode 100644 index 00000000..dd554ddd --- /dev/null +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package com.amazonaws.services.lambda.runtime.events; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Data +@NoArgsConstructor +@EqualsAndHashCode +/** Represents a Kafka Event. **/ +public class KafkaEvent { + private Map> records; + public String eventSource; + private String eventSourceArn; + + public Map> getRecords() { + return records; + } + + public KafkaEvent(Map> eventRecords) { + if (eventRecords == null) { + throw new IllegalArgumentException("Records cannot be null"); + } + Map> records = new HashMap<>(); + for(Map.Entry> entrySet: eventRecords.entrySet()) { + records.put(entrySet.getKey().toString(), entrySet.getValue()); + } + this.records = records; + } + + public String getEventSource () { + return eventSource; + } + + @Data + @NoArgsConstructor + @EqualsAndHashCode + public static class KafkaEventRecord { + private String topic; + private int partition; + private long offset; + private long timestamp; + private String timestampType; + private String key; + private String value; + } + + @Data + @NoArgsConstructor + public static class TopicPartition { + private String topic; + private int partition; + + @Override + public String toString() { + //Kafka also uses '-' for toString() + return topic + "-" + partition; + } + } +} + From 12ad1afaddbdf06dbcf8ae9d67987d08bbb7fe8a Mon Sep 17 00:00:00 2001 From: Prajesh Ravindran Date: Tue, 11 Aug 2020 09:28:23 -0700 Subject: [PATCH 2/2] KafkaEvent addressed the comments --- .../lambda/runtime/events/KafkaEvent.java | 36 ++++++------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java index dd554ddd..ed5c2ac8 100644 --- a/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java +++ b/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java @@ -12,44 +12,27 @@ */ package com.amazonaws.services.lambda.runtime.events; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; -import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import java.util.HashMap; import java.util.List; import java.util.Map; @Data @NoArgsConstructor -@EqualsAndHashCode +@AllArgsConstructor +@Builder(setterPrefix = "with") /** Represents a Kafka Event. **/ public class KafkaEvent { private Map> records; - public String eventSource; + private String eventSource; private String eventSourceArn; - - public Map> getRecords() { - return records; - } - - public KafkaEvent(Map> eventRecords) { - if (eventRecords == null) { - throw new IllegalArgumentException("Records cannot be null"); - } - Map> records = new HashMap<>(); - for(Map.Entry> entrySet: eventRecords.entrySet()) { - records.put(entrySet.getKey().toString(), entrySet.getValue()); - } - this.records = records; - } - - public String getEventSource () { - return eventSource; - } - + @Data @NoArgsConstructor - @EqualsAndHashCode + @AllArgsConstructor + @Builder(setterPrefix = "with") public static class KafkaEventRecord { private String topic; private int partition; @@ -62,6 +45,8 @@ public static class KafkaEventRecord { @Data @NoArgsConstructor + @AllArgsConstructor + @Builder(setterPrefix = "with") public static class TopicPartition { private String topic; private int partition; @@ -73,4 +58,3 @@ public String toString() { } } } -