diff --git a/bom-internal/pom.xml b/bom-internal/pom.xml index 09b11f3ebdf7..0f655db54a7a 100644 --- a/bom-internal/pom.xml +++ b/bom-internal/pom.xml @@ -15,9 +15,9 @@ - software.amazon - flow - ${flow.version} + software.amazon.eventstream + eventstream + ${eventstream.version} commons-io @@ -318,4 +318,4 @@ - \ No newline at end of file + diff --git a/core/auth/pom.xml b/core/auth/pom.xml index f079cf0e0672..9a0170b7f7e5 100644 --- a/core/auth/pom.xml +++ b/core/auth/pom.xml @@ -52,8 +52,8 @@ jackson-databind - software.amazon - flow + software.amazon.eventstream + eventstream diff --git a/core/aws-core/pom.xml b/core/aws-core/pom.xml index 6c524981f3f1..8b370014d6ba 100644 --- a/core/aws-core/pom.xml +++ b/core/aws-core/pom.xml @@ -53,8 +53,8 @@ slf4j-api - software.amazon - flow + software.amazon.eventstream + eventstream @@ -146,5 +146,5 @@ - + diff --git a/flow/build.properties b/flow/build.properties deleted file mode 100644 index 173b9d3c15f8..000000000000 --- a/flow/build.properties +++ /dev/null @@ -1,25 +0,0 @@ -# -# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# http://aws.amazon.com/apache2.0 -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. -# - -source.. = src/main/java,\ - src/main/resources -output.. = bin/ - -bin.includes = LICENSE.txt,\ - NOTICE.txt,\ - META-INF/,\ - . - -jre.compilation.profile = JavaSE-1.6 diff --git a/flow/pom.xml b/flow/pom.xml deleted file mode 100644 index bebce13b303c..000000000000 --- a/flow/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ - - - - aws-sdk-java-pom - software.amazon.awssdk - 2.1.0-SNAPSHOT - - 4.0.0 - - software.amazon - flow - ${flow.version} - - AWS Event Stream - The AWS Event Stream decoder library. - https://aws.amazon.com/sdkforjava - - - - UTF-8 - 1.8 - - - - - - maven-surefire-plugin - ${maven.surefire.version} - - - org.junit.platform - junit-platform-surefire-provider - 1.2.0 - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.7.0 - - - 1.8 - 1.8 - - - - - org.apache.maven.plugins - maven-jar-plugin - 3.1.0 - - aws-flow-java - - - software.amazon.eventstream - - - - - - - - - - - - - - - - - - - org.junit.vintage - junit-vintage-engine - 4.12.1 - test - - - org.hamcrest - hamcrest-all - 1.3 - test - - - diff --git a/flow/src/main/java/software/amazon/eventstream/CRC32C.java b/flow/src/main/java/software/amazon/eventstream/CRC32C.java deleted file mode 100644 index 63587fa85fc0..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/CRC32C.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.util.zip.Checksum; - -final class CRC32C implements Checksum { - private static final int[] LOOKUP_TABLE = { - 0x00000000, 0xF26B8303, 0xE13B70F7, 0x1350F3F4, 0xC79A971F, 0x35F1141C, 0x26A1E7E8, 0xD4CA64EB, - 0x8AD958CF, 0x78B2DBCC, 0x6BE22838, 0x9989AB3B, 0x4D43CFD0, 0xBF284CD3, 0xAC78BF27, 0x5E133C24, - 0x105EC76F, 0xE235446C, 0xF165B798, 0x030E349B, 0xD7C45070, 0x25AFD373, 0x36FF2087, 0xC494A384, - 0x9A879FA0, 0x68EC1CA3, 0x7BBCEF57, 0x89D76C54, 0x5D1D08BF, 0xAF768BBC, 0xBC267848, 0x4E4DFB4B, - 0x20BD8EDE, 0xD2D60DDD, 0xC186FE29, 0x33ED7D2A, 0xE72719C1, 0x154C9AC2, 0x061C6936, 0xF477EA35, - 0xAA64D611, 0x580F5512, 0x4B5FA6E6, 0xB93425E5, 0x6DFE410E, 0x9F95C20D, 0x8CC531F9, 0x7EAEB2FA, - 0x30E349B1, 0xC288CAB2, 0xD1D83946, 0x23B3BA45, 0xF779DEAE, 0x05125DAD, 0x1642AE59, 0xE4292D5A, - 0xBA3A117E, 0x4851927D, 0x5B016189, 0xA96AE28A, 0x7DA08661, 0x8FCB0562, 0x9C9BF696, 0x6EF07595, - 0x417B1DBC, 0xB3109EBF, 0xA0406D4B, 0x522BEE48, 0x86E18AA3, 0x748A09A0, 0x67DAFA54, 0x95B17957, - 0xCBA24573, 0x39C9C670, 0x2A993584, 0xD8F2B687, 0x0C38D26C, 0xFE53516F, 0xED03A29B, 0x1F682198, - 0x5125DAD3, 0xA34E59D0, 0xB01EAA24, 0x42752927, 0x96BF4DCC, 0x64D4CECF, 0x77843D3B, 0x85EFBE38, - 0xDBFC821C, 0x2997011F, 0x3AC7F2EB, 0xC8AC71E8, 0x1C661503, 0xEE0D9600, 0xFD5D65F4, 0x0F36E6F7, - 0x61C69362, 0x93AD1061, 0x80FDE395, 0x72966096, 0xA65C047D, 0x5437877E, 0x4767748A, 0xB50CF789, - 0xEB1FCBAD, 0x197448AE, 0x0A24BB5A, 0xF84F3859, 0x2C855CB2, 0xDEEEDFB1, 0xCDBE2C45, 0x3FD5AF46, - 0x7198540D, 0x83F3D70E, 0x90A324FA, 0x62C8A7F9, 0xB602C312, 0x44694011, 0x5739B3E5, 0xA55230E6, - 0xFB410CC2, 0x092A8FC1, 0x1A7A7C35, 0xE811FF36, 0x3CDB9BDD, 0xCEB018DE, 0xDDE0EB2A, 0x2F8B6829, - 0x82F63B78, 0x709DB87B, 0x63CD4B8F, 0x91A6C88C, 0x456CAC67, 0xB7072F64, 0xA457DC90, 0x563C5F93, - 0x082F63B7, 0xFA44E0B4, 0xE9141340, 0x1B7F9043, 0xCFB5F4A8, 0x3DDE77AB, 0x2E8E845F, 0xDCE5075C, - 0x92A8FC17, 0x60C37F14, 0x73938CE0, 0x81F80FE3, 0x55326B08, 0xA759E80B, 0xB4091BFF, 0x466298FC, - 0x1871A4D8, 0xEA1A27DB, 0xF94AD42F, 0x0B21572C, 0xDFEB33C7, 0x2D80B0C4, 0x3ED04330, 0xCCBBC033, - 0xA24BB5A6, 0x502036A5, 0x4370C551, 0xB11B4652, 0x65D122B9, 0x97BAA1BA, 0x84EA524E, 0x7681D14D, - 0x2892ED69, 0xDAF96E6A, 0xC9A99D9E, 0x3BC21E9D, 0xEF087A76, 0x1D63F975, 0x0E330A81, 0xFC588982, - 0xB21572C9, 0x407EF1CA, 0x532E023E, 0xA145813D, 0x758FE5D6, 0x87E466D5, 0x94B49521, 0x66DF1622, - 0x38CC2A06, 0xCAA7A905, 0xD9F75AF1, 0x2B9CD9F2, 0xFF56BD19, 0x0D3D3E1A, 0x1E6DCDEE, 0xEC064EED, - 0xC38D26C4, 0x31E6A5C7, 0x22B65633, 0xD0DDD530, 0x0417B1DB, 0xF67C32D8, 0xE52CC12C, 0x1747422F, - 0x49547E0B, 0xBB3FFD08, 0xA86F0EFC, 0x5A048DFF, 0x8ECEE914, 0x7CA56A17, 0x6FF599E3, 0x9D9E1AE0, - 0xD3D3E1AB, 0x21B862A8, 0x32E8915C, 0xC083125F, 0x144976B4, 0xE622F5B7, 0xF5720643, 0x07198540, - 0x590AB964, 0xAB613A67, 0xB831C993, 0x4A5A4A90, 0x9E902E7B, 0x6CFBAD78, 0x7FAB5E8C, 0x8DC0DD8F, - 0xE330A81A, 0x115B2B19, 0x020BD8ED, 0xF0605BEE, 0x24AA3F05, 0xD6C1BC06, 0xC5914FF2, 0x37FACCF1, - 0x69E9F0D5, 0x9B8273D6, 0x88D28022, 0x7AB90321, 0xAE7367CA, 0x5C18E4C9, 0x4F48173D, 0xBD23943E, - 0xF36E6F75, 0x0105EC76, 0x12551F82, 0xE03E9C81, 0x34F4F86A, 0xC69F7B69, 0xD5CF889D, 0x27A40B9E, - 0x79B737BA, 0x8BDCB4B9, 0x988C474D, 0x6AE7C44E, 0xBE2DA0A5, 0x4C4623A6, 0x5F16D052, 0xAD7D5351 - }; - - private int crc = 0xFFFF_FFFF; - - @Override - public void update(int b) { - crc = (crc >>> 8) ^ LOOKUP_TABLE[(crc ^ (b & 0xFF)) & 0xFF]; - } - - @Override - public void update(byte[] bytes, int offset, int length) { - for (int i = offset; i < offset + length; i++) { - update(bytes[i]); - } - } - - @Override - public long getValue() { - return (~crc) & 0xFFFF_FFFFL; - } - - @Override - public void reset() { - crc = 0xFFFF_FFFF; - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/Checksums.java b/flow/src/main/java/software/amazon/eventstream/Checksums.java deleted file mode 100644 index 0e79ec03b667..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/Checksums.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.nio.ByteBuffer; -import java.util.zip.Checksum; - -final class Checksums { - private Checksums() {} - - static void update(Checksum checksum, ByteBuffer buffer) { - if (buffer.hasArray()) { - int pos = buffer.position(); - int off = buffer.arrayOffset(); - int limit = buffer.limit(); - int rem = limit - pos; - checksum.update(buffer.array(), pos + off, rem); - buffer.position(limit); - } else { - int length = buffer.remaining(); - byte[] b = new byte[length]; - buffer.get(b, 0, length); - checksum.update(b, 0, length); - } - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/Header.java b/flow/src/main/java/software/amazon/eventstream/Header.java deleted file mode 100644 index c53e286f60b0..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/Header.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - -class Header { - private final String name; - private final HeaderValue value; - - Header(String name, HeaderValue value) { - this.name = requireNonNull(name); - this.value = requireNonNull(value); - } - - Header(String name, String value) { - this(name, HeaderValue.fromString(value)); - } - - public String getName() { - return name; - } - - public HeaderValue getValue() { - return value; - } - - static Header decode(ByteBuffer buf) { - String name = Utils.readShortString(buf); - return new Header(name, HeaderValue.decode(buf)); - } - - static void encode(Map.Entry header, DataOutputStream dos) throws IOException { - new Header(header.getKey(), header.getValue()).encode(dos); - } - - void encode(DataOutputStream dos) throws IOException { - Utils.writeShortString(dos, name); - value.encode(dos); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Header header = (Header) o; - - if (!name.equals(header.name)) return false; - return value.equals(header.value); - } - - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + value.hashCode(); - return result; - } - - @Override - public String toString() { - return "Header{" - + "name='" + name + '\'' - + ", value=" + value - + '}'; - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/HeaderType.java b/flow/src/main/java/software/amazon/eventstream/HeaderType.java deleted file mode 100644 index 65e452b43a98..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/HeaderType.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -enum HeaderType { - TRUE(0), - FALSE(1), - BYTE(2), - SHORT(3), - INTEGER(4), - LONG(5), - BYTE_ARRAY(6), - STRING(7), - TIMESTAMP(8), - UUID(9); - - final byte headerTypeId; - - HeaderType(int headerTypeId) { - this.headerTypeId = (byte) headerTypeId; - } - - static HeaderType fromTypeId(byte headerTypeId) { - switch (headerTypeId) { - case 0: return TRUE; - case 1: return FALSE; - case 2: return BYTE; - case 3: return SHORT; - case 4: return INTEGER; - case 5: return LONG; - case 6: return BYTE_ARRAY; - case 7: return STRING; - case 8: return TIMESTAMP; - case 9: return UUID; - default: - throw new IllegalArgumentException("Got unknown headerTypeId " + headerTypeId); - } - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/HeaderValue.java b/flow/src/main/java/software/amazon/eventstream/HeaderValue.java deleted file mode 100644 index 30b502991b2b..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/HeaderValue.java +++ /dev/null @@ -1,567 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.Arrays; -import java.util.Base64; -import java.util.Date; -import java.util.UUID; - -import static java.util.Objects.requireNonNull; -import static software.amazon.eventstream.HeaderType.TIMESTAMP; -import static software.amazon.eventstream.HeaderType.fromTypeId; -import static software.amazon.eventstream.Utils.writeBytes; -import static software.amazon.eventstream.Utils.writeString; - -/** - * A typed header value. The underlying value can be obtained by calling the - * appropriate getter. - */ -public abstract class HeaderValue { - public static HeaderValue fromBoolean(boolean value) { - return new BooleanValue(value); - } - - public static HeaderValue fromByte(byte value) { - return new ByteValue(value); - } - - public static HeaderValue fromShort(short value) { - return new ShortValue(value); - } - - public static HeaderValue fromInteger(int value) { - return new IntegerValue(value); - } - - public static HeaderValue fromLong(long value) { - return new LongValue(value); - } - - public static HeaderValue fromByteArray(byte[] bytes) { - return new ByteArrayValue(bytes); - } - - public static HeaderValue fromByteBuffer(ByteBuffer buf) { - buf = buf.duplicate(); - byte[] bytes = new byte[buf.remaining()]; - buf.get(bytes); - return fromByteArray(bytes); - } - - public static HeaderValue fromString(String string) { - return new StringValue(string); - } - - public static HeaderValue fromTimestamp(Instant value) { - return new TimestampValue(value); - } - - public static HeaderValue fromDate(Date value) { - return new TimestampValue(value.toInstant()); - } - - public static HeaderValue fromUuid(UUID value) { - return new UuidValue(value); - } - - protected HeaderValue() {} - - public abstract HeaderType getType(); - - public boolean getBoolean() { - throw new IllegalStateException(); - } - - public byte getByte() { - throw new IllegalStateException("Expected byte, but type was " + getType().name()); - } - - public short getShort() { - throw new IllegalStateException("Expected short, but type was " + getType().name()); - } - - public int getInteger() { - throw new IllegalStateException("Expected integer, but type was " + getType().name()); - } - - public long getLong() { - throw new IllegalStateException("Expected long, but type was " + getType().name()); - } - - public byte[] getByteArray() { - throw new IllegalStateException(); - } - - public final ByteBuffer getByteBuffer() { - return ByteBuffer.wrap(getByteArray()); - } - - public String getString() { - throw new IllegalStateException(); - } - - public Instant getTimestamp() { - throw new IllegalStateException("Expected timestamp, but type was " + getType().name()); - } - - public Date getDate() { - return Date.from(getTimestamp()); - } - - public UUID getUuid() { - throw new IllegalStateException("Expected UUID, but type was " + getType().name()); - } - - void encode(DataOutputStream dos) throws IOException { - dos.writeByte(getType().headerTypeId); - encodeValue(dos); - } - - abstract void encodeValue(DataOutputStream dos) throws IOException; - - static HeaderValue decode(ByteBuffer buf) { - HeaderType type = fromTypeId(buf.get()); - switch (type) { - case TRUE: - return new BooleanValue(true); - case FALSE: - return new BooleanValue(false); - case BYTE: - return new ByteValue(buf.get()); - case SHORT: - return new ShortValue(buf.getShort()); - case INTEGER: - return fromInteger(buf.getInt()); - case LONG: - return new LongValue(buf.getLong()); - case BYTE_ARRAY: - return fromByteArray(Utils.readBytes(buf)); - case STRING: - return fromString(Utils.readString(buf)); - case TIMESTAMP: - return TimestampValue.decode(buf); - case UUID: - return UuidValue.decode(buf); - default: - throw new IllegalStateException(); - } - } - - private static final class BooleanValue extends HeaderValue { - private final boolean value; - - private BooleanValue(boolean value) { - this.value = value; - } - - @Override - public HeaderType getType() { - if (value) { - return HeaderType.TRUE; - } else { - return HeaderType.FALSE; - } - } - - @Override - public boolean getBoolean() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) {} - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - BooleanValue that = (BooleanValue) o; - - return value == that.value; - } - - @Override - public int hashCode() { - if (value) { - return 1; - } else { - return 0; - } - } - - @Override - public String toString() { - return String.valueOf(value); - } - } - - private static final class ByteValue extends HeaderValue { - private final byte value; - - private ByteValue(byte value) { - this.value = value; - } - - @Override - public HeaderType getType() { - return HeaderType.BYTE; - } - - @Override - public byte getByte() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) {} - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ByteValue that = (ByteValue) o; - - return value == that.value; - } - - @Override - public int hashCode() { - return (int) value; - } - - @Override - public String toString() { - return String.valueOf(value); - } - } - - private static final class ShortValue extends HeaderValue { - private final short value; - - private ShortValue(short value) { - this.value = value; - } - - @Override - public HeaderType getType() { - return HeaderType.SHORT; - } - - @Override - public short getShort() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) {} - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ShortValue that = (ShortValue) o; - - return value == that.value; - } - - @Override - public int hashCode() { - return (int) value; - } - - @Override - public String toString() { - return String.valueOf(value); - } - } - - private static final class IntegerValue extends HeaderValue { - private final int value; - - private IntegerValue(int value) { - this.value = value; - } - - @Override - public HeaderType getType() { - return HeaderType.INTEGER; - } - - @Override - public int getInteger() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - dos.writeInt(value); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - IntegerValue that = (IntegerValue) o; - - return value == that.value; - } - - @Override - public int hashCode() { - return value; - } - - @Override - public String toString() { - return String.valueOf(value); - } - } - - private static final class LongValue extends HeaderValue { - private final long value; - - private LongValue(long value) { - this.value = value; - } - - @Override - public HeaderType getType() { - return HeaderType.LONG; - } - - @Override - public long getLong() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - dos.writeLong(value); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - LongValue longValue = (LongValue) o; - - return value == longValue.value; - } - - @Override - public int hashCode() { - return (int) (value ^ (value >>> 32)); - } - - @Override - public String toString() { - return String.valueOf(value); - } - } - - private static final class ByteArrayValue extends HeaderValue { - private final byte[] value; - - private ByteArrayValue(byte[] value) { - this.value = requireNonNull(value); - } - - @Override - public HeaderType getType() { - return HeaderType.BYTE_ARRAY; - } - - @Override - public byte[] getByteArray() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - writeBytes(dos, value); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ByteArrayValue that = (ByteArrayValue) o; - - return Arrays.equals(value, that.value); - } - - @Override - public int hashCode() { - return Arrays.hashCode(value); - } - - @Override - public String toString() { - return Base64.getEncoder().encodeToString(value); - } - } - - private static final class StringValue extends HeaderValue { - private final String value; - - private StringValue(String value) { - this.value = requireNonNull(value); - } - - @Override - public HeaderType getType() { - return HeaderType.STRING; - } - - @Override - public String getString() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - writeString(dos, value); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - StringValue that = (StringValue) o; - - return value.equals(that.value); - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public String toString() { - return '"' + value + '"'; - } - } - - private static final class TimestampValue extends HeaderValue { - private final Instant value; - - private TimestampValue(Instant value) { - this.value = requireNonNull(value); - } - - static TimestampValue decode(ByteBuffer buf) { - long epochMillis = buf.getLong(); - return new TimestampValue(Instant.ofEpochMilli(epochMillis)); - } - - @Override - public HeaderType getType() { - return TIMESTAMP; - } - - @Override - public Instant getTimestamp() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - dos.writeLong(value.toEpochMilli()); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimestampValue that = (TimestampValue) o; - - return value.equals(that.value); - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public String toString() { - return value.toString(); - } - } - - private static final class UuidValue extends HeaderValue { - private final UUID value; - - private UuidValue(UUID value) { - this.value = requireNonNull(value); - } - - static UuidValue decode(ByteBuffer buf) { - long msb = buf.getLong(); - long lsb = buf.getLong(); - return new UuidValue(new UUID(msb, lsb)); - } - - @Override - public HeaderType getType() { - return HeaderType.UUID; - } - - @Override - public UUID getUuid() { - return value; - } - - @Override - void encodeValue(DataOutputStream dos) throws IOException { - dos.writeLong(value.getMostSignificantBits()); - dos.writeLong(value.getLeastSignificantBits()); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - UuidValue uuidValue = (UuidValue) o; - - return value.equals(uuidValue.value); - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public String toString() { - return value.toString(); - } - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/Message.java b/flow/src/main/java/software/amazon/eventstream/Message.java deleted file mode 100644 index 360115175f86..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/Message.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.zip.CRC32; -import java.util.zip.CheckedOutputStream; -import java.util.zip.Checksum; - -import static java.lang.String.format; - -/** - * An eventstream message. - */ -public class Message { - private static final int TRAILING_CRC_LENGTH = 4; - static final int MESSAGE_OVERHEAD = Prelude.LENGTH_WITH_CRC + TRAILING_CRC_LENGTH; - - private final Map headers; - private final byte[] payload; - - public Message(Map headers, byte[] payload) { - this.headers = headers; - this.payload = payload.clone(); - } - - public Map getHeaders() { - return headers; - } - - public byte[] getPayload() { - return payload.clone(); - } - - public static Message decode(ByteBuffer buf) { - return decode(Prelude.decode(buf), buf); - } - - /** - * Decodes a message with an already decoded prelude. Useful for not decoding the prelude twice. - * - * @param prelude Decoded prelude of message. - * @param buf Data of message (including prelude which will be skipped over). - * @return Decoded message - */ - static Message decode(Prelude prelude, ByteBuffer buf) { - int totalLength = prelude.getTotalLength(); - validateMessageCrc(buf, totalLength); - buf.position(buf.position() + Prelude.LENGTH_WITH_CRC); - - long headersLength = prelude.getHeadersLength(); - byte[] headerBytes = new byte[Math.toIntExact(headersLength)]; - buf.get(headerBytes); - Map headers = decodeHeaders(ByteBuffer.wrap(headerBytes)); - - byte[] payload = new byte[Math.toIntExact(totalLength - MESSAGE_OVERHEAD - headersLength)]; - buf.get(payload); - buf.getInt(); // skip past the message CRC - - return new Message(headers, payload); - } - - private static void validateMessageCrc(ByteBuffer buf, int totalLength) { - Checksum crc = new CRC32(); - - Checksums.update(crc, (ByteBuffer) buf.duplicate().limit(buf.position() + totalLength - 4)); - long computedMessageCrc = crc.getValue(); - - long wireMessageCrc = Integer.toUnsignedLong(buf.getInt(buf.position() + totalLength - 4)); - - if (wireMessageCrc != computedMessageCrc) { - throw new IllegalArgumentException(format("Message checksum failure: expected 0x%x, computed 0x%x", - wireMessageCrc, computedMessageCrc)); - } - } - - static Map decodeHeaders(ByteBuffer buf) { - Map headers = new HashMap<>(); - - while (buf.hasRemaining()) { - Header header = Header.decode(buf); - headers.put(header.getName(), header.getValue()); - } - - return Collections.unmodifiableMap(headers); - } - - public ByteBuffer toByteBuffer() { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - encode(baos); - baos.close(); - return ByteBuffer.wrap(baos.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void encode(OutputStream os) { - try { - CheckedOutputStream checkedOutputStream = new CheckedOutputStream(os, new CRC32()); - encodeOrThrow(checkedOutputStream); - long messageCrc = checkedOutputStream.getChecksum().getValue(); - os.write((int) (0xFF & messageCrc >> 24)); - os.write((int) (0xFF & messageCrc >> 16)); - os.write((int) (0xFF & messageCrc >> 8)); - os.write((int) (0xFF & messageCrc)); - - os.flush(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - /** - * Encode the given {@code headers}, without any leading or trailing metadata such as checksums or lengths. - * - * @param headers a sequence of zero or more headers, which will be encoded in iteration order - * @return a byte array corresponding to the {@code headers} section of a {@code Message} - */ - public static byte[] encodeHeaders(Iterable> headers) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - for (Entry entry : headers) { - Header.encode(entry, dos); - } - dos.close(); - return baos.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void encodeOrThrow(OutputStream os) throws IOException { - ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream(); - headersAndPayload.write(encodeHeaders(headers.entrySet())); - headersAndPayload.write(payload); - - int totalLength = Prelude.LENGTH_WITH_CRC + headersAndPayload.size() + 4; - - { - byte[] preludeBytes = getPrelude(totalLength); - Checksum crc = new CRC32(); - crc.update(preludeBytes, 0, preludeBytes.length); - - DataOutputStream dos = new DataOutputStream(os); - dos.write(preludeBytes); - long value = crc.getValue(); - int value1 = (int) value; - dos.writeInt(value1); - dos.flush(); - } - - headersAndPayload.writeTo(os); - } - - private byte[] getPrelude(int totalLength) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(8); - DataOutputStream dos = new DataOutputStream(baos); - - int headerLength = totalLength - Message.MESSAGE_OVERHEAD - payload.length; - dos.writeInt(totalLength); - dos.writeInt(headerLength); - - dos.close(); - return baos.toByteArray(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Message message = (Message) o; - - if (!headers.equals(message.headers)) return false; - return Arrays.equals(payload, message.payload); - } - - @Override - public int hashCode() { - int result = headers.hashCode(); - result = 31 * result + Arrays.hashCode(payload); - return result; - } - - @Override - public String toString() { - StringBuilder ret = new StringBuilder(); - - for (Entry entry : headers.entrySet()) { - ret.append(entry.getKey()); - ret.append(": "); - ret.append(entry.getValue().toString()); - ret.append('\n'); - } - ret.append('\n'); - - String contentType = headers.getOrDefault(":content-type", HeaderValue.fromString("application/octet-stream")) - .getString(); - if (contentType.contains("json") || contentType.contains("text")) { - ret.append(new String(payload, StandardCharsets.UTF_8)); - } else { - ret.append(Base64.getEncoder().encodeToString(payload)); - } - ret.append('\n'); - return ret.toString(); - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java b/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java deleted file mode 100644 index 73387fb68126..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/MessageBuilder.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.util.Map; - -/** - * An abstraction for message construction. This interface can be used to - * customize message creation in different ways, such as implementing - * decorators that add headers or manipulate the message payload. - */ -@FunctionalInterface -public interface MessageBuilder { - Message build(Map headers, byte[] payload); - - static MessageBuilder defaultBuilder() { - return Message::new; - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java b/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java deleted file mode 100644 index e24bdcc9c516..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/MessageDecoder.java +++ /dev/null @@ -1,148 +0,0 @@ -package software.amazon.eventstream; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; - -/** - * A simple decoder that accumulates chunks of bytes and emits eventstream - * messages. Instances of this class are not thread-safe. - */ -public final class MessageDecoder { - - /** - * Initial buffer size is 2MB. Will grow as needed to accommodate larger messages. - */ - private static final int INITIAL_BUFFER_SIZE = 2048 * 1024; - - private final Consumer messageConsumer; - private List bufferedOutput; - private ByteBuffer buf; - private Prelude currentPrelude; - - /** - * Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded - * messages can be obtained by calling {@link #getDecodedMessages()}. - */ - public MessageDecoder() { - this.messageConsumer = message -> this.bufferedOutput.add(message); - this.bufferedOutput = new ArrayList<>(); - this.buf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - } - - /** - * Creates a {@code MessageDecoder} instance that will publish messages incrementally to the supplied {@code - * messageConsumer} as they are decoded. The resulting instance does not support the {@link #getDecodedMessages()} - * operation, and will throw an exception if it is invoked. - * - * @param messageConsumer a function that consumes {@link Message} instances - */ - public MessageDecoder(Consumer messageConsumer) { - this(messageConsumer, INITIAL_BUFFER_SIZE); - } - - /** - * To be used by tests only. - */ - MessageDecoder(Consumer messageConsumer, int initialBufferSize) { - this.messageConsumer = messageConsumer; - this.buf = ByteBuffer.allocate(initialBufferSize); - this.bufferedOutput = null; - } - - /** - * Returns {@link Message} instances that have been decoded since this method was last invoked. Note that this - * method is only supported if this decoder was not configured to use a custom message consumer. - * - * @return all messages decoded since the last invocation of this method - */ - public List getDecodedMessages() { - if (bufferedOutput == null) { - throw new IllegalStateException(""); - } - List ret = bufferedOutput; - bufferedOutput = new ArrayList<>(); - return Collections.unmodifiableList(ret); - } - - public void feed(byte[] bytes) { - feed(ByteBuffer.wrap(bytes)); - } - - public void feed(byte[] bytes, int offset, int length) { - feed(ByteBuffer.wrap(bytes, offset, length)); - } - - /** - * Feed the contents of the given {@link ByteBuffer} into this decoder. Messages will be incrementally decoded and - * buffered or published to the message consumer (depending on configuration). - * - * @param byteBuffer a {@link ByteBuffer} whose entire contents will be read into the decoder's internal buffer - * @return this {@code MessageDecoder} instance - */ - public MessageDecoder feed(ByteBuffer byteBuffer) { - int bytesToRead = byteBuffer.remaining(); - int bytesConsumed = 0; - while (bytesConsumed < bytesToRead) { - ByteBuffer readView = updateReadView(); - if (currentPrelude == null) { - // Put only 15 bytes into buffer and compute prelude. - int numBytesToWrite = Math.min(15 - readView.remaining(), - bytesToRead - bytesConsumed); - - feedBuf(byteBuffer, numBytesToWrite); - - bytesConsumed += numBytesToWrite; - readView = updateReadView(); - - // Have enough data to decode the prelude - if (readView.remaining() >= 15) { - currentPrelude = Prelude.decode(readView.duplicate()); - if (buf.capacity() < currentPrelude.getTotalLength()) { - // Don't have enough capacity to hold this message, grow the buffer - buf = ByteBuffer.allocate(currentPrelude.getTotalLength()); - buf.put(readView); - readView = updateReadView(); - } - } - } - // We might not have received enough data to decode the prelude so check for null again - if (currentPrelude != null) { - // Only write up to what we need to decode the next message - int numBytesToWrite = Math.min(currentPrelude.getTotalLength() - readView.remaining(), - bytesToRead - bytesConsumed); - - feedBuf(byteBuffer, numBytesToWrite); - bytesConsumed += numBytesToWrite; - readView = updateReadView(); - - // If we have enough data to decode the message do so and reset the buffer for the next message - if (readView.remaining() >= currentPrelude.getTotalLength()) { - messageConsumer.accept(Message.decode(currentPrelude, readView)); - buf.clear(); - currentPrelude = null; - } - } - } - - return this; - } - - private void feedBuf(ByteBuffer byteBuffer, int numBytesToWrite) { - buf.put((ByteBuffer) byteBuffer.duplicate().limit(byteBuffer.position() + numBytesToWrite)); - byteBuffer.position(byteBuffer.position() + numBytesToWrite); - } - - private ByteBuffer updateReadView() { - return (ByteBuffer) buf.duplicate().flip(); - } - - /** - * To be used by tests only. - */ - int currentBufferSize() { - return buf.capacity(); - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/Prelude.java b/flow/src/main/java/software/amazon/eventstream/Prelude.java deleted file mode 100644 index 96f3f21a6bc3..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/Prelude.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.nio.ByteBuffer; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import static java.lang.String.format; - -final class Prelude { - static final int LENGTH = 8; - static final int LENGTH_WITH_CRC = LENGTH + 4; - - private final int totalLength; - private final long headersLength; - - private Prelude(int totalLength, long headersLength) { - this.totalLength = totalLength; - this.headersLength = headersLength; - } - - static Prelude decode(ByteBuffer buf) { - buf = buf.duplicate(); - - long computedPreludeCrc = computePreludeCrc(buf); - - long totalLength = Integer.toUnsignedLong(buf.getInt()); - long headersLength = Integer.toUnsignedLong(buf.getInt()); - long wirePreludeCrc = Integer.toUnsignedLong(buf.getInt()); - if (computedPreludeCrc != wirePreludeCrc) { - throw new IllegalArgumentException(format("Prelude checksum failure: expected 0x%x, computed 0x%x", - wirePreludeCrc, computedPreludeCrc)); - } - - if (headersLength < 0 || headersLength > 131_072) { - throw new IllegalArgumentException("Illegal headers_length value: " + headersLength); - } - - long payloadLength = (totalLength - headersLength) - Message.MESSAGE_OVERHEAD; - // This implementation temporarily accepts larger payloads than the spec permits. - if (payloadLength < 0 || payloadLength > 25_165_824) { - throw new IllegalArgumentException("Illegal payload size: " + payloadLength); - } - - return new Prelude(Math.toIntExact(totalLength), headersLength); - } - - private static long computePreludeCrc(ByteBuffer buf) { - byte[] prelude = new byte[Prelude.LENGTH]; - buf.duplicate().get(prelude); - - Checksum crc = new CRC32(); - crc.update(prelude, 0, prelude.length); - return crc.getValue(); - } - - int getTotalLength() { - return totalLength; - } - - long getHeadersLength() { - return headersLength; - } -} diff --git a/flow/src/main/java/software/amazon/eventstream/Utils.java b/flow/src/main/java/software/amazon/eventstream/Utils.java deleted file mode 100644 index 0aedf48cdba1..000000000000 --- a/flow/src/main/java/software/amazon/eventstream/Utils.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import static java.nio.charset.StandardCharsets.UTF_8; - -final class Utils { - private Utils() {} - - static String readShortString(ByteBuffer buf) { - int length = buf.get() & 0xFF; - checkStringBounds(length, 255); - byte[] bytes = new byte[length]; - buf.get(bytes); - return new String(bytes, UTF_8); - } - - static String readString(ByteBuffer buf) { - int length = buf.getShort() & 0xFFFF; - checkStringBounds(length, 32767); - byte[] bytes = new byte[length]; - buf.get(bytes); - return new String(bytes, UTF_8); - } - - static byte[] readBytes(ByteBuffer buf) { - int length = buf.getShort() & 0xFFFF; - checkByteArrayBounds(length); - byte[] bytes = new byte[length]; - buf.get(bytes); - return bytes; - } - - static void writeShortString(DataOutputStream dos, String string) throws IOException { - byte[] bytes = string.getBytes(UTF_8); - checkStringBounds(bytes.length, 255); - dos.writeByte(bytes.length); - dos.write(bytes); - } - - static void writeString(DataOutputStream dos, String string) throws IOException { - byte[] bytes = string.getBytes(UTF_8); - checkStringBounds(bytes.length, 32767); - writeBytes(dos, bytes); - } - - static void writeBytes(DataOutputStream dos, byte[] bytes) throws IOException { - checkByteArrayBounds(bytes.length); - dos.writeShort((short) bytes.length); - dos.write(bytes); - } - - private static void checkByteArrayBounds(int length) { - if (length == 0) { - throw new IllegalArgumentException("Byte arrays may not be empty"); - } - if (length > 32767) { - throw new IllegalArgumentException("Illegal byte array length: " + length); - } - } - - private static void checkStringBounds(int length, int maxLength) { - if (length == 0) { - throw new IllegalArgumentException("Strings may not be empty"); - } - if (length > maxLength) { - throw new IllegalArgumentException("Illegal string length: " + length); - } - } -} diff --git a/flow/src/test/java/software/amazon/eventstream/HeaderTest.java b/flow/src/test/java/software/amazon/eventstream/HeaderTest.java deleted file mode 100644 index 135a5034be5b..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/HeaderTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; -import static software.amazon.eventstream.HeaderValue.fromInteger; - -public class HeaderTest { - @Test - public void genericHeaders() throws Exception { - roundTrip(new Header("test-string-header", "test-string-value")); - roundTrip(new Header("test-byte-array-header", HeaderValue.fromByteArray(bb(1, 2, 3, 4, 5, 6, 7, 8)))); - roundTrip(new Header("test-uint32-header", fromInteger(8918230))); - } - - @Test - public void typeId() { - for (byte i = 0; i <= 9; i++) { - assertEquals(i, HeaderType.fromTypeId(i).headerTypeId); - } - } - - static void roundTrip(Header header) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (DataOutputStream dos = new DataOutputStream(baos)) { - header.encode(dos); - } - byte[] bytes = baos.toByteArray(); - Header actual = Header.decode(ByteBuffer.wrap(bytes)); - - assertEquals(header, actual); - } - - static byte[] bb(int... bytes) { - byte[] bs = new byte[bytes.length]; - for (int i = 0; i < bytes.length; i++) { - bs[i] = (byte) bytes[i]; - } - return bs; - } -} diff --git a/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java b/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java deleted file mode 100644 index 78c181ea643a..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java +++ /dev/null @@ -1,211 +0,0 @@ -package software.amazon.eventstream; - -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; - -public class MessageDecoderTest { - long SEED = 8912374098123423L; - - @Test - public void testDecoder() throws Exception { - TestUtils utils = new TestUtils(SEED); - Random rand = new Random(SEED); - List expected = IntStream.range(0, 10_000) - .mapToObj(x -> utils.randomMessage()) - .collect(Collectors.toList()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add); - while (buf.remaining() > 0) { - int bufSize = Math.min(1 + rand.nextInt(1024), buf.remaining()); - byte[] bs = new byte[bufSize]; - buf.get(bs); - decoder.feed(bs); - } - - Assert.assertEquals(expected, actual); - } - - @Test - public void testDecoder_WithOffset() throws Exception { - TestUtils utils = new TestUtils(SEED); - Random rand = new Random(SEED); - List expected = IntStream.range(0, 10_000) - .mapToObj(x -> utils.randomMessage()) - .collect(Collectors.toList()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - byte[] data = baos.toByteArray(); - int toRead = data.length; - int read = 0; - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add); - while (toRead > 0) { - int length = rand.nextInt(100); - if (read + length > data.length) { - length = data.length - read; - } - decoder.feed(data, read, length); - read += length; - toRead -= length; - } - - Assert.assertEquals(expected, actual); - } - - @Test - public void preludeFedFirst_DecodesCorrectly() { - TestUtils utils = new TestUtils(SEED); - Message message = utils.randomMessage(); - int messageSize = message.toByteBuffer().remaining(); - List expected = Collections.singletonList(message); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add, 8192); - - // Feed just the prelude in it's entirety - byte[] bs = new byte[15]; - buf.get(bs); - decoder.feed(bs); - - // No messages should be decoded yet - assertThat(actual, Matchers.hasSize(0)); - - // Feed rest of message in it's entirety. - bs = new byte[messageSize - 15]; - buf.get(bs); - decoder.feed(bs); - - // Should have successfully decoded the one message - assertThat(actual, Matchers.hasSize(1)); - } - - @Test - public void preludeFedInParts_DecodesCorrectly() { - TestUtils utils = new TestUtils(SEED); - Message message = utils.randomMessage(); - int messageSize = message.toByteBuffer().remaining(); - List expected = Collections.singletonList(message); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add, 8192); - - // Feed the prelude in parts - byte[] bs = new byte[7]; - buf.get(bs); - decoder.feed(bs); - - // Feed rest of prelude - bs = new byte[8]; - buf.get(bs); - decoder.feed(bs); - - // No messages should be decoded yet - assertThat(actual, Matchers.hasSize(0)); - - // Feed rest of message in it's entirety. - bs = new byte[messageSize - 15]; - buf.get(bs); - decoder.feed(bs); - - // Should have successfully decoded the one message - assertThat(actual, Matchers.hasSize(1)); - } - - @Test - public void bufferNeedsToGrow() { - TestUtils utils = new TestUtils(SEED); - Message message = utils.randomMessage(8192 * 2); - int messageSize = message.toByteBuffer().remaining(); - List expected = Collections.singletonList(message); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add, 8192); - - // Feed all at once - byte[] bs = new byte[messageSize]; - buf.get(bs); - decoder.feed(bs); - - // Should have successfully decoded the one message - assertThat(actual, Matchers.hasSize(1)); - } - - @Test - public void multipleMessagesDoesNotGrowBuffer() { - TestUtils utils = new TestUtils(SEED); - Message message = utils.randomMessage(4096); - List expected = IntStream.range(0, 100) - .mapToObj(x -> message) - .collect(Collectors.toList()); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add, 8192); - - // Feed all at once - byte[] bs = new byte[buf.capacity()]; - buf.get(bs); - decoder.feed(bs); - - assertEquals(expected, actual); - assertEquals(8192, decoder.currentBufferSize()); - } - - @Test - public void multipleLargeMessages_GrowsBufferAsNeeded() { - TestUtils utils = new TestUtils(SEED); - Message message = utils.randomMessage(9001); - List expected = IntStream.range(0, 100) - .mapToObj(x -> message) - .collect(Collectors.toList()); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.forEach(x -> x.encode(baos)); - ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - - List actual = new ArrayList<>(); - MessageDecoder decoder = new MessageDecoder(actual::add, 8192); - - // Feed all at once - byte[] bs = new byte[buf.capacity()]; - buf.get(bs); - decoder.feed(bs); - - assertEquals(expected, actual); - assertThat(decoder.currentBufferSize(), greaterThan(9001)); - } - -} diff --git a/flow/src/test/java/software/amazon/eventstream/MessageTest.java b/flow/src/test/java/software/amazon/eventstream/MessageTest.java deleted file mode 100644 index cb4de8201592..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/MessageTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package software.amazon.eventstream; - -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Collections.emptyMap; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class MessageTest { - @Test - public void emptyVector() { - Message message = new Message(emptyMap(), new byte[]{}); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - message.encode(baos); - byte[] bytes = baos.toByteArray(); - - byte[] expected = new byte[]{ - 0, 0, 0, 16, // total_length - 0, 0, 0, 0, // headers_length - 5, -62, 72, -21, // prelude_crc - 125, -104, -56, -1 // message_crc - }; - - assertArrayEquals(expected, bytes); - - assertEquals(message, Message.decode(ByteBuffer.wrap(expected))); - } - - @Test - public void appdataVector() { - byte[] bytes = new byte[]{ - 0, 0, 0, 0x1d, // total_length - 0, 0, 0, 0, // headers_length - (byte) 0xfd, 0x52, (byte) 0x8c, 0x5a, // prelude_crc - 0x7b, 0x27, 0x66, 0x6f, 0x6f, 0x27, 0x3a, // payload - 0x27, 0x62, 0x61, 0x72, 0x27, 0x7d, - // 0xc3 65 39 36 - (byte) 0xc3, 0x65, 0x39, 0x36 // message_crc - }; - - Message message = new Message(emptyMap(), "{'foo':'bar'}".getBytes(UTF_8)); - - assertEquals(message, Message.decode(ByteBuffer.wrap(bytes))); - } - - @Test - public void roundTripTests() { - roundTrip(new Message(emptyMap(), new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })); - - Map headers = new HashMap<>(); - headers.put(":content-type", HeaderValue.fromString("application/json")); - headers.put("content-encoding", HeaderValue.fromString("gzip")); - headers.put("request-id", HeaderValue.fromByteArray(new byte[]{ 1, 2, 3, 4, 5 })); - headers.put("more-stuff", HeaderValue.fromInteger(27)); - roundTrip(new Message(headers, new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9 })); - } - - @Test - public void generativeTest() throws Exception { - long SEED = 8912374098123423L; - TestUtils utils = new TestUtils(SEED); - Random rand = new Random(SEED); - for (int i = 0; i < 10_000; i++) { - byte[] padding = new byte[rand.nextInt(128 * 1024)]; - rand.nextBytes(padding); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - baos.write(padding); - Message message = utils.randomMessage(); - message.encode(baos); - - int arraylen = 256 * 1024; - int arrayoffset = rand.nextInt(4 * 1024); - ByteBuffer buf = ((ByteBuffer) ByteBuffer.allocate(arraylen).position(arrayoffset)).slice(); - buf.put(baos.toByteArray()); - buf.flip(); - - buf.get(padding); - Message decoded = Message.decode(buf); - - assertEquals(message, decoded); - } - } - - static void roundTrip(Message expected) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - expected.encode(baos); - Message actual = Message.decode(ByteBuffer.wrap(baos.toByteArray())); - assertEquals(expected, actual); - } -} diff --git a/flow/src/test/java/software/amazon/eventstream/PreludeTest.java b/flow/src/test/java/software/amazon/eventstream/PreludeTest.java deleted file mode 100644 index a13c437c408c..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/PreludeTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package software.amazon.eventstream; - -import org.junit.Test; - -import java.nio.ByteBuffer; - -public class PreludeTest { - @Test(expected = IllegalArgumentException.class) - public void maxPayloadSize() throws Exception { - ByteBuffer buf = ByteBuffer.wrap(new byte[15]); - buf.putInt(16777217 + 19); // total length - buf.put((byte) 1); // ApplicationData - buf.putShort((short) 0); // unmodeled data - buf.putInt(0); // headers_length - buf.putInt(0xf0c8e628); // prelude_crc - buf.flip(); - - Prelude.decode(buf); - } - - @Test(expected = IllegalArgumentException.class) - public void maxHeaderSize() throws Exception { - ByteBuffer buf = ByteBuffer.wrap(new byte[15]); - buf.putInt(131073 + 19); // total length - buf.put((byte) 1); // ApplicationData - buf.putShort((short) 0); // unmodeled data - buf.putInt(131073); // headers_length - buf.putInt(0x49fd415c); // prelude_crc - buf.flip(); - - Prelude.decode(buf); - } -} diff --git a/flow/src/test/java/software/amazon/eventstream/TestUtils.java b/flow/src/test/java/software/amazon/eventstream/TestUtils.java deleted file mode 100644 index cae03168b1a0..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/TestUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -package software.amazon.eventstream; - -import java.time.Instant; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.UUID; - -public class TestUtils { - private final Random rand; - private final int maxHeaders; - private final int maxHeaderSize; - private final int maxPayloadSize; - - public TestUtils(long seed) { - this.rand = new Random(seed); - this.maxHeaders = 20; - this.maxHeaderSize = 64; - this.maxPayloadSize = 4096; - } - - public Message randomMessage() { - return new Message(randomHeaders(), randomPayload()); - } - - public Message randomMessage(int payloadSize) { - return new Message(randomHeaders(), randomPayload(payloadSize)); - } - - private Map randomHeaders() { - Map headers = new HashMap<>(); - int numHeaders = rand.nextInt(maxHeaders + 1); - for (int i = 0; i < numHeaders; i++) { - headers.put("asdf" + rand.nextInt(), randomHeaderValue()); - } - return headers; - } - - private HeaderValue randomHeaderValue() { - switch (rand.nextInt(7)) { - case 0: - return HeaderValue.fromInteger(rand.nextInt(Integer.MAX_VALUE)); - case 1: - int bytes = rand.nextInt(maxHeaderSize + 1) + 1; - byte[] buf = new byte[bytes]; - rand.nextBytes(buf); - return HeaderValue.fromByteArray(buf); - case 2: - return HeaderValue.fromString("asdf"); - case 3: - return HeaderValue.fromBoolean(true); - case 4: - return HeaderValue.fromBoolean(false); - case 5: - return HeaderValue.fromTimestamp(Instant.ofEpochMilli(rand.nextLong())); - case 6: - return HeaderValue.fromUuid(new UUID(rand.nextLong(), rand.nextLong())); - default: - throw new IllegalStateException(); - } - } - - private byte[] randomPayload() { - return randomPayload(rand.nextInt(maxPayloadSize + 1)); - } - - private byte[] randomPayload(int payloadSize) { - byte[] ret = new byte[payloadSize]; - rand.nextBytes(ret); - return ret; - } -} diff --git a/flow/src/test/java/software/amazon/eventstream/ToStringTest.java b/flow/src/test/java/software/amazon/eventstream/ToStringTest.java deleted file mode 100644 index 9c2ab69453ed..000000000000 --- a/flow/src/test/java/software/amazon/eventstream/ToStringTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package software.amazon.eventstream; - -import org.junit.Test; - -import java.time.Instant; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertEquals; - -public class ToStringTest { - @Test - public void headerTypes() throws Exception { - Map headers = new LinkedHashMap<>(); - headers.put(":content-type", HeaderValue.fromString("application/json")); - headers.put("boolean-true", HeaderValue.fromBoolean(true)); - headers.put("boolean-false", HeaderValue.fromBoolean(false)); - headers.put("byte-value", HeaderValue.fromByte((byte) 4)); - headers.put("short-value", HeaderValue.fromShort((short) 16384)); - headers.put("integer-value", HeaderValue.fromInteger(-1048576)); - headers.put("long-value", HeaderValue.fromLong(850270403920392L)); - headers.put("string-value", HeaderValue.fromString("asdf")); - headers.put("byte-array-value", HeaderValue.fromByteArray(new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 })); - headers.put("instant-value", HeaderValue.fromTimestamp(Instant.ofEpochMilli(1511312536153L))); - headers.put("uuid-value", HeaderValue.fromUuid(new UUID(89012350712350912L, 9182739072970134201L))); - - Message message = new Message( - headers, - "{\"foo\": \"bar\"}".getBytes(UTF_8) - ); - - String expected = - ":content-type: \"application/json\"\n" + - "boolean-true: true\n" + - "boolean-false: false\n" + - "byte-value: 4\n" + - "short-value: 16384\n" + - "integer-value: -1048576\n" + - "long-value: 850270403920392\n" + - "string-value: \"asdf\"\n" + - "byte-array-value: AQIDBAUGBwgJCgsM\n" + - "instant-value: 2017-11-22T01:02:16.153Z\n" + - "uuid-value: 013c3c42-e8d5-08c0-7f6f-a488dd7c12b9\n" + - "\n" + - "{\"foo\": \"bar\"}\n"; - System.out.println(message); - assertEquals(expected, message.toString()); - } - - @Test - public void controlMessages() throws Exception { - Map headers = new LinkedHashMap<>(); - headers.put(":content-type", HeaderValue.fromString("application/json")); - - Message message = new Message(headers, "{\"foo\": \"bar\"}".getBytes(UTF_8)); - - String expected = - ":content-type: \"application/json\"\n" + - "\n" + - "{\"foo\": \"bar\"}\n"; - assertEquals(expected, message.toString()); - } - - @Test - public void binaryPayload() throws Exception { - Map headers = new LinkedHashMap<>(); - headers.put(":content-type", HeaderValue.fromString("application/octet-stream")); - - Message message = new Message(headers, new byte[]{ 23, 12, (byte) 129, 44, 89, 90 }); - - String expected = - ":content-type: \"application/octet-stream\"\n" + - "\n" + - "FwyBLFla\n"; - assertEquals(expected, message.toString()); - } -} diff --git a/pom.xml b/pom.xml index 5d4808405998..fcbaa20a6d30 100644 --- a/pom.xml +++ b/pom.xml @@ -81,9 +81,9 @@ ${project.version} - 1.7 2.9.8 2.9.0 + 1.0.1 1.2.0 3.4 2.18.0 diff --git a/services/kinesis/pom.xml b/services/kinesis/pom.xml index 40363956e3d4..a4f869acf905 100644 --- a/services/kinesis/pom.xml +++ b/services/kinesis/pom.xml @@ -62,9 +62,9 @@ ${awsjavasdk.version} - software.amazon - flow - ${flow.version} + software.amazon.eventstream + eventstream + ${eventstream.version} test