diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 07dea1568089..7a1738f51d97 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -22,38 +22,37 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.Arrays; import java.util.Optional; import java.util.concurrent.ExecutorService; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; +import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody; import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.utils.BinaryUtils; /** - * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is - * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e. - * to write that data on the wire). + * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where + * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber + * of the data (i.e. to write that data on the wire). * *

* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe - * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a - * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the - * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls. + * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link + * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If + * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls. *

* *

- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The - * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be - * notified via the {@link org.reactivestreams.Subscription#request(long)} method. + * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. + * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits + * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method. *

* * @see FileAsyncRequestBody - * @see ByteBuffersAsyncRequestBody + * @see ByteArrayAsyncRequestBody */ @SdkPublicApi public interface AsyncRequestBody extends SdkPublisher { @@ -71,8 +70,8 @@ default String contentType() { } /** - * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the - * publisher publishes the data. + * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. + * The data is delivered when the publisher publishes the data. * * @param publisher Publisher of source data * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher @@ -125,11 +124,11 @@ static AsyncRequestBody fromFile(File file) { * @param string The string to provide. * @param cs The {@link Charset} to use. * @return Implementation of {@link AsyncRequestBody} that uses the specified string. - * @see ByteBuffersAsyncRequestBody + * @see ByteArrayAsyncRequestBody */ static AsyncRequestBody fromString(String string, Charset cs) { - return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(), - string.getBytes(cs)); + return new ByteArrayAsyncRequestBody(string.getBytes(cs), + Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name()); } /** @@ -144,181 +143,29 @@ static AsyncRequestBody fromString(String string) { } /** - * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent - * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}. + * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the + * original byte array are not reflected in the {@link AsyncRequestBody}. * * @param bytes The bytes to send to the service. * @return AsyncRequestBody instance. */ static AsyncRequestBody fromBytes(byte[] bytes) { - byte[] clonedBytes = bytes.clone(); - return ByteBuffersAsyncRequestBody.from(clonedBytes); + return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM); } /** - * Creates an {@link AsyncRequestBody} from a byte array without copying the contents of the byte array. This - * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody} - * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this - * {@code AsyncRequestBody} implementation. - * - *

As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks. - * - * @param bytes The bytes to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromBytesUnsafe(byte[] bytes) { - return ByteBuffersAsyncRequestBody.from(bytes); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to - * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. - *

- * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need - * it to copy only the remaining readable bytes. + * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications + * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}. * * @param byteBuffer ByteBuffer to send to the service. * @return AsyncRequestBody instance. */ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) { - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer); - immutableCopy.rewind(); - return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); - } - - /** - * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the - * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being - * reflected in the {@link AsyncRequestBody}. - *

Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads - * only the remaining bytes. - * - * @param byteBuffer ByteBuffer to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) { - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer); - return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the - * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this - * {@code AsyncRequestBody} implementation. - *

- * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you - * need it to copy only the remaining readable bytes. - * - *

As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the - * risks. - * - * @param byteBuffer ByteBuffer to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) { - ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); - readOnlyBuffer.rewind(); - return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the - * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this - * {@code AsyncRequestBody} implementation. - *

Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of - * the buffer and reads only the remaining bytes. - * - *

As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the - * risks. - * - * @param byteBuffer ByteBuffer to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) { - ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); - return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer} - * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. - *

- * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use - * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. - * - * @param byteBuffers ByteBuffer array to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) { - ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) - .map(BinaryUtils::immutableCopyOf) - .peek(ByteBuffer::rewind) - .toArray(ByteBuffer[]::new); - return ByteBuffersAsyncRequestBody.of(immutableCopy); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each - * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the - * {@link AsyncRequestBody}. - *

Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, - * this method respects the current read position of each buffer and reads only the remaining bytes. - * - * @param byteBuffers ByteBuffer array to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) { - ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) - .map(BinaryUtils::immutableCopyOfRemaining) - .peek(ByteBuffer::rewind) - .toArray(ByteBuffer[]::new); - return ByteBuffersAsyncRequestBody.of(immutableCopy); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each - * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this - * {@code AsyncRequestBody} implementation. - *

- * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use - * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. - * - *

As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the - * risks. - * - * @param byteBuffers ByteBuffer array to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) { - ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) - .map(ByteBuffer::asReadOnlyBuffer) - .peek(ByteBuffer::rewind) - .toArray(ByteBuffer[]::new); - return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); - } - - /** - * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each - * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this - * {@code AsyncRequestBody} implementation. - *

Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)}, - * this method respects the current read position of each buffer and reads only the remaining bytes. - * - *

As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the - * risks. - * - * @param byteBuffers ByteBuffer array to send to the service. - * @return AsyncRequestBody instance. - */ - static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) { - ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) - .map(ByteBuffer::asReadOnlyBuffer) - .toArray(ByteBuffer[]::new); - return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); + return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer)); } /** - * Creates an {@link AsyncRequestBody} from an {@link InputStream}. + * Creates a {@link AsyncRequestBody} from a {@link InputStream}. * *

An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the * non-blocking event loop threads owned by the SDK. @@ -392,7 +239,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content } /** - * Creates an {@link AsyncRequestBody} with no content. + * Creates a {@link AsyncRequestBody} with no content. * * @return AsyncRequestBody instance. */ diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java new file mode 100644 index 000000000000..29205479b798 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java @@ -0,0 +1,98 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.util.Optional; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.utils.Logger; + +/** + * An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static + * methods on {@link AsyncRequestBody} + * + * @see AsyncRequestBody#fromBytes(byte[]) + * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) + * @see AsyncRequestBody#fromString(String) + */ +@SdkInternalApi +public final class ByteArrayAsyncRequestBody implements AsyncRequestBody { + private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class); + + private final byte[] bytes; + + private final String mimetype; + + public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) { + this.bytes = bytes.clone(); + this.mimetype = mimetype; + } + + @Override + public Optional contentLength() { + return Optional.of((long) bytes.length); + } + + @Override + public String contentType() { + return mimetype; + } + + @Override + public void subscribe(Subscriber s) { + // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null + if (s == null) { + throw new NullPointerException("Subscription MUST NOT be null."); + } + + // As per 2.13, this method must return normally (i.e. not throw). + try { + s.onSubscribe( + new Subscription() { + private boolean done = false; + + @Override + public void request(long n) { + if (done) { + return; + } + if (n > 0) { + done = true; + s.onNext(ByteBuffer.wrap(bytes)); + s.onComplete(); + } else { + s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); + } + } + + @Override + public void cancel() { + synchronized (this) { + if (!done) { + done = true; + } + } + } + } + ); + } catch (Throwable ex) { + log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex); + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java deleted file mode 100644 index e7e9d00dd0e5..000000000000 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.internal.async; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.internal.util.Mimetype; -import software.amazon.awssdk.utils.BinaryUtils; -import software.amazon.awssdk.utils.Logger; - -/** - * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created - * using static methods on {@link AsyncRequestBody} - * - * @see AsyncRequestBody#fromBytes(byte[]) - * @see AsyncRequestBody#fromBytesUnsafe(byte[]) - * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) - * @see AsyncRequestBody#fromByteBufferUnsafe(ByteBuffer) - * @see AsyncRequestBody#fromByteBuffers(ByteBuffer...) - * @see AsyncRequestBody#fromByteBuffersUnsafe(ByteBuffer...) - * @see AsyncRequestBody#fromString(String) - */ -@SdkInternalApi -public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody { - private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class); - - private final String mimetype; - private final Long length; - private final ByteBuffer[] buffers; - - private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) { - this.mimetype = mimetype; - this.length = length; - this.buffers = buffers; - } - - @Override - public Optional contentLength() { - return Optional.ofNullable(length); - } - - @Override - public String contentType() { - return mimetype; - } - - @Override - public void subscribe(Subscriber s) { - // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null - if (s == null) { - throw new NullPointerException("Subscription MUST NOT be null."); - } - - // As per 2.13, this method must return normally (i.e. not throw). - try { - s.onSubscribe( - new Subscription() { - private final AtomicInteger index = new AtomicInteger(0); - private final AtomicBoolean completed = new AtomicBoolean(false); - - @Override - public void request(long n) { - if (completed.get()) { - return; - } - - if (n > 0) { - int i = index.getAndIncrement(); - - if (i >= buffers.length) { - return; - } - - long remaining = n; - - do { - ByteBuffer buffer = buffers[i]; - - // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928 - if (buffer.isDirect()) { - buffer = BinaryUtils.toNonDirectBuffer(buffer); - } - - s.onNext(buffer.asReadOnlyBuffer()); - remaining--; - } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length); - - if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) { - s.onComplete(); - } - } else { - s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); - } - } - - @Override - public void cancel() { - completed.set(true); - } - } - ); - } catch (Throwable ex) { - log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex); - } - } - - public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) { - long length = Arrays.stream(buffers) - .mapToLong(ByteBuffer::remaining) - .sum(); - return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); - } - - public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) { - return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); - } - - public static ByteBuffersAsyncRequestBody of(String mimetype, ByteBuffer... buffers) { - long length = Arrays.stream(buffers) - .mapToLong(ByteBuffer::remaining) - .sum(); - return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); - } - - public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) { - return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); - } - - public static ByteBuffersAsyncRequestBody from(byte[] bytes) { - return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length, - ByteBuffer.wrap(bytes)); - } - - public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) { - return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes)); - } -} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java index 93d6d09578a6..8fd7f0260b76 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.SdkBuilder; @@ -59,11 +58,10 @@ public synchronized Iterable bufferAndCreateChunks(ByteBuffer buffer int availableToRead = bufferSize - bufferedBytes; int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); - byte[] bytes = BinaryUtils.copyAllBytesFrom(buffer); if (bufferedBytes == 0) { - currentBuffer.put(bytes, startPosition, bytesToMove); + currentBuffer.put(buffer.array(), startPosition, bytesToMove); } else { - currentBuffer.put(bytes, 0, bytesToMove); + currentBuffer.put(buffer.array(), 0, bytesToMove); } startPosition = startPosition + bytesToMove; diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java index aab643cbb6a6..e0252c9ba6d2 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java @@ -15,39 +15,44 @@ package software.amazon.awssdk.core.async; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; import io.reactivex.Flowable; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Instant; +import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.stream.Collectors; import org.assertj.core.util.Lists; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.http.async.SimpleSubscriber; import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.StringInputStream; +@RunWith(Parameterized.class) public class AsyncRequestBodyTest { - - private static final String testString = "Hello!"; - private static final Path path; + private final static String testString = "Hello!"; + private final static Path path; static { FileSystem fs = Jimfs.newFileSystem(Configuration.unix()); @@ -59,16 +64,27 @@ public class AsyncRequestBodyTest { } } - @ParameterizedTest - @MethodSource("contentIntegrityChecks") - void hasCorrectLength(AsyncRequestBody asyncRequestBody) { - assertEquals(testString.length(), asyncRequestBody.contentLength().get()); + @Parameterized.Parameters + public static AsyncRequestBody[] data() { + return new AsyncRequestBody[]{ + AsyncRequestBody.fromString(testString), + AsyncRequestBody.fromFile(path) + }; } + private AsyncRequestBody provider; + + public AsyncRequestBodyTest(AsyncRequestBody provider) { + this.provider = provider; + } - @ParameterizedTest - @MethodSource("contentIntegrityChecks") - void hasCorrectContent(AsyncRequestBody asyncRequestBody) throws InterruptedException { + @Test + public void hasCorrectLength() { + assertThat(provider.contentLength().get()).isEqualTo(testString.length()); + } + + @Test + public void hasCorrectContent() throws InterruptedException { StringBuilder sb = new StringBuilder(); CountDownLatch done = new CountDownLatch(1); @@ -90,268 +106,75 @@ public void onComplete() { } }; - asyncRequestBody.subscribe(subscriber); + provider.subscribe(subscriber); done.await(); - assertEquals(testString, sb.toString()); - } - - private static AsyncRequestBody[] contentIntegrityChecks() { - return new AsyncRequestBody[] { - AsyncRequestBody.fromString(testString), - AsyncRequestBody.fromFile(path) - }; + assertThat(sb.toString()).isEqualTo(testString); } @Test - void fromBytesCopiesTheProvidedByteArray() { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - byte[] bytesClone = bytes.clone(); - - AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytes(bytes); - - for (int i = 0; i < bytes.length; i++) { - bytes[i] += 1; - } - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get()); - assertArrayEquals(bytesClone, publishedByteArray); + public void stringConstructorHasCorrectContentType() { + AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world"); + assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-8"); } @Test - void fromBytesUnsafeDoesNotCopyTheProvidedByteArray() { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - - AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytesUnsafe(bytes); - - for (int i = 0; i < bytes.length; i++) { - bytes[i] += 1; - } - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get()); - assertArrayEquals(bytes, publishedByteArray); - } - - @ParameterizedTest - @MethodSource("safeByteBufferBodyBuilders") - void safeByteBufferBuildersCopyTheProvidedBuffer(Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - byte[] bytesClone = bytes.clone(); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes)); - - for (int i = 0; i < bytes.length; i++) { - bytes[i] += 1; - } - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get()); - assertArrayEquals(bytesClone, publishedByteArray); - } - - private static Function[] safeByteBufferBodyBuilders() { - Function fromByteBuffer = AsyncRequestBody::fromByteBuffer; - Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer; - Function fromByteBuffers = AsyncRequestBody::fromByteBuffers; - Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers; - return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers}; - } - - @ParameterizedTest - @MethodSource("unsafeByteBufferBodyBuilders") - void unsafeByteBufferBuildersDoNotCopyTheProvidedBuffer(Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes)); - - for (int i = 0; i < bytes.length; i++) { - bytes[i] += 1; - } - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get()); - assertArrayEquals(bytes, publishedByteArray); - } - - private static Function[] unsafeByteBufferBodyBuilders() { - Function fromByteBuffer = AsyncRequestBody::fromByteBufferUnsafe; - Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBufferUnsafe; - Function fromByteBuffers = AsyncRequestBody::fromByteBuffersUnsafe; - Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffersUnsafe; - return new Function[] {fromByteBuffer, fromRemainingByteBuffer, fromByteBuffers, fromRemainingByteBuffers}; - } - - @ParameterizedTest - @MethodSource("nonRewindingByteBufferBodyBuilders") - void nonRewindingByteBufferBuildersReadFromTheInputBufferPosition( - Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - ByteBuffer bb = ByteBuffer.wrap(bytes); - int expectedPosition = bytes.length / 2; - bb.position(expectedPosition); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb); - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - int remaining = bb.remaining(); - assertEquals(remaining, publishedBuffer.get().remaining()); - for (int i = 0; i < remaining; i++) { - assertEquals(bb.get(), publishedBuffer.get().get()); - } - } - - private static Function[] nonRewindingByteBufferBodyBuilders() { - Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer; - Function fromRemainingByteBufferUnsafe = AsyncRequestBody::fromRemainingByteBufferUnsafe; - Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers; - Function fromRemainingByteBuffersUnsafe = AsyncRequestBody::fromRemainingByteBuffersUnsafe; - return new Function[] {fromRemainingByteBuffer, fromRemainingByteBufferUnsafe, fromRemainingByteBuffers, - fromRemainingByteBuffersUnsafe}; - } - - @ParameterizedTest - @MethodSource("safeNonRewindingByteBufferBodyBuilders") - void safeNonRewindingByteBufferBuildersCopyFromTheInputBufferPosition( - Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - ByteBuffer bb = ByteBuffer.wrap(bytes); - int expectedPosition = bytes.length / 2; - bb.position(expectedPosition); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb); - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - int remaining = bb.remaining(); - assertEquals(remaining, publishedBuffer.get().capacity()); - for (int i = 0; i < remaining; i++) { - assertEquals(bb.get(), publishedBuffer.get().get()); - } - } - - private static Function[] safeNonRewindingByteBufferBodyBuilders() { - Function fromRemainingByteBuffer = AsyncRequestBody::fromRemainingByteBuffer; - Function fromRemainingByteBuffers = AsyncRequestBody::fromRemainingByteBuffers; - return new Function[] {fromRemainingByteBuffer, fromRemainingByteBuffers}; - } - - @ParameterizedTest - @MethodSource("rewindingByteBufferBodyBuilders") - void rewindingByteBufferBuildersDoNotRewindTheInputBuffer(Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - ByteBuffer bb = ByteBuffer.wrap(bytes); - int expectedPosition = bytes.length / 2; - bb.position(expectedPosition); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb); - - Subscriber subscriber = new SimpleSubscriber(buffer -> { - }); - - asyncRequestBody.subscribe(subscriber); - - assertEquals(expectedPosition, bb.position()); - } - - @ParameterizedTest - @MethodSource("rewindingByteBufferBodyBuilders") - void rewindingByteBufferBuildersReadTheInputBufferFromTheBeginning( - Function bodyBuilder) { - byte[] bytes = testString.getBytes(StandardCharsets.UTF_8); - ByteBuffer bb = ByteBuffer.wrap(bytes); - bb.position(bytes.length / 2); - - AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb); - - AtomicReference publishedBuffer = new AtomicReference<>(); - Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set); - - asyncRequestBody.subscribe(subscriber); - - assertEquals(0, publishedBuffer.get().position()); - publishedBuffer.get().rewind(); - bb.rewind(); - assertEquals(bb, publishedBuffer.get()); - } - - private static Function[] rewindingByteBufferBodyBuilders() { - Function fromByteBuffer = AsyncRequestBody::fromByteBuffer; - Function fromByteBufferUnsafe = AsyncRequestBody::fromByteBufferUnsafe; - Function fromByteBuffers = AsyncRequestBody::fromByteBuffers; - Function fromByteBuffersUnsafe = AsyncRequestBody::fromByteBuffersUnsafe; - return new Function[] {fromByteBuffer, fromByteBufferUnsafe, fromByteBuffers, fromByteBuffersUnsafe}; - } - - @ParameterizedTest - @ValueSource(strings = {"US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16"}) - void charsetsAreConvertedToTheCorrectContentType(Charset charset) { - AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", charset); - assertEquals("text/plain; charset=" + charset.name(), requestBody.contentType()); + public void stringWithEncoding1ConstructorHasCorrectContentType() { + AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.ISO_8859_1); + assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=ISO-8859-1"); } @Test - void stringConstructorHasCorrectDefaultContentType() { - AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world"); - assertEquals("text/plain; charset=UTF-8", requestBody.contentType()); + public void stringWithEncoding2ConstructorHasCorrectContentType() { + AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.UTF_16BE); + assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-16BE"); } @Test - void fileConstructorHasCorrectContentType() { + public void fileConstructorHasCorrectContentType() { AsyncRequestBody requestBody = AsyncRequestBody.fromFile(path); - assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); + assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM); } @Test - void bytesArrayConstructorHasCorrectContentType() { + public void bytesArrayConstructorHasCorrectContentType() { AsyncRequestBody requestBody = AsyncRequestBody.fromBytes("hello world".getBytes()); - assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); + assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM); } @Test - void bytesBufferConstructorHasCorrectContentType() { + public void bytesBufferConstructorHasCorrectContentType() { ByteBuffer byteBuffer = ByteBuffer.wrap("hello world".getBytes()); AsyncRequestBody requestBody = AsyncRequestBody.fromByteBuffer(byteBuffer); - assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); + assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM); } @Test - void emptyBytesConstructorHasCorrectContentType() { + public void emptyBytesConstructorHasCorrectContentType() { AsyncRequestBody requestBody = AsyncRequestBody.empty(); - assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); + assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM); } @Test - void publisherConstructorHasCorrectContentType() { + public void publisherConstructorHasCorrectContentType() { List requestBodyStrings = Lists.newArrayList("A", "B", "C"); List bodyBytes = requestBodyStrings.stream() - .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) - .collect(Collectors.toList()); + .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + .collect(Collectors.toList()); Publisher bodyPublisher = Flowable.fromIterable(bodyBytes); AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher); - assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType()); + assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM); + } + + @Test + public void fromBytes_byteArrayNotNull_createsCopy() { + byte[] original = {0x1, 0x2, 0x3, 0x4}; + byte[] toModify = new byte[original.length]; + System.arraycopy(original, 0, toModify, 0, original.length); + AsyncRequestBody body = AsyncRequestBody.fromBytes(toModify); + for (int i = 0; i < toModify.length; ++i) { + toModify[i]++; + } + ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0); + assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original); } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java new file mode 100644 index 000000000000..378fbf2f59c3 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java @@ -0,0 +1,65 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.internal.util.Mimetype; + +public class ByteArrayAsyncRequestBodyTest { + private class testSubscriber implements Subscriber { + private Subscription subscription; + protected AtomicBoolean onCompleteCalled = new AtomicBoolean(false); + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscription.request(1); + onCompleteCalled.set(true); + } + } + + testSubscriber subscriber = new testSubscriber(); + + @Test + public void concurrentRequests_shouldCompleteNormally() { + ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes(), + Mimetype.MIMETYPE_OCTET_STREAM); + byteArrayReq.subscribe(subscriber); + assertTrue(subscriber.onCompleteCalled.get()); + } + +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java deleted file mode 100644 index b4073247f8b9..000000000000 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.internal.async; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; -import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.utils.BinaryUtils; - -class ByteBuffersAsyncRequestBodyTest { - - private static class TestSubscriber implements Subscriber { - private Subscription subscription; - private boolean onCompleteCalled = false; - private int callsToComplete = 0; - private final List publishedResults = Collections.synchronizedList(new ArrayList<>()); - - public void request(long n) { - subscription.request(n); - } - - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - publishedResults.add(byteBuffer); - } - - @Override - public void onError(Throwable throwable) { - throw new IllegalStateException(throwable); - } - - @Override - public void onComplete() { - onCompleteCalled = true; - callsToComplete++; - } - } - - @Test - public void subscriberIsMarkedAsCompleted() { - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8)); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(1); - - assertTrue(subscriber.onCompleteCalled); - assertEquals(1, subscriber.publishedResults.size()); - } - - @Test - public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable() { - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8)); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(2); - - assertTrue(subscriber.onCompleteCalled); - assertEquals(1, subscriber.publishedResults.size()); - } - - @Test - public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce() throws InterruptedException { - int numBuffers = 100; - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(IntStream.range(0, numBuffers) - .mapToObj(i -> ByteBuffer.wrap(new byte[1])) - .toArray(ByteBuffer[]::new)); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - - int parallelism = 8; - ExecutorService executorService = Executors.newFixedThreadPool(parallelism); - for (int i = 0; i < parallelism; i++) { - executorService.submit(() -> { - for (int j = 0; j < numBuffers; j++) { - subscriber.request(2); - } - }); - } - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.MINUTES); - - assertTrue(subscriber.onCompleteCalled); - assertEquals(1, subscriber.callsToComplete); - assertEquals(numBuffers, subscriber.publishedResults.size()); - } - - @Test - public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish() { - byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8); - byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8); - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length), - ByteBuffer.wrap(helloWorld), - ByteBuffer.wrap(goodbyeWorld)); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(1); - - assertFalse(subscriber.onCompleteCalled); - assertEquals(1, subscriber.publishedResults.size()); - } - - @Test - public void subscriberReceivesAllBuffers() { - byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8); - byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8); - - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length), - ByteBuffer.wrap(helloWorld), - ByteBuffer.wrap(goodbyeWorld)); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(2); - - assertEquals(2, subscriber.publishedResults.size()); - assertTrue(subscriber.onCompleteCalled); - assertArrayEquals(helloWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(0))); - assertArrayEquals(goodbyeWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(1))); - } - - @Test - public void multipleSubscribersReceiveTheSameResults() { - ByteBuffer sourceBuffer = ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8)); - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(sourceBuffer); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(1); - TestSubscriber otherSubscriber = new TestSubscriber(); - requestBody.subscribe(otherSubscriber); - otherSubscriber.request(1); - - ByteBuffer publishedBuffer = subscriber.publishedResults.get(0); - ByteBuffer otherPublishedBuffer = otherSubscriber.publishedResults.get(0); - - assertEquals(publishedBuffer, otherPublishedBuffer); - } - - @Test - public void canceledSubscriberDoesNotReturnNewResults() { - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(new byte[0])); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - - subscriber.subscription.cancel(); - subscriber.request(1); - - assertTrue(subscriber.publishedResults.isEmpty()); - } - - // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928 - @Test - public void directBuffersAreCoppiedToNonDirectBuffers() { - byte[] bytes = "Hello World!".getBytes(StandardCharsets.UTF_8); - ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length) - .put(bytes); - buffer.flip(); - AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(buffer); - - TestSubscriber subscriber = new TestSubscriber(); - requestBody.subscribe(subscriber); - subscriber.request(1); - - ByteBuffer publishedBuffer = subscriber.publishedResults.get(0); - assertFalse(publishedBuffer.isDirect()); - byte[] publishedBytes = new byte[publishedBuffer.remaining()]; - publishedBuffer.get(publishedBytes); - assertArrayEquals(bytes, publishedBytes); - } - - @Test - public void staticOfByteBufferConstructorSetsLengthBasedOnBufferRemaining() { - ByteBuffer bb1 = ByteBuffer.allocate(2); - ByteBuffer bb2 = ByteBuffer.allocate(2); - bb2.position(1); - ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(bb1, bb2); - assertTrue(body.contentLength().isPresent()); - assertEquals(bb1.remaining() + bb2.remaining(), body.contentLength().get()); - } - - @Test - public void staticFromBytesConstructorSetsLengthBasedOnArrayLength() { - byte[] bytes = new byte[2]; - ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.from(bytes); - assertTrue(body.contentLength().isPresent()); - assertEquals(bytes.length, body.contentLength().get()); - } - -} diff --git a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java index 192ea7cead9b..e7fd8c015e1d 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java @@ -117,80 +117,6 @@ public static ByteArrayInputStream toStream(ByteBuffer byteBuffer) { return new ByteArrayInputStream(copyBytesFrom(byteBuffer)); } - /** - * Returns an immutable copy of the given {@code ByteBuffer}. - *

- * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be - * ignored. - *

- * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always - * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)} - * - * @param bb the source {@code ByteBuffer} to copy. - * @return a read only {@code ByteBuffer}. - */ - public static ByteBuffer immutableCopyOf(ByteBuffer bb) { - if (bb == null) { - return null; - } - int sourceBufferPosition = bb.position(); - ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); - readOnlyCopy.rewind(); - ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.capacity()) - .put(readOnlyCopy); - cloned.position(sourceBufferPosition); - return cloned.asReadOnlyBuffer(); - } - - /** - * Returns an immutable copy of the remaining bytes of the given {@code ByteBuffer}. - *

- * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always - * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)} - * - * @param bb the source {@code ByteBuffer} to copy. - * @return a read only {@code ByteBuffer}. - */ - public static ByteBuffer immutableCopyOfRemaining(ByteBuffer bb) { - if (bb == null) { - return null; - } - ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); - ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.remaining()) - .put(readOnlyCopy); - cloned.flip(); - return cloned.asReadOnlyBuffer(); - } - - /** - * Returns a copy of the given {@code DirectByteBuffer} from its current position as a non-direct {@code HeapByteBuffer} - *

- * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be - * ignored. - * - * @param bb the source {@code ByteBuffer} to copy. - * @return {@code ByteBuffer}. - */ - public static ByteBuffer toNonDirectBuffer(ByteBuffer bb) { - if (bb == null) { - return null; - } - if (!bb.isDirect()) { - throw new IllegalArgumentException("Provided ByteBuffer is already non-direct"); - } - int sourceBufferPosition = bb.position(); - ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); - readOnlyCopy.rewind(); - ByteBuffer cloned = ByteBuffer.allocate(bb.capacity()) - .put(readOnlyCopy); - cloned.rewind(); - cloned.position(sourceBufferPosition); - if (bb.isReadOnly()) { - return cloned.asReadOnlyBuffer(); - } - return cloned; - } - /** * Returns a copy of all the bytes from the given ByteBuffer, * from the beginning to the buffer's limit; or null if the input is null. diff --git a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java index 4e416ea9e3b6..5f255d347adc 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java @@ -16,11 +16,9 @@ package software.amazon.awssdk.utils; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.ByteBuffer; @@ -34,11 +32,13 @@ public class BinaryUtilsTest { public void testHex() { { String hex = BinaryUtils.toHex(new byte[] {0}); + System.out.println(hex); String hex2 = Base16Lower.encodeAsString(new byte[] {0}); assertEquals(hex, hex2); } { String hex = BinaryUtils.toHex(new byte[] {-1}); + System.out.println(hex); String hex2 = Base16Lower.encodeAsString(new byte[] {-1}); assertEquals(hex, hex2); } @@ -169,7 +169,7 @@ public void testCopyRemainingBytesFrom_nullBuffer() { @Test public void testCopyRemainingBytesFrom_noRemainingBytes() { ByteBuffer bb = ByteBuffer.allocate(1); - bb.put(new byte[] {1}); + bb.put(new byte[]{1}); bb.flip(); bb.get(); @@ -180,7 +180,7 @@ public void testCopyRemainingBytesFrom_noRemainingBytes() { @Test public void testCopyRemainingBytesFrom_fullBuffer() { ByteBuffer bb = ByteBuffer.allocate(4); - bb.put(new byte[] {1, 2, 3, 4}); + bb.put(new byte[]{1, 2, 3, 4}); bb.flip(); byte[] copy = BinaryUtils.copyRemainingBytesFrom(bb); @@ -191,7 +191,7 @@ public void testCopyRemainingBytesFrom_fullBuffer() { @Test public void testCopyRemainingBytesFrom_partiallyReadBuffer() { ByteBuffer bb = ByteBuffer.allocate(4); - bb.put(new byte[] {1, 2, 3, 4}); + bb.put(new byte[]{1, 2, 3, 4}); bb.flip(); bb.get(); @@ -201,137 +201,4 @@ public void testCopyRemainingBytesFrom_partiallyReadBuffer() { assertThat(bb).isEqualTo(ByteBuffer.wrap(copy)); assertThat(copy).hasSize(2); } - - @Test - public void testImmutableCopyOfByteBuffer() { - ByteBuffer sourceBuffer = ByteBuffer.allocate(4); - byte[] originalBytesInSource = {1, 2, 3, 4}; - sourceBuffer.put(originalBytesInSource); - sourceBuffer.flip(); - - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer); - - byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4}; - sourceBuffer.put(bytesInSourceAfterCopy); - sourceBuffer.flip(); - - assertTrue(immutableCopy.isReadOnly()); - byte[] fromImmutableCopy = new byte[originalBytesInSource.length]; - immutableCopy.get(fromImmutableCopy); - assertArrayEquals(originalBytesInSource, fromImmutableCopy); - - assertEquals(0, sourceBuffer.position()); - byte[] fromSource = new byte[bytesInSourceAfterCopy.length]; - sourceBuffer.get(fromSource); - assertArrayEquals(bytesInSourceAfterCopy, fromSource); - } - - @Test - public void testImmutableCopyOfByteBuffer_nullBuffer() { - assertNull(BinaryUtils.immutableCopyOf(null)); - } - - @Test - public void testImmutableCopyOfByteBuffer_partiallyReadBuffer() { - ByteBuffer sourceBuffer = ByteBuffer.allocate(4); - byte[] bytes = {1, 2, 3, 4}; - sourceBuffer.put(bytes); - sourceBuffer.position(2); - - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer); - - assertEquals(sourceBuffer.position(), immutableCopy.position()); - immutableCopy.rewind(); - byte[] fromImmutableCopy = new byte[bytes.length]; - immutableCopy.get(fromImmutableCopy); - assertArrayEquals(bytes, fromImmutableCopy); - } - - @Test - public void testImmutableCopyOfRemainingByteBuffer() { - ByteBuffer sourceBuffer = ByteBuffer.allocate(4); - byte[] originalBytesInSource = {1, 2, 3, 4}; - sourceBuffer.put(originalBytesInSource); - sourceBuffer.flip(); - - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer); - - byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4}; - sourceBuffer.put(bytesInSourceAfterCopy); - sourceBuffer.flip(); - - assertTrue(immutableCopy.isReadOnly()); - byte[] fromImmutableCopy = new byte[originalBytesInSource.length]; - immutableCopy.get(fromImmutableCopy); - assertArrayEquals(originalBytesInSource, fromImmutableCopy); - - assertEquals(0, sourceBuffer.position()); - byte[] fromSource = new byte[bytesInSourceAfterCopy.length]; - sourceBuffer.get(fromSource); - assertArrayEquals(bytesInSourceAfterCopy, fromSource); - } - - @Test - public void testImmutableCopyOfByteBufferRemaining_nullBuffer() { - assertNull(BinaryUtils.immutableCopyOfRemaining(null)); - } - - @Test - public void testImmutableCopyOfByteBufferRemaining_partiallyReadBuffer() { - ByteBuffer sourceBuffer = ByteBuffer.allocate(4); - byte[] bytes = {1, 2, 3, 4}; - sourceBuffer.put(bytes); - sourceBuffer.position(2); - - ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer); - - assertEquals(2, immutableCopy.capacity()); - assertEquals(2, immutableCopy.remaining()); - assertEquals(0, immutableCopy.position()); - assertEquals((byte) 3, immutableCopy.get()); - assertEquals((byte) 4, immutableCopy.get()); - } - - @Test - public void testToNonDirectBuffer() { - ByteBuffer bb = ByteBuffer.allocateDirect(4); - byte[] expected = {1, 2, 3, 4}; - bb.put(expected); - bb.flip(); - - ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(bb); - - assertFalse(nonDirectBuffer.isDirect()); - byte[] bytes = new byte[expected.length]; - nonDirectBuffer.get(bytes); - assertArrayEquals(expected, bytes); - } - - @Test - public void testToNonDirectBuffer_nullBuffer() { - assertNull(BinaryUtils.toNonDirectBuffer(null)); - } - - @Test - public void testToNonDirectBuffer_partiallyReadBuffer() { - ByteBuffer sourceBuffer = ByteBuffer.allocateDirect(4); - byte[] bytes = {1, 2, 3, 4}; - sourceBuffer.put(bytes); - sourceBuffer.position(2); - - ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(sourceBuffer); - - assertEquals(sourceBuffer.position(), nonDirectBuffer.position()); - nonDirectBuffer.rewind(); - byte[] fromNonDirectBuffer = new byte[bytes.length]; - nonDirectBuffer.get(fromNonDirectBuffer); - assertArrayEquals(bytes, fromNonDirectBuffer); - } - - @Test - public void testToNonDirectBuffer_nonDirectBuffer() { - ByteBuffer nonDirectBuffer = ByteBuffer.allocate(0); - assertThrows(IllegalArgumentException.class, () -> BinaryUtils.toNonDirectBuffer(nonDirectBuffer)); - } - }