diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
index 07dea1568089..7a1738f51d97 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
@@ -22,38 +22,37 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
-import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
+import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
/**
- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
- * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
- * to write that data on the wire).
+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
+ * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
+ * of the data (i.e. to write that data on the wire).
*
*
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
- * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
- * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
+ * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
+ * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
*
*
*
- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
- * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
- * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
+ * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
+ * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
*
*
* @see FileAsyncRequestBody
- * @see ByteBuffersAsyncRequestBody
+ * @see ByteArrayAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher {
@@ -71,8 +70,8 @@ default String contentType() {
}
/**
- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
- * publisher publishes the data.
+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
+ * The data is delivered when the publisher publishes the data.
*
* @param publisher Publisher of source data
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -125,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
- * @see ByteBuffersAsyncRequestBody
+ * @see ByteArrayAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
- return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
- string.getBytes(cs));
+ return new ByteArrayAsyncRequestBody(string.getBytes(cs),
+ Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
}
/**
@@ -144,181 +143,29 @@ static AsyncRequestBody fromString(String string) {
}
/**
- * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
- * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
+ * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
+ * original byte array are not reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
- byte[] clonedBytes = bytes.clone();
- return ByteBuffersAsyncRequestBody.from(clonedBytes);
+ return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
}
/**
- * Creates an {@link AsyncRequestBody} from a byte array without copying the contents of the byte array. This
- * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
- * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
- * {@code AsyncRequestBody} implementation.
- *
- * As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
- *
- * @param bytes The bytes to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
- return ByteBuffersAsyncRequestBody.from(bytes);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
- * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
- *
- * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
- * it to copy only the remaining readable bytes.
+ * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
+ * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
- immutableCopy.rewind();
- return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
- * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
- * reflected in the {@link AsyncRequestBody}.
- *
Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
- * only the remaining bytes.
- *
- * @param byteBuffer ByteBuffer to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
- return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the
- * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
- * {@code AsyncRequestBody} implementation.
- *
- * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
- * need it to copy only the remaining readable bytes.
- *
- *
As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
- * risks.
- *
- * @param byteBuffer ByteBuffer to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
- ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
- readOnlyBuffer.rewind();
- return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the
- * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
- * {@code AsyncRequestBody} implementation.
- *
Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
- * the buffer and reads only the remaining bytes.
- *
- *
As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
- * risks.
- *
- * @param byteBuffer ByteBuffer to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
- ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
- return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
- * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
- *
- * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use
- * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
- *
- * @param byteBuffers ByteBuffer array to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
- ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
- .map(BinaryUtils::immutableCopyOf)
- .peek(ByteBuffer::rewind)
- .toArray(ByteBuffer[]::new);
- return ByteBuffersAsyncRequestBody.of(immutableCopy);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
- * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
- * {@link AsyncRequestBody}.
- *
Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
- * this method respects the current read position of each buffer and reads only the remaining bytes.
- *
- * @param byteBuffers ByteBuffer array to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
- ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
- .map(BinaryUtils::immutableCopyOfRemaining)
- .peek(ByteBuffer::rewind)
- .toArray(ByteBuffer[]::new);
- return ByteBuffersAsyncRequestBody.of(immutableCopy);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each
- * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
- * {@code AsyncRequestBody} implementation.
- *
- * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use
- * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
- *
- *
As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
- * risks.
- *
- * @param byteBuffers ByteBuffer array to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
- ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
- .map(ByteBuffer::asReadOnlyBuffer)
- .peek(ByteBuffer::rewind)
- .toArray(ByteBuffer[]::new);
- return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
- }
-
- /**
- * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each
- * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
- * {@code AsyncRequestBody} implementation.
- *
Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
- * this method respects the current read position of each buffer and reads only the remaining bytes.
- *
- *
As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
- * risks.
- *
- * @param byteBuffers ByteBuffer array to send to the service.
- * @return AsyncRequestBody instance.
- */
- static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
- ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
- .map(ByteBuffer::asReadOnlyBuffer)
- .toArray(ByteBuffer[]::new);
- return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
+ return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
}
/**
- * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
+ * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
*
*
An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
@@ -392,7 +239,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
}
/**
- * Creates an {@link AsyncRequestBody} with no content.
+ * Creates a {@link AsyncRequestBody} with no content.
*
* @return AsyncRequestBody instance.
*/
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java
new file mode 100644
index 000000000000..29205479b798
--- /dev/null
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.async;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static
+ * methods on {@link AsyncRequestBody}
+ *
+ * @see AsyncRequestBody#fromBytes(byte[])
+ * @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
+ * @see AsyncRequestBody#fromString(String)
+ */
+@SdkInternalApi
+public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
+ private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);
+
+ private final byte[] bytes;
+
+ private final String mimetype;
+
+ public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) {
+ this.bytes = bytes.clone();
+ this.mimetype = mimetype;
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.of((long) bytes.length);
+ }
+
+ @Override
+ public String contentType() {
+ return mimetype;
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
+ if (s == null) {
+ throw new NullPointerException("Subscription MUST NOT be null.");
+ }
+
+ // As per 2.13, this method must return normally (i.e. not throw).
+ try {
+ s.onSubscribe(
+ new Subscription() {
+ private boolean done = false;
+
+ @Override
+ public void request(long n) {
+ if (done) {
+ return;
+ }
+ if (n > 0) {
+ done = true;
+ s.onNext(ByteBuffer.wrap(bytes));
+ s.onComplete();
+ } else {
+ s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ synchronized (this) {
+ if (!done) {
+ done = true;
+ }
+ }
+ }
+ }
+ );
+ } catch (Throwable ex) {
+ log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
+ }
+ }
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
deleted file mode 100644
index e7e9d00dd0e5..000000000000
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.core.internal.async;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.core.internal.util.Mimetype;
-import software.amazon.awssdk.utils.BinaryUtils;
-import software.amazon.awssdk.utils.Logger;
-
-/**
- * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
- * using static methods on {@link AsyncRequestBody}
- *
- * @see AsyncRequestBody#fromBytes(byte[])
- * @see AsyncRequestBody#fromBytesUnsafe(byte[])
- * @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
- * @see AsyncRequestBody#fromByteBufferUnsafe(ByteBuffer)
- * @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
- * @see AsyncRequestBody#fromByteBuffersUnsafe(ByteBuffer...)
- * @see AsyncRequestBody#fromString(String)
- */
-@SdkInternalApi
-public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
- private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);
-
- private final String mimetype;
- private final Long length;
- private final ByteBuffer[] buffers;
-
- private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
- this.mimetype = mimetype;
- this.length = length;
- this.buffers = buffers;
- }
-
- @Override
- public Optional contentLength() {
- return Optional.ofNullable(length);
- }
-
- @Override
- public String contentType() {
- return mimetype;
- }
-
- @Override
- public void subscribe(Subscriber super ByteBuffer> s) {
- // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
- if (s == null) {
- throw new NullPointerException("Subscription MUST NOT be null.");
- }
-
- // As per 2.13, this method must return normally (i.e. not throw).
- try {
- s.onSubscribe(
- new Subscription() {
- private final AtomicInteger index = new AtomicInteger(0);
- private final AtomicBoolean completed = new AtomicBoolean(false);
-
- @Override
- public void request(long n) {
- if (completed.get()) {
- return;
- }
-
- if (n > 0) {
- int i = index.getAndIncrement();
-
- if (i >= buffers.length) {
- return;
- }
-
- long remaining = n;
-
- do {
- ByteBuffer buffer = buffers[i];
-
- // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
- if (buffer.isDirect()) {
- buffer = BinaryUtils.toNonDirectBuffer(buffer);
- }
-
- s.onNext(buffer.asReadOnlyBuffer());
- remaining--;
- } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
-
- if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
- s.onComplete();
- }
- } else {
- s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
- }
- }
-
- @Override
- public void cancel() {
- completed.set(true);
- }
- }
- );
- } catch (Throwable ex) {
- log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
- }
- }
-
- public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) {
- long length = Arrays.stream(buffers)
- .mapToLong(ByteBuffer::remaining)
- .sum();
- return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
- }
-
- public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
- return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
- }
-
- public static ByteBuffersAsyncRequestBody of(String mimetype, ByteBuffer... buffers) {
- long length = Arrays.stream(buffers)
- .mapToLong(ByteBuffer::remaining)
- .sum();
- return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
- }
-
- public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
- return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
- }
-
- public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
- return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
- ByteBuffer.wrap(bytes));
- }
-
- public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
- return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
- }
-}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java
index 93d6d09578a6..8fd7f0260b76 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;
@@ -59,11 +58,10 @@ public synchronized Iterable bufferAndCreateChunks(ByteBuffer buffer
int availableToRead = bufferSize - bufferedBytes;
int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition);
- byte[] bytes = BinaryUtils.copyAllBytesFrom(buffer);
if (bufferedBytes == 0) {
- currentBuffer.put(bytes, startPosition, bytesToMove);
+ currentBuffer.put(buffer.array(), startPosition, bytesToMove);
} else {
- currentBuffer.put(bytes, 0, bytesToMove);
+ currentBuffer.put(buffer.array(), 0, bytesToMove);
}
startPosition = startPosition + bytesToMove;
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
index aab643cbb6a6..e0252c9ba6d2 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
@@ -15,39 +15,44 @@
package software.amazon.awssdk.core.async;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import io.reactivex.Flowable;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.utils.BinaryUtils;
+import software.amazon.awssdk.utils.StringInputStream;
+@RunWith(Parameterized.class)
public class AsyncRequestBodyTest {
-
- private static final String testString = "Hello!";
- private static final Path path;
+ private final static String testString = "Hello!";
+ private final static Path path;
static {
FileSystem fs = Jimfs.newFileSystem(Configuration.unix());
@@ -59,16 +64,27 @@ public class AsyncRequestBodyTest {
}
}
- @ParameterizedTest
- @MethodSource("contentIntegrityChecks")
- void hasCorrectLength(AsyncRequestBody asyncRequestBody) {
- assertEquals(testString.length(), asyncRequestBody.contentLength().get());
+ @Parameterized.Parameters
+ public static AsyncRequestBody[] data() {
+ return new AsyncRequestBody[]{
+ AsyncRequestBody.fromString(testString),
+ AsyncRequestBody.fromFile(path)
+ };
}
+ private AsyncRequestBody provider;
+
+ public AsyncRequestBodyTest(AsyncRequestBody provider) {
+ this.provider = provider;
+ }
- @ParameterizedTest
- @MethodSource("contentIntegrityChecks")
- void hasCorrectContent(AsyncRequestBody asyncRequestBody) throws InterruptedException {
+ @Test
+ public void hasCorrectLength() {
+ assertThat(provider.contentLength().get()).isEqualTo(testString.length());
+ }
+
+ @Test
+ public void hasCorrectContent() throws InterruptedException {
StringBuilder sb = new StringBuilder();
CountDownLatch done = new CountDownLatch(1);
@@ -90,268 +106,75 @@ public void onComplete() {
}
};
- asyncRequestBody.subscribe(subscriber);
+ provider.subscribe(subscriber);
done.await();
- assertEquals(testString, sb.toString());
- }
-
- private static AsyncRequestBody[] contentIntegrityChecks() {
- return new AsyncRequestBody[] {
- AsyncRequestBody.fromString(testString),
- AsyncRequestBody.fromFile(path)
- };
+ assertThat(sb.toString()).isEqualTo(testString);
}
@Test
- void fromBytesCopiesTheProvidedByteArray() {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- byte[] bytesClone = bytes.clone();
-
- AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytes(bytes);
-
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] += 1;
- }
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
- assertArrayEquals(bytesClone, publishedByteArray);
+ public void stringConstructorHasCorrectContentType() {
+ AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world");
+ assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-8");
}
@Test
- void fromBytesUnsafeDoesNotCopyTheProvidedByteArray() {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
-
- AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytesUnsafe(bytes);
-
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] += 1;
- }
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
- assertArrayEquals(bytes, publishedByteArray);
- }
-
- @ParameterizedTest
- @MethodSource("safeByteBufferBodyBuilders")
- void safeByteBufferBuildersCopyTheProvidedBuffer(Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- byte[] bytesClone = bytes.clone();
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
-
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] += 1;
- }
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
- assertArrayEquals(bytesClone, publishedByteArray);
- }
-
- private static Function[] safeByteBufferBodyBuilders() {
- Function fromByteBuffer = AsyncRequestBody::fromByteBuffer;
- Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
- Function fromByteBuffers = AsyncRequestBody::fromByteBuffers;
- Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
- return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers};
- }
-
- @ParameterizedTest
- @MethodSource("unsafeByteBufferBodyBuilders")
- void unsafeByteBufferBuildersDoNotCopyTheProvidedBuffer(Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
-
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] += 1;
- }
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
- assertArrayEquals(bytes, publishedByteArray);
- }
-
- private static Function[] unsafeByteBufferBodyBuilders() {
- Function fromByteBuffer = AsyncRequestBody::fromByteBufferUnsafe;
- Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBufferUnsafe;
- Function fromByteBuffers = AsyncRequestBody::fromByteBuffersUnsafe;
- Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffersUnsafe;
- return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers};
- }
-
- @ParameterizedTest
- @MethodSource("nonRewindingByteBufferBodyBuilders")
- void nonRewindingByteBufferBuildersReadFromTheInputBufferPosition(
- Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- int expectedPosition = bytes.length / 2;
- bb.position(expectedPosition);
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- int remaining = bb.remaining();
- assertEquals(remaining, publishedBuffer.get().remaining());
- for (int i = 0; i < remaining; i++) {
- assertEquals(bb.get(), publishedBuffer.get().get());
- }
- }
-
- private static Function[] nonRewindingByteBufferBodyBuilders() {
- Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
- Function fromRemainingByteBufferUnsafe = AsyncRequestBody::fromRemainingByteBufferUnsafe;
- Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
- Function fromRemainingByteBuffersUnsafe = AsyncRequestBody::fromRemainingByteBuffersUnsafe;
- return new Function[] {fromRemainingByteBuffer, fromRemainingByteBufferUnsafe, fromRemainingByteBuffers,
- fromRemainingByteBuffersUnsafe};
- }
-
- @ParameterizedTest
- @MethodSource("safeNonRewindingByteBufferBodyBuilders")
- void safeNonRewindingByteBufferBuildersCopyFromTheInputBufferPosition(
- Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- int expectedPosition = bytes.length / 2;
- bb.position(expectedPosition);
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- int remaining = bb.remaining();
- assertEquals(remaining, publishedBuffer.get().capacity());
- for (int i = 0; i < remaining; i++) {
- assertEquals(bb.get(), publishedBuffer.get().get());
- }
- }
-
- private static Function[] safeNonRewindingByteBufferBodyBuilders() {
- Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer;
- Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers;
- return new Function[] {fromRemainingByteBuffer, fromRemainingByteBuffers};
- }
-
- @ParameterizedTest
- @MethodSource("rewindingByteBufferBodyBuilders")
- void rewindingByteBufferBuildersDoNotRewindTheInputBuffer(Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- int expectedPosition = bytes.length / 2;
- bb.position(expectedPosition);
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
-
- Subscriber subscriber = new SimpleSubscriber(buffer -> {
- });
-
- asyncRequestBody.subscribe(subscriber);
-
- assertEquals(expectedPosition, bb.position());
- }
-
- @ParameterizedTest
- @MethodSource("rewindingByteBufferBodyBuilders")
- void rewindingByteBufferBuildersReadTheInputBufferFromTheBeginning(
- Function bodyBuilder) {
- byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- bb.position(bytes.length / 2);
-
- AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
-
- AtomicReference publishedBuffer = new AtomicReference<>();
- Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
-
- asyncRequestBody.subscribe(subscriber);
-
- assertEquals(0, publishedBuffer.get().position());
- publishedBuffer.get().rewind();
- bb.rewind();
- assertEquals(bb, publishedBuffer.get());
- }
-
- private static Function[] rewindingByteBufferBodyBuilders() {
- Function fromByteBuffer = AsyncRequestBody::fromByteBuffer;
- Function fromByteBufferUnsafe = AsyncRequestBody::fromByteBufferUnsafe;
- Function fromByteBuffers = AsyncRequestBody::fromByteBuffers;
- Function fromByteBuffersUnsafe = AsyncRequestBody::fromByteBuffersUnsafe;
- return new Function[] {fromByteBuffer, fromByteBufferUnsafe, fromByteBuffers, fromByteBuffersUnsafe};
- }
-
- @ParameterizedTest
- @ValueSource(strings = {"US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16"})
- void charsetsAreConvertedToTheCorrectContentType(Charset charset) {
- AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", charset);
- assertEquals("text/plain; charset=" + charset.name(), requestBody.contentType());
+ public void stringWithEncoding1ConstructorHasCorrectContentType() {
+ AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.ISO_8859_1);
+ assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=ISO-8859-1");
}
@Test
- void stringConstructorHasCorrectDefaultContentType() {
- AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world");
- assertEquals("text/plain; charset=UTF-8", requestBody.contentType());
+ public void stringWithEncoding2ConstructorHasCorrectContentType() {
+ AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.UTF_16BE);
+ assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-16BE");
}
@Test
- void fileConstructorHasCorrectContentType() {
+ public void fileConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromFile(path);
- assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
+ assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}
@Test
- void bytesArrayConstructorHasCorrectContentType() {
+ public void bytesArrayConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes("hello world".getBytes());
- assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
+ assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}
@Test
- void bytesBufferConstructorHasCorrectContentType() {
+ public void bytesBufferConstructorHasCorrectContentType() {
ByteBuffer byteBuffer = ByteBuffer.wrap("hello world".getBytes());
AsyncRequestBody requestBody = AsyncRequestBody.fromByteBuffer(byteBuffer);
- assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
+ assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}
@Test
- void emptyBytesConstructorHasCorrectContentType() {
+ public void emptyBytesConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.empty();
- assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
+ assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}
@Test
- void publisherConstructorHasCorrectContentType() {
+ public void publisherConstructorHasCorrectContentType() {
List requestBodyStrings = Lists.newArrayList("A", "B", "C");
List bodyBytes = requestBodyStrings.stream()
- .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
- .collect(Collectors.toList());
+ .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
+ .collect(Collectors.toList());
Publisher bodyPublisher = Flowable.fromIterable(bodyBytes);
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
- assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
+ assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
+ }
+
+ @Test
+ public void fromBytes_byteArrayNotNull_createsCopy() {
+ byte[] original = {0x1, 0x2, 0x3, 0x4};
+ byte[] toModify = new byte[original.length];
+ System.arraycopy(original, 0, toModify, 0, original.length);
+ AsyncRequestBody body = AsyncRequestBody.fromBytes(toModify);
+ for (int i = 0; i < toModify.length; ++i) {
+ toModify[i]++;
+ }
+ ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
+ assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
}
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java
new file mode 100644
index 000000000000..378fbf2f59c3
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.async;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.internal.util.Mimetype;
+
+public class ByteArrayAsyncRequestBodyTest {
+ private class testSubscriber implements Subscriber {
+ private Subscription subscription;
+ protected AtomicBoolean onCompleteCalled = new AtomicBoolean(false);
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onComplete() {
+ subscription.request(1);
+ onCompleteCalled.set(true);
+ }
+ }
+
+ testSubscriber subscriber = new testSubscriber();
+
+ @Test
+ public void concurrentRequests_shouldCompleteNormally() {
+ ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes(),
+ Mimetype.MIMETYPE_OCTET_STREAM);
+ byteArrayReq.subscribe(subscriber);
+ assertTrue(subscriber.onCompleteCalled.get());
+ }
+
+}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
deleted file mode 100644
index b4073247f8b9..000000000000
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.core.internal.async;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
-import org.junit.jupiter.api.Test;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.utils.BinaryUtils;
-
-class ByteBuffersAsyncRequestBodyTest {
-
- private static class TestSubscriber implements Subscriber {
- private Subscription subscription;
- private boolean onCompleteCalled = false;
- private int callsToComplete = 0;
- private final List publishedResults = Collections.synchronizedList(new ArrayList<>());
-
- public void request(long n) {
- subscription.request(n);
- }
-
- @Override
- public void onSubscribe(Subscription s) {
- this.subscription = s;
- }
-
- @Override
- public void onNext(ByteBuffer byteBuffer) {
- publishedResults.add(byteBuffer);
- }
-
- @Override
- public void onError(Throwable throwable) {
- throw new IllegalStateException(throwable);
- }
-
- @Override
- public void onComplete() {
- onCompleteCalled = true;
- callsToComplete++;
- }
- }
-
- @Test
- public void subscriberIsMarkedAsCompleted() {
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(1);
-
- assertTrue(subscriber.onCompleteCalled);
- assertEquals(1, subscriber.publishedResults.size());
- }
-
- @Test
- public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable() {
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(2);
-
- assertTrue(subscriber.onCompleteCalled);
- assertEquals(1, subscriber.publishedResults.size());
- }
-
- @Test
- public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce() throws InterruptedException {
- int numBuffers = 100;
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(IntStream.range(0, numBuffers)
- .mapToObj(i -> ByteBuffer.wrap(new byte[1]))
- .toArray(ByteBuffer[]::new));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
-
- int parallelism = 8;
- ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
- for (int i = 0; i < parallelism; i++) {
- executorService.submit(() -> {
- for (int j = 0; j < numBuffers; j++) {
- subscriber.request(2);
- }
- });
- }
- executorService.shutdown();
- executorService.awaitTermination(1, TimeUnit.MINUTES);
-
- assertTrue(subscriber.onCompleteCalled);
- assertEquals(1, subscriber.callsToComplete);
- assertEquals(numBuffers, subscriber.publishedResults.size());
- }
-
- @Test
- public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish() {
- byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
- byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
- ByteBuffer.wrap(helloWorld),
- ByteBuffer.wrap(goodbyeWorld));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(1);
-
- assertFalse(subscriber.onCompleteCalled);
- assertEquals(1, subscriber.publishedResults.size());
- }
-
- @Test
- public void subscriberReceivesAllBuffers() {
- byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
- byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
-
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
- ByteBuffer.wrap(helloWorld),
- ByteBuffer.wrap(goodbyeWorld));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(2);
-
- assertEquals(2, subscriber.publishedResults.size());
- assertTrue(subscriber.onCompleteCalled);
- assertArrayEquals(helloWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(0)));
- assertArrayEquals(goodbyeWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(1)));
- }
-
- @Test
- public void multipleSubscribersReceiveTheSameResults() {
- ByteBuffer sourceBuffer = ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8));
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(sourceBuffer);
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(1);
- TestSubscriber otherSubscriber = new TestSubscriber();
- requestBody.subscribe(otherSubscriber);
- otherSubscriber.request(1);
-
- ByteBuffer publishedBuffer = subscriber.publishedResults.get(0);
- ByteBuffer otherPublishedBuffer = otherSubscriber.publishedResults.get(0);
-
- assertEquals(publishedBuffer, otherPublishedBuffer);
- }
-
- @Test
- public void canceledSubscriberDoesNotReturnNewResults() {
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(new byte[0]));
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
-
- subscriber.subscription.cancel();
- subscriber.request(1);
-
- assertTrue(subscriber.publishedResults.isEmpty());
- }
-
- // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
- @Test
- public void directBuffersAreCoppiedToNonDirectBuffers() {
- byte[] bytes = "Hello World!".getBytes(StandardCharsets.UTF_8);
- ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length)
- .put(bytes);
- buffer.flip();
- AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(buffer);
-
- TestSubscriber subscriber = new TestSubscriber();
- requestBody.subscribe(subscriber);
- subscriber.request(1);
-
- ByteBuffer publishedBuffer = subscriber.publishedResults.get(0);
- assertFalse(publishedBuffer.isDirect());
- byte[] publishedBytes = new byte[publishedBuffer.remaining()];
- publishedBuffer.get(publishedBytes);
- assertArrayEquals(bytes, publishedBytes);
- }
-
- @Test
- public void staticOfByteBufferConstructorSetsLengthBasedOnBufferRemaining() {
- ByteBuffer bb1 = ByteBuffer.allocate(2);
- ByteBuffer bb2 = ByteBuffer.allocate(2);
- bb2.position(1);
- ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(bb1, bb2);
- assertTrue(body.contentLength().isPresent());
- assertEquals(bb1.remaining() + bb2.remaining(), body.contentLength().get());
- }
-
- @Test
- public void staticFromBytesConstructorSetsLengthBasedOnArrayLength() {
- byte[] bytes = new byte[2];
- ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.from(bytes);
- assertTrue(body.contentLength().isPresent());
- assertEquals(bytes.length, body.contentLength().get());
- }
-
-}
diff --git a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
index 192ea7cead9b..e7fd8c015e1d 100644
--- a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
+++ b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
@@ -117,80 +117,6 @@ public static ByteArrayInputStream toStream(ByteBuffer byteBuffer) {
return new ByteArrayInputStream(copyBytesFrom(byteBuffer));
}
- /**
- * Returns an immutable copy of the given {@code ByteBuffer}.
- *
- * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be
- * ignored.
- *
- * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always
- * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)}
- *
- * @param bb the source {@code ByteBuffer} to copy.
- * @return a read only {@code ByteBuffer}.
- */
- public static ByteBuffer immutableCopyOf(ByteBuffer bb) {
- if (bb == null) {
- return null;
- }
- int sourceBufferPosition = bb.position();
- ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer();
- readOnlyCopy.rewind();
- ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.capacity())
- .put(readOnlyCopy);
- cloned.position(sourceBufferPosition);
- return cloned.asReadOnlyBuffer();
- }
-
- /**
- * Returns an immutable copy of the remaining bytes of the given {@code ByteBuffer}.
- *
- * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always
- * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)}
- *
- * @param bb the source {@code ByteBuffer} to copy.
- * @return a read only {@code ByteBuffer}.
- */
- public static ByteBuffer immutableCopyOfRemaining(ByteBuffer bb) {
- if (bb == null) {
- return null;
- }
- ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer();
- ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.remaining())
- .put(readOnlyCopy);
- cloned.flip();
- return cloned.asReadOnlyBuffer();
- }
-
- /**
- * Returns a copy of the given {@code DirectByteBuffer} from its current position as a non-direct {@code HeapByteBuffer}
- *
- * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be
- * ignored.
- *
- * @param bb the source {@code ByteBuffer} to copy.
- * @return {@code ByteBuffer}.
- */
- public static ByteBuffer toNonDirectBuffer(ByteBuffer bb) {
- if (bb == null) {
- return null;
- }
- if (!bb.isDirect()) {
- throw new IllegalArgumentException("Provided ByteBuffer is already non-direct");
- }
- int sourceBufferPosition = bb.position();
- ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer();
- readOnlyCopy.rewind();
- ByteBuffer cloned = ByteBuffer.allocate(bb.capacity())
- .put(readOnlyCopy);
- cloned.rewind();
- cloned.position(sourceBufferPosition);
- if (bb.isReadOnly()) {
- return cloned.asReadOnlyBuffer();
- }
- return cloned;
- }
-
/**
* Returns a copy of all the bytes from the given ByteBuffer
,
* from the beginning to the buffer's limit; or null if the input is null.
diff --git a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
index 4e416ea9e3b6..5f255d347adc 100644
--- a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
+++ b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
@@ -16,11 +16,9 @@
package software.amazon.awssdk.utils;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
@@ -34,11 +32,13 @@ public class BinaryUtilsTest {
public void testHex() {
{
String hex = BinaryUtils.toHex(new byte[] {0});
+ System.out.println(hex);
String hex2 = Base16Lower.encodeAsString(new byte[] {0});
assertEquals(hex, hex2);
}
{
String hex = BinaryUtils.toHex(new byte[] {-1});
+ System.out.println(hex);
String hex2 = Base16Lower.encodeAsString(new byte[] {-1});
assertEquals(hex, hex2);
}
@@ -169,7 +169,7 @@ public void testCopyRemainingBytesFrom_nullBuffer() {
@Test
public void testCopyRemainingBytesFrom_noRemainingBytes() {
ByteBuffer bb = ByteBuffer.allocate(1);
- bb.put(new byte[] {1});
+ bb.put(new byte[]{1});
bb.flip();
bb.get();
@@ -180,7 +180,7 @@ public void testCopyRemainingBytesFrom_noRemainingBytes() {
@Test
public void testCopyRemainingBytesFrom_fullBuffer() {
ByteBuffer bb = ByteBuffer.allocate(4);
- bb.put(new byte[] {1, 2, 3, 4});
+ bb.put(new byte[]{1, 2, 3, 4});
bb.flip();
byte[] copy = BinaryUtils.copyRemainingBytesFrom(bb);
@@ -191,7 +191,7 @@ public void testCopyRemainingBytesFrom_fullBuffer() {
@Test
public void testCopyRemainingBytesFrom_partiallyReadBuffer() {
ByteBuffer bb = ByteBuffer.allocate(4);
- bb.put(new byte[] {1, 2, 3, 4});
+ bb.put(new byte[]{1, 2, 3, 4});
bb.flip();
bb.get();
@@ -201,137 +201,4 @@ public void testCopyRemainingBytesFrom_partiallyReadBuffer() {
assertThat(bb).isEqualTo(ByteBuffer.wrap(copy));
assertThat(copy).hasSize(2);
}
-
- @Test
- public void testImmutableCopyOfByteBuffer() {
- ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
- byte[] originalBytesInSource = {1, 2, 3, 4};
- sourceBuffer.put(originalBytesInSource);
- sourceBuffer.flip();
-
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer);
-
- byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4};
- sourceBuffer.put(bytesInSourceAfterCopy);
- sourceBuffer.flip();
-
- assertTrue(immutableCopy.isReadOnly());
- byte[] fromImmutableCopy = new byte[originalBytesInSource.length];
- immutableCopy.get(fromImmutableCopy);
- assertArrayEquals(originalBytesInSource, fromImmutableCopy);
-
- assertEquals(0, sourceBuffer.position());
- byte[] fromSource = new byte[bytesInSourceAfterCopy.length];
- sourceBuffer.get(fromSource);
- assertArrayEquals(bytesInSourceAfterCopy, fromSource);
- }
-
- @Test
- public void testImmutableCopyOfByteBuffer_nullBuffer() {
- assertNull(BinaryUtils.immutableCopyOf(null));
- }
-
- @Test
- public void testImmutableCopyOfByteBuffer_partiallyReadBuffer() {
- ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
- byte[] bytes = {1, 2, 3, 4};
- sourceBuffer.put(bytes);
- sourceBuffer.position(2);
-
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer);
-
- assertEquals(sourceBuffer.position(), immutableCopy.position());
- immutableCopy.rewind();
- byte[] fromImmutableCopy = new byte[bytes.length];
- immutableCopy.get(fromImmutableCopy);
- assertArrayEquals(bytes, fromImmutableCopy);
- }
-
- @Test
- public void testImmutableCopyOfRemainingByteBuffer() {
- ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
- byte[] originalBytesInSource = {1, 2, 3, 4};
- sourceBuffer.put(originalBytesInSource);
- sourceBuffer.flip();
-
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer);
-
- byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4};
- sourceBuffer.put(bytesInSourceAfterCopy);
- sourceBuffer.flip();
-
- assertTrue(immutableCopy.isReadOnly());
- byte[] fromImmutableCopy = new byte[originalBytesInSource.length];
- immutableCopy.get(fromImmutableCopy);
- assertArrayEquals(originalBytesInSource, fromImmutableCopy);
-
- assertEquals(0, sourceBuffer.position());
- byte[] fromSource = new byte[bytesInSourceAfterCopy.length];
- sourceBuffer.get(fromSource);
- assertArrayEquals(bytesInSourceAfterCopy, fromSource);
- }
-
- @Test
- public void testImmutableCopyOfByteBufferRemaining_nullBuffer() {
- assertNull(BinaryUtils.immutableCopyOfRemaining(null));
- }
-
- @Test
- public void testImmutableCopyOfByteBufferRemaining_partiallyReadBuffer() {
- ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
- byte[] bytes = {1, 2, 3, 4};
- sourceBuffer.put(bytes);
- sourceBuffer.position(2);
-
- ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer);
-
- assertEquals(2, immutableCopy.capacity());
- assertEquals(2, immutableCopy.remaining());
- assertEquals(0, immutableCopy.position());
- assertEquals((byte) 3, immutableCopy.get());
- assertEquals((byte) 4, immutableCopy.get());
- }
-
- @Test
- public void testToNonDirectBuffer() {
- ByteBuffer bb = ByteBuffer.allocateDirect(4);
- byte[] expected = {1, 2, 3, 4};
- bb.put(expected);
- bb.flip();
-
- ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(bb);
-
- assertFalse(nonDirectBuffer.isDirect());
- byte[] bytes = new byte[expected.length];
- nonDirectBuffer.get(bytes);
- assertArrayEquals(expected, bytes);
- }
-
- @Test
- public void testToNonDirectBuffer_nullBuffer() {
- assertNull(BinaryUtils.toNonDirectBuffer(null));
- }
-
- @Test
- public void testToNonDirectBuffer_partiallyReadBuffer() {
- ByteBuffer sourceBuffer = ByteBuffer.allocateDirect(4);
- byte[] bytes = {1, 2, 3, 4};
- sourceBuffer.put(bytes);
- sourceBuffer.position(2);
-
- ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(sourceBuffer);
-
- assertEquals(sourceBuffer.position(), nonDirectBuffer.position());
- nonDirectBuffer.rewind();
- byte[] fromNonDirectBuffer = new byte[bytes.length];
- nonDirectBuffer.get(fromNonDirectBuffer);
- assertArrayEquals(bytes, fromNonDirectBuffer);
- }
-
- @Test
- public void testToNonDirectBuffer_nonDirectBuffer() {
- ByteBuffer nonDirectBuffer = ByteBuffer.allocate(0);
- assertThrows(IllegalArgumentException.class, () -> BinaryUtils.toNonDirectBuffer(nonDirectBuffer));
- }
-
}