-
Notifications
You must be signed in to change notification settings - Fork 239
Added event class MskFirehoseEvent.java for Firehose Lambda transformation when MSK is the source #490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added event class MskFirehoseEvent.java for Firehose Lambda transformation when MSK is the source #490
Changes from 4 commits
ec8dca0
7df3645
f6f4a60
51b2640
7d689bb
eb08bbe
5eb7042
a720f87
df1e0e4
57a9a81
5ba689e
f240a62
5abcb06
f29f105
24e1b10
feec28f
6fa478e
243253b
4505f6d
cb37072
8239c4a
5769d76
e12da26
6fd7282
4c14fec
b3b5f61
62d58c1
641b3f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package com.amazonaws.services.lambda.runtime.events; | ||
|
||
import java.io.Serializable; | ||
import java.nio.ByteBuffer; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import lombok.EqualsAndHashCode; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
import lombok.ToString; | ||
|
||
/** | ||
* Created by vermshas on 6/28/24. | ||
* Event format is below: | ||
* { | ||
* "invocationId": "", | ||
* "sourceMSKArn": "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to model the sourceMSKArn There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added sourceMSKArn. |
||
* "deliveryStreamArn": "", | ||
* "region": "us-east-1", | ||
* "records": [ | ||
* { | ||
* "recordId": "00000000000000000000000000000000000000000000000000000000000000", | ||
* "approximateArrivalTimestamp": 1716369573887, | ||
* "mskRecordMetadata": { | ||
* "offset": "0", | ||
* "partitionId": "1", | ||
* "approximateArrivalTimestamp": 1716369573887 | ||
* }, | ||
* "kafkaRecordValue": "" | ||
* } | ||
* ] | ||
* } | ||
*/ | ||
|
||
@Getter | ||
@Setter | ||
@ToString | ||
@EqualsAndHashCode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use the following lombok annotations
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change done. |
||
|
||
public class MskFirehoseEvent implements Serializable, Cloneable { | ||
|
||
private static final long serialVersionUID = -2890373471008001695L; | ||
|
||
private String invocationId; | ||
|
||
private String deliveryStreamArn; | ||
|
||
private String region; | ||
|
||
private List<Record> records; | ||
|
||
@Getter | ||
@Setter | ||
@ToString | ||
@EqualsAndHashCode | ||
public static class Record implements Serializable, Cloneable { | ||
|
||
private static final long serialVersionUID = -7231161900431910379L; | ||
|
||
private ByteBuffer kafkaRecordValue; | ||
|
||
private String recordId; | ||
|
||
private Long approximateArrivalEpoch; | ||
|
||
private Long approximateArrivalTimestamp; | ||
|
||
private Map<String, String> mskRecordMetadata; | ||
|
||
public Record() {} | ||
|
||
public Record withRecordId(String recordId) { | ||
setRecordId(recordId); | ||
return this; | ||
} | ||
|
||
public Record withApproximateArrivalEpoch(Long approximateArrivalEpoch) { | ||
setApproximateArrivalEpoch(approximateArrivalEpoch); | ||
return this; | ||
} | ||
|
||
public Record withApproximateArrivalTimestamp(Long approximateArrivalTimestamp) { | ||
setApproximateArrivalTimestamp(approximateArrivalTimestamp); | ||
return this; | ||
} | ||
|
||
public Record withMskRecordMetadata(Map<String, String> mskRecordMetadata) { | ||
setMskRecordMetadata(mskRecordMetadata); | ||
return this; | ||
} | ||
|
||
@Override | ||
public Record clone() { | ||
try { | ||
return (Record) super.clone(); | ||
} catch (CloneNotSupportedException e) { | ||
throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); | ||
} | ||
} | ||
|
||
} | ||
public MskFirehoseEvent() {} | ||
public MskFirehoseEvent withInvocationId(String invocationId) { | ||
setInvocationId(invocationId); | ||
return this; | ||
} | ||
public MskFirehoseEvent withDeliveryStreamArn(String deliveryStreamArn) { | ||
setDeliveryStreamArn(deliveryStreamArn); | ||
return this; | ||
} | ||
public MskFirehoseEvent withRegion(String region) { | ||
setRegion(region); | ||
return this; | ||
} | ||
public MskFirehoseEvent withRecords(List<Record> records) { | ||
setRecords(records); | ||
return this; | ||
} | ||
|
||
@Override | ||
public MskFirehoseEvent clone() { | ||
try { | ||
return (MskFirehoseEvent) super.clone(); | ||
} catch (CloneNotSupportedException e) { | ||
throw new IllegalStateException("Got a CloneNotSupportedException from Object.clone()", e); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link to the docs here