diff --git a/bom-internal/pom.xml b/bom-internal/pom.xml
index 09b11f3ebdf7..0f655db54a7a 100644
--- a/bom-internal/pom.xml
+++ b/bom-internal/pom.xml
@@ -15,9 +15,9 @@
- software.amazon
- flow
- ${flow.version}
+ software.amazon.eventstream
+ eventstream
+ ${eventstream.version}
commons-io
@@ -318,4 +318,4 @@
-
\ No newline at end of file
+
diff --git a/core/auth/pom.xml b/core/auth/pom.xml
index f079cf0e0672..9a0170b7f7e5 100644
--- a/core/auth/pom.xml
+++ b/core/auth/pom.xml
@@ -52,8 +52,8 @@
jackson-databind
- software.amazon
- flow
+ software.amazon.eventstream
+ eventstream
diff --git a/core/aws-core/pom.xml b/core/aws-core/pom.xml
index 6c524981f3f1..8b370014d6ba 100644
--- a/core/aws-core/pom.xml
+++ b/core/aws-core/pom.xml
@@ -53,8 +53,8 @@
slf4j-api
- software.amazon
- flow
+ software.amazon.eventstream
+ eventstream
@@ -146,5 +146,5 @@
-
+
diff --git a/flow/build.properties b/flow/build.properties
deleted file mode 100644
index 173b9d3c15f8..000000000000
--- a/flow/build.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-#
-# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License").
-# You may not use this file except in compliance with the License.
-# A copy of the License is located at
-#
-# http://aws.amazon.com/apache2.0
-#
-# or in the "license" file accompanying this file. This file is distributed
-# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
-# express or implied. See the License for the specific language governing
-# permissions and limitations under the License.
-#
-
-source.. = src/main/java,\
- src/main/resources
-output.. = bin/
-
-bin.includes = LICENSE.txt,\
- NOTICE.txt,\
- META-INF/,\
- .
-
-jre.compilation.profile = JavaSE-1.6
diff --git a/flow/pom.xml b/flow/pom.xml
deleted file mode 100644
index bebce13b303c..000000000000
--- a/flow/pom.xml
+++ /dev/null
@@ -1,90 +0,0 @@
-
-
-
- aws-sdk-java-pom
- software.amazon.awssdk
- 2.1.0-SNAPSHOT
-
- 4.0.0
-
- software.amazon
- flow
- ${flow.version}
-
- AWS Event Stream
- The AWS Event Stream decoder library.
- https://aws.amazon.com/sdkforjava
-
-
-
- UTF-8
- 1.8
-
-
-
-
-
- maven-surefire-plugin
- ${maven.surefire.version}
-
-
- org.junit.platform
- junit-platform-surefire-provider
- 1.2.0
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.7.0
-
-
- 1.8
- 1.8
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
- 3.1.0
-
- aws-flow-java
-
-
- software.amazon.eventstream
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- org.junit.vintage
- junit-vintage-engine
- 4.12.1
- test
-
-
- org.hamcrest
- hamcrest-all
- 1.3
- test
-
-
-
diff --git a/flow/src/main/java/software/amazon/eventstream/CRC32C.java b/flow/src/main/java/software/amazon/eventstream/CRC32C.java
deleted file mode 100644
index 63587fa85fc0..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/CRC32C.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.util.zip.Checksum;
-
-final class CRC32C implements Checksum {
- private static final int[] LOOKUP_TABLE = {
- 0x00000000, 0xF26B8303, 0xE13B70F7, 0x1350F3F4, 0xC79A971F, 0x35F1141C, 0x26A1E7E8, 0xD4CA64EB,
- 0x8AD958CF, 0x78B2DBCC, 0x6BE22838, 0x9989AB3B, 0x4D43CFD0, 0xBF284CD3, 0xAC78BF27, 0x5E133C24,
- 0x105EC76F, 0xE235446C, 0xF165B798, 0x030E349B, 0xD7C45070, 0x25AFD373, 0x36FF2087, 0xC494A384,
- 0x9A879FA0, 0x68EC1CA3, 0x7BBCEF57, 0x89D76C54, 0x5D1D08BF, 0xAF768BBC, 0xBC267848, 0x4E4DFB4B,
- 0x20BD8EDE, 0xD2D60DDD, 0xC186FE29, 0x33ED7D2A, 0xE72719C1, 0x154C9AC2, 0x061C6936, 0xF477EA35,
- 0xAA64D611, 0x580F5512, 0x4B5FA6E6, 0xB93425E5, 0x6DFE410E, 0x9F95C20D, 0x8CC531F9, 0x7EAEB2FA,
- 0x30E349B1, 0xC288CAB2, 0xD1D83946, 0x23B3BA45, 0xF779DEAE, 0x05125DAD, 0x1642AE59, 0xE4292D5A,
- 0xBA3A117E, 0x4851927D, 0x5B016189, 0xA96AE28A, 0x7DA08661, 0x8FCB0562, 0x9C9BF696, 0x6EF07595,
- 0x417B1DBC, 0xB3109EBF, 0xA0406D4B, 0x522BEE48, 0x86E18AA3, 0x748A09A0, 0x67DAFA54, 0x95B17957,
- 0xCBA24573, 0x39C9C670, 0x2A993584, 0xD8F2B687, 0x0C38D26C, 0xFE53516F, 0xED03A29B, 0x1F682198,
- 0x5125DAD3, 0xA34E59D0, 0xB01EAA24, 0x42752927, 0x96BF4DCC, 0x64D4CECF, 0x77843D3B, 0x85EFBE38,
- 0xDBFC821C, 0x2997011F, 0x3AC7F2EB, 0xC8AC71E8, 0x1C661503, 0xEE0D9600, 0xFD5D65F4, 0x0F36E6F7,
- 0x61C69362, 0x93AD1061, 0x80FDE395, 0x72966096, 0xA65C047D, 0x5437877E, 0x4767748A, 0xB50CF789,
- 0xEB1FCBAD, 0x197448AE, 0x0A24BB5A, 0xF84F3859, 0x2C855CB2, 0xDEEEDFB1, 0xCDBE2C45, 0x3FD5AF46,
- 0x7198540D, 0x83F3D70E, 0x90A324FA, 0x62C8A7F9, 0xB602C312, 0x44694011, 0x5739B3E5, 0xA55230E6,
- 0xFB410CC2, 0x092A8FC1, 0x1A7A7C35, 0xE811FF36, 0x3CDB9BDD, 0xCEB018DE, 0xDDE0EB2A, 0x2F8B6829,
- 0x82F63B78, 0x709DB87B, 0x63CD4B8F, 0x91A6C88C, 0x456CAC67, 0xB7072F64, 0xA457DC90, 0x563C5F93,
- 0x082F63B7, 0xFA44E0B4, 0xE9141340, 0x1B7F9043, 0xCFB5F4A8, 0x3DDE77AB, 0x2E8E845F, 0xDCE5075C,
- 0x92A8FC17, 0x60C37F14, 0x73938CE0, 0x81F80FE3, 0x55326B08, 0xA759E80B, 0xB4091BFF, 0x466298FC,
- 0x1871A4D8, 0xEA1A27DB, 0xF94AD42F, 0x0B21572C, 0xDFEB33C7, 0x2D80B0C4, 0x3ED04330, 0xCCBBC033,
- 0xA24BB5A6, 0x502036A5, 0x4370C551, 0xB11B4652, 0x65D122B9, 0x97BAA1BA, 0x84EA524E, 0x7681D14D,
- 0x2892ED69, 0xDAF96E6A, 0xC9A99D9E, 0x3BC21E9D, 0xEF087A76, 0x1D63F975, 0x0E330A81, 0xFC588982,
- 0xB21572C9, 0x407EF1CA, 0x532E023E, 0xA145813D, 0x758FE5D6, 0x87E466D5, 0x94B49521, 0x66DF1622,
- 0x38CC2A06, 0xCAA7A905, 0xD9F75AF1, 0x2B9CD9F2, 0xFF56BD19, 0x0D3D3E1A, 0x1E6DCDEE, 0xEC064EED,
- 0xC38D26C4, 0x31E6A5C7, 0x22B65633, 0xD0DDD530, 0x0417B1DB, 0xF67C32D8, 0xE52CC12C, 0x1747422F,
- 0x49547E0B, 0xBB3FFD08, 0xA86F0EFC, 0x5A048DFF, 0x8ECEE914, 0x7CA56A17, 0x6FF599E3, 0x9D9E1AE0,
- 0xD3D3E1AB, 0x21B862A8, 0x32E8915C, 0xC083125F, 0x144976B4, 0xE622F5B7, 0xF5720643, 0x07198540,
- 0x590AB964, 0xAB613A67, 0xB831C993, 0x4A5A4A90, 0x9E902E7B, 0x6CFBAD78, 0x7FAB5E8C, 0x8DC0DD8F,
- 0xE330A81A, 0x115B2B19, 0x020BD8ED, 0xF0605BEE, 0x24AA3F05, 0xD6C1BC06, 0xC5914FF2, 0x37FACCF1,
- 0x69E9F0D5, 0x9B8273D6, 0x88D28022, 0x7AB90321, 0xAE7367CA, 0x5C18E4C9, 0x4F48173D, 0xBD23943E,
- 0xF36E6F75, 0x0105EC76, 0x12551F82, 0xE03E9C81, 0x34F4F86A, 0xC69F7B69, 0xD5CF889D, 0x27A40B9E,
- 0x79B737BA, 0x8BDCB4B9, 0x988C474D, 0x6AE7C44E, 0xBE2DA0A5, 0x4C4623A6, 0x5F16D052, 0xAD7D5351
- };
-
- private int crc = 0xFFFF_FFFF;
-
- @Override
- public void update(int b) {
- crc = (crc >>> 8) ^ LOOKUP_TABLE[(crc ^ (b & 0xFF)) & 0xFF];
- }
-
- @Override
- public void update(byte[] bytes, int offset, int length) {
- for (int i = offset; i < offset + length; i++) {
- update(bytes[i]);
- }
- }
-
- @Override
- public long getValue() {
- return (~crc) & 0xFFFF_FFFFL;
- }
-
- @Override
- public void reset() {
- crc = 0xFFFF_FFFF;
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/Checksums.java b/flow/src/main/java/software/amazon/eventstream/Checksums.java
deleted file mode 100644
index 0e79ec03b667..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/Checksums.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
-
-final class Checksums {
- private Checksums() {}
-
- static void update(Checksum checksum, ByteBuffer buffer) {
- if (buffer.hasArray()) {
- int pos = buffer.position();
- int off = buffer.arrayOffset();
- int limit = buffer.limit();
- int rem = limit - pos;
- checksum.update(buffer.array(), pos + off, rem);
- buffer.position(limit);
- } else {
- int length = buffer.remaining();
- byte[] b = new byte[length];
- buffer.get(b, 0, length);
- checksum.update(b, 0, length);
- }
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/Header.java b/flow/src/main/java/software/amazon/eventstream/Header.java
deleted file mode 100644
index c53e286f60b0..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/Header.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-class Header {
- private final String name;
- private final HeaderValue value;
-
- Header(String name, HeaderValue value) {
- this.name = requireNonNull(name);
- this.value = requireNonNull(value);
- }
-
- Header(String name, String value) {
- this(name, HeaderValue.fromString(value));
- }
-
- public String getName() {
- return name;
- }
-
- public HeaderValue getValue() {
- return value;
- }
-
- static Header decode(ByteBuffer buf) {
- String name = Utils.readShortString(buf);
- return new Header(name, HeaderValue.decode(buf));
- }
-
- static void encode(Map.Entry header, DataOutputStream dos) throws IOException {
- new Header(header.getKey(), header.getValue()).encode(dos);
- }
-
- void encode(DataOutputStream dos) throws IOException {
- Utils.writeShortString(dos, name);
- value.encode(dos);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Header header = (Header) o;
-
- if (!name.equals(header.name)) return false;
- return value.equals(header.value);
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + value.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "Header{"
- + "name='" + name + '\''
- + ", value=" + value
- + '}';
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/HeaderType.java b/flow/src/main/java/software/amazon/eventstream/HeaderType.java
deleted file mode 100644
index 65e452b43a98..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/HeaderType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-enum HeaderType {
- TRUE(0),
- FALSE(1),
- BYTE(2),
- SHORT(3),
- INTEGER(4),
- LONG(5),
- BYTE_ARRAY(6),
- STRING(7),
- TIMESTAMP(8),
- UUID(9);
-
- final byte headerTypeId;
-
- HeaderType(int headerTypeId) {
- this.headerTypeId = (byte) headerTypeId;
- }
-
- static HeaderType fromTypeId(byte headerTypeId) {
- switch (headerTypeId) {
- case 0: return TRUE;
- case 1: return FALSE;
- case 2: return BYTE;
- case 3: return SHORT;
- case 4: return INTEGER;
- case 5: return LONG;
- case 6: return BYTE_ARRAY;
- case 7: return STRING;
- case 8: return TIMESTAMP;
- case 9: return UUID;
- default:
- throw new IllegalArgumentException("Got unknown headerTypeId " + headerTypeId);
- }
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/HeaderValue.java b/flow/src/main/java/software/amazon/eventstream/HeaderValue.java
deleted file mode 100644
index 30b502991b2b..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/HeaderValue.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Date;
-import java.util.UUID;
-
-import static java.util.Objects.requireNonNull;
-import static software.amazon.eventstream.HeaderType.TIMESTAMP;
-import static software.amazon.eventstream.HeaderType.fromTypeId;
-import static software.amazon.eventstream.Utils.writeBytes;
-import static software.amazon.eventstream.Utils.writeString;
-
-/**
- * A typed header value. The underlying value can be obtained by calling the
- * appropriate getter.
- */
-public abstract class HeaderValue {
- public static HeaderValue fromBoolean(boolean value) {
- return new BooleanValue(value);
- }
-
- public static HeaderValue fromByte(byte value) {
- return new ByteValue(value);
- }
-
- public static HeaderValue fromShort(short value) {
- return new ShortValue(value);
- }
-
- public static HeaderValue fromInteger(int value) {
- return new IntegerValue(value);
- }
-
- public static HeaderValue fromLong(long value) {
- return new LongValue(value);
- }
-
- public static HeaderValue fromByteArray(byte[] bytes) {
- return new ByteArrayValue(bytes);
- }
-
- public static HeaderValue fromByteBuffer(ByteBuffer buf) {
- buf = buf.duplicate();
- byte[] bytes = new byte[buf.remaining()];
- buf.get(bytes);
- return fromByteArray(bytes);
- }
-
- public static HeaderValue fromString(String string) {
- return new StringValue(string);
- }
-
- public static HeaderValue fromTimestamp(Instant value) {
- return new TimestampValue(value);
- }
-
- public static HeaderValue fromDate(Date value) {
- return new TimestampValue(value.toInstant());
- }
-
- public static HeaderValue fromUuid(UUID value) {
- return new UuidValue(value);
- }
-
- protected HeaderValue() {}
-
- public abstract HeaderType getType();
-
- public boolean getBoolean() {
- throw new IllegalStateException();
- }
-
- public byte getByte() {
- throw new IllegalStateException("Expected byte, but type was " + getType().name());
- }
-
- public short getShort() {
- throw new IllegalStateException("Expected short, but type was " + getType().name());
- }
-
- public int getInteger() {
- throw new IllegalStateException("Expected integer, but type was " + getType().name());
- }
-
- public long getLong() {
- throw new IllegalStateException("Expected long, but type was " + getType().name());
- }
-
- public byte[] getByteArray() {
- throw new IllegalStateException();
- }
-
- public final ByteBuffer getByteBuffer() {
- return ByteBuffer.wrap(getByteArray());
- }
-
- public String getString() {
- throw new IllegalStateException();
- }
-
- public Instant getTimestamp() {
- throw new IllegalStateException("Expected timestamp, but type was " + getType().name());
- }
-
- public Date getDate() {
- return Date.from(getTimestamp());
- }
-
- public UUID getUuid() {
- throw new IllegalStateException("Expected UUID, but type was " + getType().name());
- }
-
- void encode(DataOutputStream dos) throws IOException {
- dos.writeByte(getType().headerTypeId);
- encodeValue(dos);
- }
-
- abstract void encodeValue(DataOutputStream dos) throws IOException;
-
- static HeaderValue decode(ByteBuffer buf) {
- HeaderType type = fromTypeId(buf.get());
- switch (type) {
- case TRUE:
- return new BooleanValue(true);
- case FALSE:
- return new BooleanValue(false);
- case BYTE:
- return new ByteValue(buf.get());
- case SHORT:
- return new ShortValue(buf.getShort());
- case INTEGER:
- return fromInteger(buf.getInt());
- case LONG:
- return new LongValue(buf.getLong());
- case BYTE_ARRAY:
- return fromByteArray(Utils.readBytes(buf));
- case STRING:
- return fromString(Utils.readString(buf));
- case TIMESTAMP:
- return TimestampValue.decode(buf);
- case UUID:
- return UuidValue.decode(buf);
- default:
- throw new IllegalStateException();
- }
- }
-
- private static final class BooleanValue extends HeaderValue {
- private final boolean value;
-
- private BooleanValue(boolean value) {
- this.value = value;
- }
-
- @Override
- public HeaderType getType() {
- if (value) {
- return HeaderType.TRUE;
- } else {
- return HeaderType.FALSE;
- }
- }
-
- @Override
- public boolean getBoolean() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) {}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- BooleanValue that = (BooleanValue) o;
-
- return value == that.value;
- }
-
- @Override
- public int hashCode() {
- if (value) {
- return 1;
- } else {
- return 0;
- }
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-
- private static final class ByteValue extends HeaderValue {
- private final byte value;
-
- private ByteValue(byte value) {
- this.value = value;
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.BYTE;
- }
-
- @Override
- public byte getByte() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) {}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ByteValue that = (ByteValue) o;
-
- return value == that.value;
- }
-
- @Override
- public int hashCode() {
- return (int) value;
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-
- private static final class ShortValue extends HeaderValue {
- private final short value;
-
- private ShortValue(short value) {
- this.value = value;
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.SHORT;
- }
-
- @Override
- public short getShort() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) {}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ShortValue that = (ShortValue) o;
-
- return value == that.value;
- }
-
- @Override
- public int hashCode() {
- return (int) value;
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-
- private static final class IntegerValue extends HeaderValue {
- private final int value;
-
- private IntegerValue(int value) {
- this.value = value;
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.INTEGER;
- }
-
- @Override
- public int getInteger() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- dos.writeInt(value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- IntegerValue that = (IntegerValue) o;
-
- return value == that.value;
- }
-
- @Override
- public int hashCode() {
- return value;
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-
- private static final class LongValue extends HeaderValue {
- private final long value;
-
- private LongValue(long value) {
- this.value = value;
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.LONG;
- }
-
- @Override
- public long getLong() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- dos.writeLong(value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- LongValue longValue = (LongValue) o;
-
- return value == longValue.value;
- }
-
- @Override
- public int hashCode() {
- return (int) (value ^ (value >>> 32));
- }
-
- @Override
- public String toString() {
- return String.valueOf(value);
- }
- }
-
- private static final class ByteArrayValue extends HeaderValue {
- private final byte[] value;
-
- private ByteArrayValue(byte[] value) {
- this.value = requireNonNull(value);
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.BYTE_ARRAY;
- }
-
- @Override
- public byte[] getByteArray() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- writeBytes(dos, value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ByteArrayValue that = (ByteArrayValue) o;
-
- return Arrays.equals(value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(value);
- }
-
- @Override
- public String toString() {
- return Base64.getEncoder().encodeToString(value);
- }
- }
-
- private static final class StringValue extends HeaderValue {
- private final String value;
-
- private StringValue(String value) {
- this.value = requireNonNull(value);
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.STRING;
- }
-
- @Override
- public String getString() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- writeString(dos, value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- StringValue that = (StringValue) o;
-
- return value.equals(that.value);
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-
- @Override
- public String toString() {
- return '"' + value + '"';
- }
- }
-
- private static final class TimestampValue extends HeaderValue {
- private final Instant value;
-
- private TimestampValue(Instant value) {
- this.value = requireNonNull(value);
- }
-
- static TimestampValue decode(ByteBuffer buf) {
- long epochMillis = buf.getLong();
- return new TimestampValue(Instant.ofEpochMilli(epochMillis));
- }
-
- @Override
- public HeaderType getType() {
- return TIMESTAMP;
- }
-
- @Override
- public Instant getTimestamp() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- dos.writeLong(value.toEpochMilli());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimestampValue that = (TimestampValue) o;
-
- return value.equals(that.value);
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-
- @Override
- public String toString() {
- return value.toString();
- }
- }
-
- private static final class UuidValue extends HeaderValue {
- private final UUID value;
-
- private UuidValue(UUID value) {
- this.value = requireNonNull(value);
- }
-
- static UuidValue decode(ByteBuffer buf) {
- long msb = buf.getLong();
- long lsb = buf.getLong();
- return new UuidValue(new UUID(msb, lsb));
- }
-
- @Override
- public HeaderType getType() {
- return HeaderType.UUID;
- }
-
- @Override
- public UUID getUuid() {
- return value;
- }
-
- @Override
- void encodeValue(DataOutputStream dos) throws IOException {
- dos.writeLong(value.getMostSignificantBits());
- dos.writeLong(value.getLeastSignificantBits());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- UuidValue uuidValue = (UuidValue) o;
-
- return value.equals(uuidValue.value);
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
-
- @Override
- public String toString() {
- return value.toString();
- }
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/Message.java b/flow/src/main/java/software/amazon/eventstream/Message.java
deleted file mode 100644
index 360115175f86..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/Message.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedOutputStream;
-import java.util.zip.Checksum;
-
-import static java.lang.String.format;
-
-/**
- * An eventstream message.
- */
-public class Message {
- private static final int TRAILING_CRC_LENGTH = 4;
- static final int MESSAGE_OVERHEAD = Prelude.LENGTH_WITH_CRC + TRAILING_CRC_LENGTH;
-
- private final Map headers;
- private final byte[] payload;
-
- public Message(Map headers, byte[] payload) {
- this.headers = headers;
- this.payload = payload.clone();
- }
-
- public Map getHeaders() {
- return headers;
- }
-
- public byte[] getPayload() {
- return payload.clone();
- }
-
- public static Message decode(ByteBuffer buf) {
- return decode(Prelude.decode(buf), buf);
- }
-
- /**
- * Decodes a message with an already decoded prelude. Useful for not decoding the prelude twice.
- *
- * @param prelude Decoded prelude of message.
- * @param buf Data of message (including prelude which will be skipped over).
- * @return Decoded message
- */
- static Message decode(Prelude prelude, ByteBuffer buf) {
- int totalLength = prelude.getTotalLength();
- validateMessageCrc(buf, totalLength);
- buf.position(buf.position() + Prelude.LENGTH_WITH_CRC);
-
- long headersLength = prelude.getHeadersLength();
- byte[] headerBytes = new byte[Math.toIntExact(headersLength)];
- buf.get(headerBytes);
- Map headers = decodeHeaders(ByteBuffer.wrap(headerBytes));
-
- byte[] payload = new byte[Math.toIntExact(totalLength - MESSAGE_OVERHEAD - headersLength)];
- buf.get(payload);
- buf.getInt(); // skip past the message CRC
-
- return new Message(headers, payload);
- }
-
- private static void validateMessageCrc(ByteBuffer buf, int totalLength) {
- Checksum crc = new CRC32();
-
- Checksums.update(crc, (ByteBuffer) buf.duplicate().limit(buf.position() + totalLength - 4));
- long computedMessageCrc = crc.getValue();
-
- long wireMessageCrc = Integer.toUnsignedLong(buf.getInt(buf.position() + totalLength - 4));
-
- if (wireMessageCrc != computedMessageCrc) {
- throw new IllegalArgumentException(format("Message checksum failure: expected 0x%x, computed 0x%x",
- wireMessageCrc, computedMessageCrc));
- }
- }
-
- static Map decodeHeaders(ByteBuffer buf) {
- Map headers = new HashMap<>();
-
- while (buf.hasRemaining()) {
- Header header = Header.decode(buf);
- headers.put(header.getName(), header.getValue());
- }
-
- return Collections.unmodifiableMap(headers);
- }
-
- public ByteBuffer toByteBuffer() {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- encode(baos);
- baos.close();
- return ByteBuffer.wrap(baos.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void encode(OutputStream os) {
- try {
- CheckedOutputStream checkedOutputStream = new CheckedOutputStream(os, new CRC32());
- encodeOrThrow(checkedOutputStream);
- long messageCrc = checkedOutputStream.getChecksum().getValue();
- os.write((int) (0xFF & messageCrc >> 24));
- os.write((int) (0xFF & messageCrc >> 16));
- os.write((int) (0xFF & messageCrc >> 8));
- os.write((int) (0xFF & messageCrc));
-
- os.flush();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Encode the given {@code headers}, without any leading or trailing metadata such as checksums or lengths.
- *
- * @param headers a sequence of zero or more headers, which will be encoded in iteration order
- * @return a byte array corresponding to the {@code headers} section of a {@code Message}
- */
- public static byte[] encodeHeaders(Iterable> headers) {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- for (Entry entry : headers) {
- Header.encode(entry, dos);
- }
- dos.close();
- return baos.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void encodeOrThrow(OutputStream os) throws IOException {
- ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream();
- headersAndPayload.write(encodeHeaders(headers.entrySet()));
- headersAndPayload.write(payload);
-
- int totalLength = Prelude.LENGTH_WITH_CRC + headersAndPayload.size() + 4;
-
- {
- byte[] preludeBytes = getPrelude(totalLength);
- Checksum crc = new CRC32();
- crc.update(preludeBytes, 0, preludeBytes.length);
-
- DataOutputStream dos = new DataOutputStream(os);
- dos.write(preludeBytes);
- long value = crc.getValue();
- int value1 = (int) value;
- dos.writeInt(value1);
- dos.flush();
- }
-
- headersAndPayload.writeTo(os);
- }
-
- private byte[] getPrelude(int totalLength) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(8);
- DataOutputStream dos = new DataOutputStream(baos);
-
- int headerLength = totalLength - Message.MESSAGE_OVERHEAD - payload.length;
- dos.writeInt(totalLength);
- dos.writeInt(headerLength);
-
- dos.close();
- return baos.toByteArray();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Message message = (Message) o;
-
- if (!headers.equals(message.headers)) return false;
- return Arrays.equals(payload, message.payload);
- }
-
- @Override
- public int hashCode() {
- int result = headers.hashCode();
- result = 31 * result + Arrays.hashCode(payload);
- return result;
- }
-
- @Override
- public String toString() {
- StringBuilder ret = new StringBuilder();
-
- for (Entry entry : headers.entrySet()) {
- ret.append(entry.getKey());
- ret.append(": ");
- ret.append(entry.getValue().toString());
- ret.append('\n');
- }
- ret.append('\n');
-
- String contentType = headers.getOrDefault(":content-type", HeaderValue.fromString("application/octet-stream"))
- .getString();
- if (contentType.contains("json") || contentType.contains("text")) {
- ret.append(new String(payload, StandardCharsets.UTF_8));
- } else {
- ret.append(Base64.getEncoder().encodeToString(payload));
- }
- ret.append('\n');
- return ret.toString();
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java b/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java
deleted file mode 100644
index 73387fb68126..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.util.Map;
-
-/**
- * An abstraction for message construction. This interface can be used to
- * customize message creation in different ways, such as implementing
- * decorators that add headers or manipulate the message payload.
- */
-@FunctionalInterface
-public interface MessageBuilder {
- Message build(Map headers, byte[] payload);
-
- static MessageBuilder defaultBuilder() {
- return Message::new;
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java b/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java
deleted file mode 100644
index e24bdcc9c516..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package software.amazon.eventstream;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Consumer;
-
-/**
- * A simple decoder that accumulates chunks of bytes and emits eventstream
- * messages. Instances of this class are not thread-safe.
- */
-public final class MessageDecoder {
-
- /**
- * Initial buffer size is 2MB. Will grow as needed to accommodate larger messages.
- */
- private static final int INITIAL_BUFFER_SIZE = 2048 * 1024;
-
- private final Consumer messageConsumer;
- private List bufferedOutput;
- private ByteBuffer buf;
- private Prelude currentPrelude;
-
- /**
- * Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded
- * messages can be obtained by calling {@link #getDecodedMessages()}.
- */
- public MessageDecoder() {
- this.messageConsumer = message -> this.bufferedOutput.add(message);
- this.bufferedOutput = new ArrayList<>();
- this.buf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
- }
-
- /**
- * Creates a {@code MessageDecoder} instance that will publish messages incrementally to the supplied {@code
- * messageConsumer} as they are decoded. The resulting instance does not support the {@link #getDecodedMessages()}
- * operation, and will throw an exception if it is invoked.
- *
- * @param messageConsumer a function that consumes {@link Message} instances
- */
- public MessageDecoder(Consumer messageConsumer) {
- this(messageConsumer, INITIAL_BUFFER_SIZE);
- }
-
- /**
- * To be used by tests only.
- */
- MessageDecoder(Consumer messageConsumer, int initialBufferSize) {
- this.messageConsumer = messageConsumer;
- this.buf = ByteBuffer.allocate(initialBufferSize);
- this.bufferedOutput = null;
- }
-
- /**
- * Returns {@link Message} instances that have been decoded since this method was last invoked. Note that this
- * method is only supported if this decoder was not configured to use a custom message consumer.
- *
- * @return all messages decoded since the last invocation of this method
- */
- public List getDecodedMessages() {
- if (bufferedOutput == null) {
- throw new IllegalStateException("");
- }
- List ret = bufferedOutput;
- bufferedOutput = new ArrayList<>();
- return Collections.unmodifiableList(ret);
- }
-
- public void feed(byte[] bytes) {
- feed(ByteBuffer.wrap(bytes));
- }
-
- public void feed(byte[] bytes, int offset, int length) {
- feed(ByteBuffer.wrap(bytes, offset, length));
- }
-
- /**
- * Feed the contents of the given {@link ByteBuffer} into this decoder. Messages will be incrementally decoded and
- * buffered or published to the message consumer (depending on configuration).
- *
- * @param byteBuffer a {@link ByteBuffer} whose entire contents will be read into the decoder's internal buffer
- * @return this {@code MessageDecoder} instance
- */
- public MessageDecoder feed(ByteBuffer byteBuffer) {
- int bytesToRead = byteBuffer.remaining();
- int bytesConsumed = 0;
- while (bytesConsumed < bytesToRead) {
- ByteBuffer readView = updateReadView();
- if (currentPrelude == null) {
- // Put only 15 bytes into buffer and compute prelude.
- int numBytesToWrite = Math.min(15 - readView.remaining(),
- bytesToRead - bytesConsumed);
-
- feedBuf(byteBuffer, numBytesToWrite);
-
- bytesConsumed += numBytesToWrite;
- readView = updateReadView();
-
- // Have enough data to decode the prelude
- if (readView.remaining() >= 15) {
- currentPrelude = Prelude.decode(readView.duplicate());
- if (buf.capacity() < currentPrelude.getTotalLength()) {
- // Don't have enough capacity to hold this message, grow the buffer
- buf = ByteBuffer.allocate(currentPrelude.getTotalLength());
- buf.put(readView);
- readView = updateReadView();
- }
- }
- }
- // We might not have received enough data to decode the prelude so check for null again
- if (currentPrelude != null) {
- // Only write up to what we need to decode the next message
- int numBytesToWrite = Math.min(currentPrelude.getTotalLength() - readView.remaining(),
- bytesToRead - bytesConsumed);
-
- feedBuf(byteBuffer, numBytesToWrite);
- bytesConsumed += numBytesToWrite;
- readView = updateReadView();
-
- // If we have enough data to decode the message do so and reset the buffer for the next message
- if (readView.remaining() >= currentPrelude.getTotalLength()) {
- messageConsumer.accept(Message.decode(currentPrelude, readView));
- buf.clear();
- currentPrelude = null;
- }
- }
- }
-
- return this;
- }
-
- private void feedBuf(ByteBuffer byteBuffer, int numBytesToWrite) {
- buf.put((ByteBuffer) byteBuffer.duplicate().limit(byteBuffer.position() + numBytesToWrite));
- byteBuffer.position(byteBuffer.position() + numBytesToWrite);
- }
-
- private ByteBuffer updateReadView() {
- return (ByteBuffer) buf.duplicate().flip();
- }
-
- /**
- * To be used by tests only.
- */
- int currentBufferSize() {
- return buf.capacity();
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/Prelude.java b/flow/src/main/java/software/amazon/eventstream/Prelude.java
deleted file mode 100644
index 96f3f21a6bc3..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/Prelude.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import static java.lang.String.format;
-
-final class Prelude {
- static final int LENGTH = 8;
- static final int LENGTH_WITH_CRC = LENGTH + 4;
-
- private final int totalLength;
- private final long headersLength;
-
- private Prelude(int totalLength, long headersLength) {
- this.totalLength = totalLength;
- this.headersLength = headersLength;
- }
-
- static Prelude decode(ByteBuffer buf) {
- buf = buf.duplicate();
-
- long computedPreludeCrc = computePreludeCrc(buf);
-
- long totalLength = Integer.toUnsignedLong(buf.getInt());
- long headersLength = Integer.toUnsignedLong(buf.getInt());
- long wirePreludeCrc = Integer.toUnsignedLong(buf.getInt());
- if (computedPreludeCrc != wirePreludeCrc) {
- throw new IllegalArgumentException(format("Prelude checksum failure: expected 0x%x, computed 0x%x",
- wirePreludeCrc, computedPreludeCrc));
- }
-
- if (headersLength < 0 || headersLength > 131_072) {
- throw new IllegalArgumentException("Illegal headers_length value: " + headersLength);
- }
-
- long payloadLength = (totalLength - headersLength) - Message.MESSAGE_OVERHEAD;
- // This implementation temporarily accepts larger payloads than the spec permits.
- if (payloadLength < 0 || payloadLength > 25_165_824) {
- throw new IllegalArgumentException("Illegal payload size: " + payloadLength);
- }
-
- return new Prelude(Math.toIntExact(totalLength), headersLength);
- }
-
- private static long computePreludeCrc(ByteBuffer buf) {
- byte[] prelude = new byte[Prelude.LENGTH];
- buf.duplicate().get(prelude);
-
- Checksum crc = new CRC32();
- crc.update(prelude, 0, prelude.length);
- return crc.getValue();
- }
-
- int getTotalLength() {
- return totalLength;
- }
-
- long getHeadersLength() {
- return headersLength;
- }
-}
diff --git a/flow/src/main/java/software/amazon/eventstream/Utils.java b/flow/src/main/java/software/amazon/eventstream/Utils.java
deleted file mode 100644
index 0aedf48cdba1..000000000000
--- a/flow/src/main/java/software/amazon/eventstream/Utils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-final class Utils {
- private Utils() {}
-
- static String readShortString(ByteBuffer buf) {
- int length = buf.get() & 0xFF;
- checkStringBounds(length, 255);
- byte[] bytes = new byte[length];
- buf.get(bytes);
- return new String(bytes, UTF_8);
- }
-
- static String readString(ByteBuffer buf) {
- int length = buf.getShort() & 0xFFFF;
- checkStringBounds(length, 32767);
- byte[] bytes = new byte[length];
- buf.get(bytes);
- return new String(bytes, UTF_8);
- }
-
- static byte[] readBytes(ByteBuffer buf) {
- int length = buf.getShort() & 0xFFFF;
- checkByteArrayBounds(length);
- byte[] bytes = new byte[length];
- buf.get(bytes);
- return bytes;
- }
-
- static void writeShortString(DataOutputStream dos, String string) throws IOException {
- byte[] bytes = string.getBytes(UTF_8);
- checkStringBounds(bytes.length, 255);
- dos.writeByte(bytes.length);
- dos.write(bytes);
- }
-
- static void writeString(DataOutputStream dos, String string) throws IOException {
- byte[] bytes = string.getBytes(UTF_8);
- checkStringBounds(bytes.length, 32767);
- writeBytes(dos, bytes);
- }
-
- static void writeBytes(DataOutputStream dos, byte[] bytes) throws IOException {
- checkByteArrayBounds(bytes.length);
- dos.writeShort((short) bytes.length);
- dos.write(bytes);
- }
-
- private static void checkByteArrayBounds(int length) {
- if (length == 0) {
- throw new IllegalArgumentException("Byte arrays may not be empty");
- }
- if (length > 32767) {
- throw new IllegalArgumentException("Illegal byte array length: " + length);
- }
- }
-
- private static void checkStringBounds(int length, int maxLength) {
- if (length == 0) {
- throw new IllegalArgumentException("Strings may not be empty");
- }
- if (length > maxLength) {
- throw new IllegalArgumentException("Illegal string length: " + length);
- }
- }
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/HeaderTest.java b/flow/src/test/java/software/amazon/eventstream/HeaderTest.java
deleted file mode 100644
index 135a5034be5b..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/HeaderTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static software.amazon.eventstream.HeaderValue.fromInteger;
-
-public class HeaderTest {
- @Test
- public void genericHeaders() throws Exception {
- roundTrip(new Header("test-string-header", "test-string-value"));
- roundTrip(new Header("test-byte-array-header", HeaderValue.fromByteArray(bb(1, 2, 3, 4, 5, 6, 7, 8))));
- roundTrip(new Header("test-uint32-header", fromInteger(8918230)));
- }
-
- @Test
- public void typeId() {
- for (byte i = 0; i <= 9; i++) {
- assertEquals(i, HeaderType.fromTypeId(i).headerTypeId);
- }
- }
-
- static void roundTrip(Header header) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (DataOutputStream dos = new DataOutputStream(baos)) {
- header.encode(dos);
- }
- byte[] bytes = baos.toByteArray();
- Header actual = Header.decode(ByteBuffer.wrap(bytes));
-
- assertEquals(header, actual);
- }
-
- static byte[] bb(int... bytes) {
- byte[] bs = new byte[bytes.length];
- for (int i = 0; i < bytes.length; i++) {
- bs[i] = (byte) bytes[i];
- }
- return bs;
- }
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java b/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java
deleted file mode 100644
index 78c181ea643a..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-package software.amazon.eventstream;
-
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MessageDecoderTest {
- long SEED = 8912374098123423L;
-
- @Test
- public void testDecoder() throws Exception {
- TestUtils utils = new TestUtils(SEED);
- Random rand = new Random(SEED);
- List expected = IntStream.range(0, 10_000)
- .mapToObj(x -> utils.randomMessage())
- .collect(Collectors.toList());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add);
- while (buf.remaining() > 0) {
- int bufSize = Math.min(1 + rand.nextInt(1024), buf.remaining());
- byte[] bs = new byte[bufSize];
- buf.get(bs);
- decoder.feed(bs);
- }
-
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void testDecoder_WithOffset() throws Exception {
- TestUtils utils = new TestUtils(SEED);
- Random rand = new Random(SEED);
- List expected = IntStream.range(0, 10_000)
- .mapToObj(x -> utils.randomMessage())
- .collect(Collectors.toList());
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- byte[] data = baos.toByteArray();
- int toRead = data.length;
- int read = 0;
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add);
- while (toRead > 0) {
- int length = rand.nextInt(100);
- if (read + length > data.length) {
- length = data.length - read;
- }
- decoder.feed(data, read, length);
- read += length;
- toRead -= length;
- }
-
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void preludeFedFirst_DecodesCorrectly() {
- TestUtils utils = new TestUtils(SEED);
- Message message = utils.randomMessage();
- int messageSize = message.toByteBuffer().remaining();
- List expected = Collections.singletonList(message);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
-
- // Feed just the prelude in it's entirety
- byte[] bs = new byte[15];
- buf.get(bs);
- decoder.feed(bs);
-
- // No messages should be decoded yet
- assertThat(actual, Matchers.hasSize(0));
-
- // Feed rest of message in it's entirety.
- bs = new byte[messageSize - 15];
- buf.get(bs);
- decoder.feed(bs);
-
- // Should have successfully decoded the one message
- assertThat(actual, Matchers.hasSize(1));
- }
-
- @Test
- public void preludeFedInParts_DecodesCorrectly() {
- TestUtils utils = new TestUtils(SEED);
- Message message = utils.randomMessage();
- int messageSize = message.toByteBuffer().remaining();
- List expected = Collections.singletonList(message);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
-
- // Feed the prelude in parts
- byte[] bs = new byte[7];
- buf.get(bs);
- decoder.feed(bs);
-
- // Feed rest of prelude
- bs = new byte[8];
- buf.get(bs);
- decoder.feed(bs);
-
- // No messages should be decoded yet
- assertThat(actual, Matchers.hasSize(0));
-
- // Feed rest of message in it's entirety.
- bs = new byte[messageSize - 15];
- buf.get(bs);
- decoder.feed(bs);
-
- // Should have successfully decoded the one message
- assertThat(actual, Matchers.hasSize(1));
- }
-
- @Test
- public void bufferNeedsToGrow() {
- TestUtils utils = new TestUtils(SEED);
- Message message = utils.randomMessage(8192 * 2);
- int messageSize = message.toByteBuffer().remaining();
- List expected = Collections.singletonList(message);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
-
- // Feed all at once
- byte[] bs = new byte[messageSize];
- buf.get(bs);
- decoder.feed(bs);
-
- // Should have successfully decoded the one message
- assertThat(actual, Matchers.hasSize(1));
- }
-
- @Test
- public void multipleMessagesDoesNotGrowBuffer() {
- TestUtils utils = new TestUtils(SEED);
- Message message = utils.randomMessage(4096);
- List expected = IntStream.range(0, 100)
- .mapToObj(x -> message)
- .collect(Collectors.toList());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
-
- // Feed all at once
- byte[] bs = new byte[buf.capacity()];
- buf.get(bs);
- decoder.feed(bs);
-
- assertEquals(expected, actual);
- assertEquals(8192, decoder.currentBufferSize());
- }
-
- @Test
- public void multipleLargeMessages_GrowsBufferAsNeeded() {
- TestUtils utils = new TestUtils(SEED);
- Message message = utils.randomMessage(9001);
- List expected = IntStream.range(0, 100)
- .mapToObj(x -> message)
- .collect(Collectors.toList());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.forEach(x -> x.encode(baos));
- ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-
- List actual = new ArrayList<>();
- MessageDecoder decoder = new MessageDecoder(actual::add, 8192);
-
- // Feed all at once
- byte[] bs = new byte[buf.capacity()];
- buf.get(bs);
- decoder.feed(bs);
-
- assertEquals(expected, actual);
- assertThat(decoder.currentBufferSize(), greaterThan(9001));
- }
-
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/MessageTest.java b/flow/src/test/java/software/amazon/eventstream/MessageTest.java
deleted file mode 100644
index cb4de8201592..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/MessageTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package software.amazon.eventstream;
-
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Collections.emptyMap;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class MessageTest {
- @Test
- public void emptyVector() {
- Message message = new Message(emptyMap(), new byte[]{});
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- message.encode(baos);
- byte[] bytes = baos.toByteArray();
-
- byte[] expected = new byte[]{
- 0, 0, 0, 16, // total_length
- 0, 0, 0, 0, // headers_length
- 5, -62, 72, -21, // prelude_crc
- 125, -104, -56, -1 // message_crc
- };
-
- assertArrayEquals(expected, bytes);
-
- assertEquals(message, Message.decode(ByteBuffer.wrap(expected)));
- }
-
- @Test
- public void appdataVector() {
- byte[] bytes = new byte[]{
- 0, 0, 0, 0x1d, // total_length
- 0, 0, 0, 0, // headers_length
- (byte) 0xfd, 0x52, (byte) 0x8c, 0x5a, // prelude_crc
- 0x7b, 0x27, 0x66, 0x6f, 0x6f, 0x27, 0x3a, // payload
- 0x27, 0x62, 0x61, 0x72, 0x27, 0x7d,
- // 0xc3 65 39 36
- (byte) 0xc3, 0x65, 0x39, 0x36 // message_crc
- };
-
- Message message = new Message(emptyMap(), "{'foo':'bar'}".getBytes(UTF_8));
-
- assertEquals(message, Message.decode(ByteBuffer.wrap(bytes)));
- }
-
- @Test
- public void roundTripTests() {
- roundTrip(new Message(emptyMap(), new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }));
-
- Map headers = new HashMap<>();
- headers.put(":content-type", HeaderValue.fromString("application/json"));
- headers.put("content-encoding", HeaderValue.fromString("gzip"));
- headers.put("request-id", HeaderValue.fromByteArray(new byte[]{ 1, 2, 3, 4, 5 }));
- headers.put("more-stuff", HeaderValue.fromInteger(27));
- roundTrip(new Message(headers, new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
- }
-
- @Test
- public void generativeTest() throws Exception {
- long SEED = 8912374098123423L;
- TestUtils utils = new TestUtils(SEED);
- Random rand = new Random(SEED);
- for (int i = 0; i < 10_000; i++) {
- byte[] padding = new byte[rand.nextInt(128 * 1024)];
- rand.nextBytes(padding);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(padding);
- Message message = utils.randomMessage();
- message.encode(baos);
-
- int arraylen = 256 * 1024;
- int arrayoffset = rand.nextInt(4 * 1024);
- ByteBuffer buf = ((ByteBuffer) ByteBuffer.allocate(arraylen).position(arrayoffset)).slice();
- buf.put(baos.toByteArray());
- buf.flip();
-
- buf.get(padding);
- Message decoded = Message.decode(buf);
-
- assertEquals(message, decoded);
- }
- }
-
- static void roundTrip(Message expected) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- expected.encode(baos);
- Message actual = Message.decode(ByteBuffer.wrap(baos.toByteArray()));
- assertEquals(expected, actual);
- }
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/PreludeTest.java b/flow/src/test/java/software/amazon/eventstream/PreludeTest.java
deleted file mode 100644
index a13c437c408c..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/PreludeTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package software.amazon.eventstream;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-public class PreludeTest {
- @Test(expected = IllegalArgumentException.class)
- public void maxPayloadSize() throws Exception {
- ByteBuffer buf = ByteBuffer.wrap(new byte[15]);
- buf.putInt(16777217 + 19); // total length
- buf.put((byte) 1); // ApplicationData
- buf.putShort((short) 0); // unmodeled data
- buf.putInt(0); // headers_length
- buf.putInt(0xf0c8e628); // prelude_crc
- buf.flip();
-
- Prelude.decode(buf);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void maxHeaderSize() throws Exception {
- ByteBuffer buf = ByteBuffer.wrap(new byte[15]);
- buf.putInt(131073 + 19); // total length
- buf.put((byte) 1); // ApplicationData
- buf.putShort((short) 0); // unmodeled data
- buf.putInt(131073); // headers_length
- buf.putInt(0x49fd415c); // prelude_crc
- buf.flip();
-
- Prelude.decode(buf);
- }
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/TestUtils.java b/flow/src/test/java/software/amazon/eventstream/TestUtils.java
deleted file mode 100644
index cae03168b1a0..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/TestUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package software.amazon.eventstream;
-
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-
-public class TestUtils {
- private final Random rand;
- private final int maxHeaders;
- private final int maxHeaderSize;
- private final int maxPayloadSize;
-
- public TestUtils(long seed) {
- this.rand = new Random(seed);
- this.maxHeaders = 20;
- this.maxHeaderSize = 64;
- this.maxPayloadSize = 4096;
- }
-
- public Message randomMessage() {
- return new Message(randomHeaders(), randomPayload());
- }
-
- public Message randomMessage(int payloadSize) {
- return new Message(randomHeaders(), randomPayload(payloadSize));
- }
-
- private Map randomHeaders() {
- Map headers = new HashMap<>();
- int numHeaders = rand.nextInt(maxHeaders + 1);
- for (int i = 0; i < numHeaders; i++) {
- headers.put("asdf" + rand.nextInt(), randomHeaderValue());
- }
- return headers;
- }
-
- private HeaderValue randomHeaderValue() {
- switch (rand.nextInt(7)) {
- case 0:
- return HeaderValue.fromInteger(rand.nextInt(Integer.MAX_VALUE));
- case 1:
- int bytes = rand.nextInt(maxHeaderSize + 1) + 1;
- byte[] buf = new byte[bytes];
- rand.nextBytes(buf);
- return HeaderValue.fromByteArray(buf);
- case 2:
- return HeaderValue.fromString("asdf");
- case 3:
- return HeaderValue.fromBoolean(true);
- case 4:
- return HeaderValue.fromBoolean(false);
- case 5:
- return HeaderValue.fromTimestamp(Instant.ofEpochMilli(rand.nextLong()));
- case 6:
- return HeaderValue.fromUuid(new UUID(rand.nextLong(), rand.nextLong()));
- default:
- throw new IllegalStateException();
- }
- }
-
- private byte[] randomPayload() {
- return randomPayload(rand.nextInt(maxPayloadSize + 1));
- }
-
- private byte[] randomPayload(int payloadSize) {
- byte[] ret = new byte[payloadSize];
- rand.nextBytes(ret);
- return ret;
- }
-}
diff --git a/flow/src/test/java/software/amazon/eventstream/ToStringTest.java b/flow/src/test/java/software/amazon/eventstream/ToStringTest.java
deleted file mode 100644
index 9c2ab69453ed..000000000000
--- a/flow/src/test/java/software/amazon/eventstream/ToStringTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package software.amazon.eventstream;
-
-import org.junit.Test;
-
-import java.time.Instant;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-
-public class ToStringTest {
- @Test
- public void headerTypes() throws Exception {
- Map headers = new LinkedHashMap<>();
- headers.put(":content-type", HeaderValue.fromString("application/json"));
- headers.put("boolean-true", HeaderValue.fromBoolean(true));
- headers.put("boolean-false", HeaderValue.fromBoolean(false));
- headers.put("byte-value", HeaderValue.fromByte((byte) 4));
- headers.put("short-value", HeaderValue.fromShort((short) 16384));
- headers.put("integer-value", HeaderValue.fromInteger(-1048576));
- headers.put("long-value", HeaderValue.fromLong(850270403920392L));
- headers.put("string-value", HeaderValue.fromString("asdf"));
- headers.put("byte-array-value", HeaderValue.fromByteArray(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 }));
- headers.put("instant-value", HeaderValue.fromTimestamp(Instant.ofEpochMilli(1511312536153L)));
- headers.put("uuid-value", HeaderValue.fromUuid(new UUID(89012350712350912L, 9182739072970134201L)));
-
- Message message = new Message(
- headers,
- "{\"foo\": \"bar\"}".getBytes(UTF_8)
- );
-
- String expected =
- ":content-type: \"application/json\"\n" +
- "boolean-true: true\n" +
- "boolean-false: false\n" +
- "byte-value: 4\n" +
- "short-value: 16384\n" +
- "integer-value: -1048576\n" +
- "long-value: 850270403920392\n" +
- "string-value: \"asdf\"\n" +
- "byte-array-value: AQIDBAUGBwgJCgsM\n" +
- "instant-value: 2017-11-22T01:02:16.153Z\n" +
- "uuid-value: 013c3c42-e8d5-08c0-7f6f-a488dd7c12b9\n" +
- "\n" +
- "{\"foo\": \"bar\"}\n";
- System.out.println(message);
- assertEquals(expected, message.toString());
- }
-
- @Test
- public void controlMessages() throws Exception {
- Map headers = new LinkedHashMap<>();
- headers.put(":content-type", HeaderValue.fromString("application/json"));
-
- Message message = new Message(headers, "{\"foo\": \"bar\"}".getBytes(UTF_8));
-
- String expected =
- ":content-type: \"application/json\"\n" +
- "\n" +
- "{\"foo\": \"bar\"}\n";
- assertEquals(expected, message.toString());
- }
-
- @Test
- public void binaryPayload() throws Exception {
- Map headers = new LinkedHashMap<>();
- headers.put(":content-type", HeaderValue.fromString("application/octet-stream"));
-
- Message message = new Message(headers, new byte[]{ 23, 12, (byte) 129, 44, 89, 90 });
-
- String expected =
- ":content-type: \"application/octet-stream\"\n" +
- "\n" +
- "FwyBLFla\n";
- assertEquals(expected, message.toString());
- }
-}
diff --git a/pom.xml b/pom.xml
index 5d4808405998..fcbaa20a6d30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,9 +81,9 @@
${project.version}
- 1.7
2.9.8
2.9.0
+ 1.0.1
1.2.0
3.4
2.18.0
diff --git a/services/kinesis/pom.xml b/services/kinesis/pom.xml
index 40363956e3d4..a4f869acf905 100644
--- a/services/kinesis/pom.xml
+++ b/services/kinesis/pom.xml
@@ -62,9 +62,9 @@
${awsjavasdk.version}
- software.amazon
- flow
- ${flow.version}
+ software.amazon.eventstream
+ eventstream
+ ${eventstream.version}
test