diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java new file mode 100644 index 000000000000..07e7a98e424e --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileRequestBodyConfiguration.java @@ -0,0 +1,209 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core; + +import java.nio.file.Path; +import java.util.Objects; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * Configuration options for {@link AsyncRequestBody#fromFile(FileRequestBodyConfiguration)} to configure how the SDK + * should read the file. + * + * @see #builder() + */ +@SdkPublicApi +public final class FileRequestBodyConfiguration implements ToCopyableBuilder { + private final Integer chunkSizeInBytes; + private final Long position; + private final Long numBytesToRead; + private final Path path; + + private FileRequestBodyConfiguration(DefaultBuilder builder) { + this.path = Validate.notNull(builder.path, "path"); + this.chunkSizeInBytes = Validate.isPositiveOrNull(builder.chunkSizeInBytes, "chunkSizeInBytes"); + this.position = Validate.isNotNegativeOrNull(builder.position, "position"); + this.numBytesToRead = Validate.isNotNegativeOrNull(builder.numBytesToRead, "numBytesToRead"); + } + + /** + * Create a {@link Builder}, used to create a {@link FileRequestBodyConfiguration}. + */ + public static Builder builder() { + return new DefaultBuilder(); + } + + /** + * @return the size of each chunk to read from the file + */ + public Integer chunkSizeInBytes() { + return chunkSizeInBytes; + } + + /** + * @return the file position at which the request body begins. + */ + public Long position() { + return position; + } + + /** + * @return the number of bytes to read from this file. + */ + public Long numBytesToRead() { + return numBytesToRead; + } + + /** + * @return the file path + */ + public Path path() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileRequestBodyConfiguration that = (FileRequestBodyConfiguration) o; + + if (!Objects.equals(chunkSizeInBytes, that.chunkSizeInBytes)) { + return false; + } + if (!Objects.equals(position, that.position)) { + return false; + } + if (!Objects.equals(numBytesToRead, that.numBytesToRead)) { + return false; + } + return Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + int result = chunkSizeInBytes != null ? chunkSizeInBytes.hashCode() : 0; + result = 31 * result + (position != null ? position.hashCode() : 0); + result = 31 * result + (numBytesToRead != null ? numBytesToRead.hashCode() : 0); + result = 31 * result + (path != null ? path.hashCode() : 0); + return result; + } + + @Override + public Builder toBuilder() { + return new DefaultBuilder(this); + } + + public interface Builder extends CopyableBuilder { + + /** + * Sets the {@link Path} to the file containing data to send to the service + * + * @param path Path to file to read. + * @return This builder for method chaining. + */ + Builder path(Path path); + + /** + * Sets the size of chunks read from the file. Increasing this will cause more data to be buffered into memory but + * may yield better latencies. Decreasing this will reduce memory usage but may cause reduced latency. Setting this value + * is very dependent on upload speed and requires some performance testing to tune. + * + *

The default chunk size is 16 KiB

+ * + * @param chunkSize New chunk size in bytes. + * @return This builder for method chaining. + */ + Builder chunkSizeInBytes(Integer chunkSize); + + /** + * Sets the file position at which the request body begins. + * + *

By default, it's 0, i.e., reading from the beginning. + * + * @param position the position of the file + * @return The builder for method chaining. + */ + Builder position(Long position); + + /** + * Sets the number of bytes to read from this file. + * + *

By default, it's same as the file length. + * + * @param numBytesToRead number of bytes to read + * @return The builder for method chaining. + */ + Builder numBytesToRead(Long numBytesToRead); + } + + private static final class DefaultBuilder implements Builder { + private Long position; + private Path path; + private Integer chunkSizeInBytes; + private Long numBytesToRead; + + private DefaultBuilder(FileRequestBodyConfiguration configuration) { + this.position = configuration.position; + this.path = configuration.path; + this.chunkSizeInBytes = configuration.chunkSizeInBytes; + this.numBytesToRead = configuration.numBytesToRead; + } + + private DefaultBuilder() { + + } + + @Override + public Builder path(Path path) { + this.path = path; + return this; + } + + @Override + public Builder chunkSizeInBytes(Integer chunkSizeInBytes) { + this.chunkSizeInBytes = chunkSizeInBytes; + return this; + } + + @Override + public Builder position(Long position) { + this.position = position; + return this; + } + + @Override + public Builder numBytesToRead(Long numBytesToRead) { + this.numBytesToRead = numBytesToRead; + return this; + } + + @Override + public FileRequestBodyConfiguration build() { + return new FileRequestBodyConfiguration(this); + } + } + +} \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index 4c7d70ab7553..8fd0fb6d6659 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -29,6 +29,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.FileRequestBodyConfiguration; import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; @@ -112,16 +113,46 @@ static AsyncRequestBody fromFile(Path path) { /** * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See - * {@link FileAsyncRequestBody#builder} to create a customized body implementation. + * {@link #fromFile(FileRequestBodyConfiguration)} to create a customized body implementation. * * @param file The file to read from. * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. - * @see FileAsyncRequestBody */ static AsyncRequestBody fromFile(File file) { return FileAsyncRequestBody.builder().path(file.toPath()).build(); } + /** + * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. + * + * @param configuration configuration for how the SDK should read the file + * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. + */ + static AsyncRequestBody fromFile(FileRequestBodyConfiguration configuration) { + Validate.notNull(configuration, "configuration"); + return FileAsyncRequestBody.builder() + .path(configuration.path()) + .position(configuration.position()) + .chunkSizeInBytes(configuration.chunkSizeInBytes()) + .numBytesToRead(configuration.numBytesToRead()) + .build(); + } + + /** + * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. + * + *

+ * This is a convenience method that creates an instance of the {@link FileRequestBodyConfiguration} builder, + * avoiding the need to create one manually via {@link FileRequestBodyConfiguration#builder()}. + * + * @param configuration configuration for how the SDK should read the file + * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. + */ + static AsyncRequestBody fromFile(Consumer configuration) { + Validate.notNull(configuration, "configuration"); + return fromFile(FileRequestBodyConfiguration.builder().applyMutation(configuration).build()); + } + /** * Creates an {@link AsyncRequestBody} that uses a single string as data. * @@ -410,22 +441,18 @@ static AsyncRequestBody empty() { * is 2MB and the default buffer size is 8MB. * *

- * If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the - * subscriber right after it's initialized. - *

- * If content length is null, it is sent after the entire content for that chunk is buffered. - * In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}. + * By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is + * delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after + * the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger + * than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this + * interface overrides this method. * * @see AsyncRequestBodySplitConfiguration */ default SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { Validate.notNull(splitConfiguration, "splitConfiguration"); - return SplittingPublisher.builder() - .asyncRequestBody(this) - .chunkSizeInBytes(splitConfiguration.chunkSizeInBytes()) - .bufferSizeInBytes(splitConfiguration.bufferSizeInBytes()) - .build(); + return new SplittingPublisher(this, splitConfiguration); } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java index fe51f33b4ff3..45596ab03eaa 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java @@ -28,6 +28,12 @@ @SdkPublicApi public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder { + private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L; + private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4; + private static final AsyncRequestBodySplitConfiguration DEFAULT_CONFIG = builder() + .bufferSizeInBytes(DEFAULT_BUFFER_SIZE) + .chunkSizeInBytes(DEFAULT_CHUNK_SIZE) + .build(); private final Long chunkSizeInBytes; private final Long bufferSizeInBytes; @@ -36,6 +42,10 @@ private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) { this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes"); } + public static AsyncRequestBodySplitConfiguration defaultConfiguration() { + return DEFAULT_CONFIG; + } + /** * The configured chunk size for each divided {@link AsyncRequestBody}. */ diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index 8f7b2a483607..f8bbdd552088 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -33,9 +33,12 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.NumericUtils; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.SdkBuilder; @@ -65,16 +68,47 @@ public final class FileAsyncRequestBody implements AsyncRequestBody { * Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber. */ private final int chunkSizeInBytes; + private final long position; + private final long numBytesToRead; private FileAsyncRequestBody(DefaultBuilder builder) { this.path = builder.path; this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; this.fileLength = invokeSafely(() -> Files.size(path)); + this.position = builder.position == null ? 0 : Validate.isNotNegative(builder.position, "position"); + this.numBytesToRead = builder.numBytesToRead == null ? fileLength - this.position : + Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead"); + } + + @Override + public SdkPublisher split(AsyncRequestBodySplitConfiguration splitConfiguration) { + Validate.notNull(splitConfiguration, "splitConfiguration"); + return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split(); + } + + public Path path() { + return path; + } + + public long fileLength() { + return fileLength; + } + + public int chunkSizeInBytes() { + return chunkSizeInBytes; + } + + public long position() { + return position; + } + + public long numBytesToRead() { + return numBytesToRead; } @Override public Optional contentLength() { - return Optional.of(fileLength); + return Optional.of(numBytesToRead); } @Override @@ -91,7 +125,7 @@ public void subscribe(Subscriber s) { // We need to synchronize here because the subscriber could call // request() from within onSubscribe which would potentially // trigger onNext before onSubscribe is finished. - Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes); + Subscription subscription = new FileSubscription(channel, s); synchronized (subscription) { s.onSubscribe(subscription); @@ -128,7 +162,7 @@ public interface Builder extends SdkBuilder { Builder path(Path path); /** - * Sets the size of chunks read from the file. Increasing this will cause more data to be buffered into memory but + * Sets the size of chunks to read from the file. Increasing this will cause more data to be buffered into memory but * may yield better latencies. Decreasing this will reduce memory usage but may cause reduced latency. Setting this value * is very dependent on upload speed and requires some performance testing to tune. * @@ -139,12 +173,33 @@ public interface Builder extends SdkBuilder { */ Builder chunkSizeInBytes(Integer chunkSize); + /** + * Sets the file position at which the request body begins. + * + *

By default, it's 0, i.e., reading from the beginning. + * + * @param position the position of the file + * @return The builder for method chaining. + */ + Builder position(Long position); + + /** + * Sets the number of bytes to read from this file. + * + *

By default, it's same as the file length. + * + * @param numBytesToRead number of bytes to read + * @return The builder for method chaining. + */ + Builder numBytesToRead(Long numBytesToRead); } private static final class DefaultBuilder implements Builder { + private Long position; private Path path; private Integer chunkSizeInBytes; + private Long numBytesToRead; @Override public Builder path(Path path) { @@ -162,6 +217,18 @@ public Builder chunkSizeInBytes(Integer chunkSizeInBytes) { return this; } + @Override + public Builder position(Long position) { + this.position = position; + return this; + } + + @Override + public Builder numBytesToRead(Long numBytesToRead) { + this.numBytesToRead = numBytesToRead; + return this; + } + public void setChunkSizeInBytes(Integer chunkSizeInBytes) { chunkSizeInBytes(chunkSizeInBytes); } @@ -175,14 +242,12 @@ public FileAsyncRequestBody build() { /** * Reads the file for one subscriber. */ - private static final class FileSubscription implements Subscription { - private final Path path; + private final class FileSubscription implements Subscription { private final AsynchronousFileChannel inputChannel; private final Subscriber subscriber; - private final int chunkSize; - private final AtomicLong position = new AtomicLong(0); - private final AtomicLong remainingBytes = new AtomicLong(0); + private final AtomicLong currentPosition; + private final AtomicLong remainingBytes; private final long sizeAtStart; private final FileTime modifiedTimeAtStart; private long outstandingDemand = 0; @@ -190,17 +255,14 @@ private static final class FileSubscription implements Subscription { private volatile boolean done = false; private final Object lock = new Object(); - private FileSubscription(Path path, - AsynchronousFileChannel inputChannel, - Subscriber subscriber, - int chunkSize) throws IOException { - this.path = path; + private FileSubscription(AsynchronousFileChannel inputChannel, + Subscriber subscriber) throws IOException { this.inputChannel = inputChannel; this.subscriber = subscriber; - this.chunkSize = chunkSize; this.sizeAtStart = inputChannel.size(); this.modifiedTimeAtStart = Files.getLastModifiedTime(path); - this.remainingBytes.set(Validate.isNotNegative(sizeAtStart, "size")); + this.remainingBytes = new AtomicLong(numBytesToRead); + this.currentPosition = new AtomicLong(position); } @Override @@ -255,8 +317,8 @@ private void readData() { return; } - ByteBuffer buffer = ByteBuffer.allocate(chunkSize); - inputChannel.read(buffer, position.get(), buffer, new CompletionHandler() { + ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSizeInBytes, NumericUtils.saturatedCast(remainingBytes.get()))); + inputChannel.read(buffer, currentPosition.get(), buffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { try { @@ -264,7 +326,7 @@ public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); int readBytes = attachment.remaining(); - position.addAndGet(readBytes); + currentPosition.addAndGet(readBytes); remainingBytes.addAndGet(-readBytes); signalOnNext(attachment); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java new file mode 100644 index 000000000000..4b0acfbd81f2 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java @@ -0,0 +1,185 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.async.SimplePublisher; + +/** + * A helper class to split a {@link FileAsyncRequestBody} to multiple smaller async request bodies. It ensures the buffer used to + * be under the configured size via {@link AsyncRequestBodySplitConfiguration#bufferSizeInBytes()} by tracking the number of + * concurrent ongoing {@link AsyncRequestBody}s. + */ +@SdkInternalApi +public final class FileAsyncRequestBodySplitHelper { + private static final Logger log = Logger.loggerFor(FileAsyncRequestBodySplitHelper.class); + + private final AtomicBoolean isSendingRequestBody = new AtomicBoolean(false); + private final AtomicLong remainingBytes; + + private final long totalContentLength; + private final Path path; + private final int bufferPerAsyncRequestBody; + private final long totalBufferSize; + private final long chunkSize; + + private volatile boolean isDone = false; + + private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0); + private AtomicInteger chunkIndex = new AtomicInteger(0); + + public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody, + AsyncRequestBodySplitConfiguration splitConfiguration) { + Validate.notNull(asyncRequestBody, "asyncRequestBody"); + Validate.notNull(splitConfiguration, "splitConfiguration"); + Validate.isTrue(asyncRequestBody.contentLength().isPresent(), "Content length must be present", asyncRequestBody); + this.totalContentLength = asyncRequestBody.contentLength().get(); + this.remainingBytes = new AtomicLong(totalContentLength); + this.path = asyncRequestBody.path(); + this.chunkSize = splitConfiguration.chunkSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() : + splitConfiguration.chunkSizeInBytes(); + this.totalBufferSize = splitConfiguration.bufferSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() : + splitConfiguration.bufferSizeInBytes(); + this.bufferPerAsyncRequestBody = asyncRequestBody.chunkSizeInBytes(); + } + + public SdkPublisher split() { + + SimplePublisher simplePublisher = new SimplePublisher<>(); + + try { + sendAsyncRequestBody(simplePublisher); + } catch (Throwable throwable) { + simplePublisher.error(throwable); + } + + return SdkPublisher.adapt(simplePublisher); + } + + private void sendAsyncRequestBody(SimplePublisher simplePublisher) { + do { + if (!isSendingRequestBody.compareAndSet(false, true)) { + return; + } + + try { + doSendAsyncRequestBody(simplePublisher); + } finally { + isSendingRequestBody.set(false); + } + } while (shouldSendMore()); + } + + private void doSendAsyncRequestBody(SimplePublisher simplePublisher) { + while (shouldSendMore()) { + AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher); + simplePublisher.send(currentAsyncRequestBody); + numAsyncRequestBodiesInFlight.incrementAndGet(); + checkCompletion(simplePublisher, currentAsyncRequestBody); + } + } + + private void checkCompletion(SimplePublisher simplePublisher, AsyncRequestBody currentAsyncRequestBody) { + long remaining = remainingBytes.addAndGet(-currentAsyncRequestBody.contentLength().get()); + + if (remaining == 0) { + isDone = true; + simplePublisher.complete(); + } else if (remaining < 0) { + isDone = true; + simplePublisher.error(SdkClientException.create( + "Unexpected error occurred. Remaining data is negative: " + remaining)); + } + } + + private void startNextRequestBody(SimplePublisher simplePublisher) { + numAsyncRequestBodiesInFlight.decrementAndGet(); + sendAsyncRequestBody(simplePublisher); + } + + private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher simplePublisher) { + long position = chunkSize * chunkIndex.getAndIncrement(); + long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize); + FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() + .path(path) + .position(position) + .numBytesToRead(numBytesToReadForThisChunk) + .build(); + return new FileAsyncRequestBodyWrapper(fileAsyncRequestBody, simplePublisher); + } + + /** + * Should not send more if it's done OR sending next request body would exceed the total buffer size + */ + private boolean shouldSendMore() { + if (isDone) { + return false; + } + + long currentUsedBuffer = (long) numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody; + return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize; + } + + @SdkTestInternalApi + AtomicInteger numAsyncRequestBodiesInFlight() { + return numAsyncRequestBodiesInFlight; + } + + private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody { + + private final FileAsyncRequestBody fileAsyncRequestBody; + private final SimplePublisher simplePublisher; + + FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody, + SimplePublisher simplePublisher) { + this.fileAsyncRequestBody = fileAsyncRequestBody; + this.simplePublisher = simplePublisher; + } + + @Override + public void subscribe(Subscriber s) { + fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher)) + // The reason we still need to call startNextRequestBody when the subscription is + // cancelled is that upstream could cancel the subscription even though the stream has + // finished successfully before onComplete. If this happens, doAfterOnComplete callback + // will never be invoked, and if the current buffer is full, the publisher will stop + // sending new FileAsyncRequestBody, leading to uncompleted future. + .doAfterOnCancel(() -> startNextRequestBody(simplePublisher)) + .subscribe(s); + } + + @Override + public Optional contentLength() { + return fileAsyncRequestBody.contentLength(); + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index c56d1b6437d9..6d8d18a14754 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -24,6 +24,7 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.NonRetryableException; import software.amazon.awssdk.core.internal.util.NoopSubscription; @@ -41,18 +42,24 @@ @SdkInternalApi public class SplittingPublisher implements SdkPublisher { private static final Logger log = Logger.loggerFor(SplittingPublisher.class); - private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L; - private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4; private final AsyncRequestBody upstreamPublisher; private final SplittingSubscriber splittingSubscriber; private final SimplePublisher downstreamPublisher = new SimplePublisher<>(); private final long chunkSizeInBytes; private final long bufferSizeInBytes; - private SplittingPublisher(Builder builder) { - this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody"); - this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes; - this.bufferSizeInBytes = builder.bufferSizeInBytes == null ? DEFAULT_BUFFER_SIZE : builder.bufferSizeInBytes; + public SplittingPublisher(AsyncRequestBody asyncRequestBody, + AsyncRequestBodySplitConfiguration splitConfiguration) { + this.upstreamPublisher = Validate.paramNotNull(asyncRequestBody, "asyncRequestBody"); + Validate.notNull(splitConfiguration, "splitConfiguration"); + this.chunkSizeInBytes = splitConfiguration.chunkSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() : + splitConfiguration.chunkSizeInBytes(); + + this.bufferSizeInBytes = splitConfiguration.bufferSizeInBytes() == null ? + AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() : + splitConfiguration.bufferSizeInBytes(); + this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null)); if (!upstreamPublisher.contentLength().isPresent()) { @@ -62,10 +69,6 @@ private SplittingPublisher(Builder builder) { } } - public static Builder builder() { - return new Builder(); - } - @Override public void subscribe(Subscriber downstreamSubscriber) { downstreamPublisher.subscribe(downstreamSubscriber); @@ -303,29 +306,4 @@ private void addDataBuffered(int length) { } } } - - public static final class Builder { - private AsyncRequestBody asyncRequestBody; - private Long chunkSizeInBytes; - private Long bufferSizeInBytes; - - public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) { - this.asyncRequestBody = asyncRequestBody; - return this; - } - - public Builder chunkSizeInBytes(Long chunkSizeInBytes) { - this.chunkSizeInBytes = chunkSizeInBytes; - return this; - } - - public Builder bufferSizeInBytes(Long bufferSizeInBytes) { - this.bufferSizeInBytes = bufferSizeInBytes; - return this; - } - - public SplittingPublisher build() { - return new SplittingPublisher(this); - } - } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java new file mode 100644 index 000000000000..535a7176856c --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/FileRequestBodyConfigurationTest.java @@ -0,0 +1,73 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.file.Paths; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.jupiter.api.Test; + +public class FileRequestBodyConfigurationTest { + + @Test + void equalsHashCode() { + EqualsVerifier.forClass(FileRequestBodyConfiguration.class) + .verify(); + } + + @Test + void invalidRequest_shouldThrowException() { + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .position(-1L) + .build()) + .hasMessage("position must not be negative"); + + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .numBytesToRead(-1L) + .build()) + .hasMessage("numBytesToRead must not be negative"); + + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .chunkSizeInBytes(0) + .build()) + .hasMessage("chunkSizeInBytes must be positive"); + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .path(Paths.get(".")) + .chunkSizeInBytes(-5) + .build()) + .hasMessage("chunkSizeInBytes must be positive"); + assertThatThrownBy(() -> FileRequestBodyConfiguration.builder() + .build()) + .hasMessage("path"); + } + + @Test + void toBuilder_shouldCopyAllProperties() { + FileRequestBodyConfiguration config = FileRequestBodyConfiguration.builder() + .path(Paths.get(".")).numBytesToRead(100L) + .position(1L) + .chunkSizeInBytes(1024) + .build(); + + assertThat(config.toBuilder().build()).isEqualTo(config); + } + +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java new file mode 100644 index 000000000000..4c5d0748d16d --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java @@ -0,0 +1,96 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; +import software.amazon.awssdk.testutils.RandomTempFile; + +public class FileAsyncRequestBodySplitHelperTest { + + private static final int CHUNK_SIZE = 5; + private static Path testFile; + private static ScheduledExecutorService executor; + + + @BeforeAll + public static void setup() throws IOException { + testFile = new RandomTempFile(2000).toPath(); + executor = Executors.newScheduledThreadPool(1); + } + + @AfterAll + public static void teardown() throws IOException { + try { + Files.delete(testFile); + } catch (NoSuchFileException e) { + // ignore + } + executor.shutdown(); + } + + @ParameterizedTest + @ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2}) + public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) throws Exception { + long bufferSize = 55l; + int chunkSizeInBytes = 10; + FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder() + .path(testFile) + .chunkSizeInBytes(10) + .build(); + AsyncRequestBodySplitConfiguration config = + AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes((long) chunkSize) + .bufferSizeInBytes(55L) + .build(); + FileAsyncRequestBodySplitHelper helper = new FileAsyncRequestBodySplitHelper(fileAsyncRequestBody, config); + + AtomicInteger maxConcurrency = new AtomicInteger(0); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(verifyConcurrentRequests(helper, maxConcurrency), + 1, 50, TimeUnit.MICROSECONDS); + + verifyIndividualAsyncRequestBody(helper.split(), testFile, chunkSize); + scheduledFuture.cancel(true); + int expectedMaxConcurrency = (int) (bufferSize / chunkSizeInBytes); + assertThat(maxConcurrency.get()).isLessThanOrEqualTo(expectedMaxConcurrency); + } + + private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) { + return () -> { + int concurrency = helper.numAsyncRequestBodiesInFlight().get(); + + if (concurrency > maxConcurrency.get()) { + maxConcurrency.set(concurrency); + } + assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10); + }; + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java index da9daf557e22..5d12035c1879 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java @@ -15,11 +15,14 @@ package software.amazon.awssdk.core.internal.async; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertTrue; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -35,9 +38,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.BinaryUtils; @@ -45,10 +51,12 @@ public class FileAsyncRequestBodyTest { private static final long MiB = 1024 * 1024; private static final long TEST_FILE_SIZE = 10 * MiB; private static Path testFile; + private static Path smallFile; @BeforeEach public void setup() throws IOException { testFile = new RandomTempFile(TEST_FILE_SIZE).toPath(); + smallFile = new RandomTempFile(100).toPath(); } @AfterEach @@ -226,6 +234,84 @@ public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception .hasCauseInstanceOf(IOException.class); } + @Test + public void positionNotZero_shouldReadFromPosition() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + long position = 20L; + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .position(position) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(80L); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[80]; + try(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.skip(position); + inputStream.read(expected, 0, 80); + } + + assertThat(bytes).isEqualTo(expected); + } + + @Test + public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + long position = 20L; + long numBytesToRead = 5L; + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .position(position) + .numBytesToRead(numBytesToRead) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(numBytesToRead); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[5]; + try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.skip(position); + inputStream.read(expected, 0, 5); + } + + assertThat(bytes).isEqualTo(expected); + } + + @Test + public void numBytesToReadConfigured_shouldHonor() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(smallFile) + .numBytesToRead(5L) + .chunkSizeInBytes(10) + .build(); + + ByteArrayAsyncResponseTransformer.BaosSubscriber baosSubscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(future); + asyncRequestBody.subscribe(baosSubscriber); + assertThat(asyncRequestBody.contentLength()).contains(5L); + + byte[] bytes = future.get(1, TimeUnit.SECONDS); + + byte[] expected = new byte[5]; + try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { + inputStream.read(expected, 0, 5); + } + + assertThat(bytes).isEqualTo(expected); + } + private static class ControllableSubscriber implements Subscriber { private final ByteArrayOutputStream output = new ByteArrayOutputStream(); private final CompletableFuture completed = new CompletableFuture<>(); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 0966ea6eb76f..d2e06f28492a 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.io.ByteArrayInputStream; @@ -44,6 +45,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration; import software.amazon.awssdk.utils.BinaryUtils; public class SplittingPublisherTest { @@ -72,11 +74,10 @@ public static void afterAll() throws Exception { public void split_contentUnknownMaxMemorySmallerThanChunkSize_shouldThrowException() { AsyncRequestBody body = AsyncRequestBody.fromPublisher(s -> { }); - assertThatThrownBy(() -> SplittingPublisher.builder() - .asyncRequestBody(body) - .chunkSizeInBytes(10L) - .bufferSizeInBytes(5L) - .build()) + assertThatThrownBy(() -> new SplittingPublisher(body, AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(5L) + .build())) .hasMessageContaining("must be larger than or equal"); } @@ -106,11 +107,10 @@ public Optional contentLength() { return Optional.empty(); } }; - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .asyncRequestBody(asyncRequestBody) + SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, AsyncRequestBodySplitConfiguration.builder() .chunkSizeInBytes((long) CHUNK_SIZE) .bufferSizeInBytes(10L) - .build(); + .build()); List> futures = new ArrayList<>(); @@ -148,38 +148,13 @@ public Optional contentLength() { private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { - SplittingPublisher splittingPublisher = SplittingPublisher.builder() - .asyncRequestBody(asyncRequestBody) - .chunkSizeInBytes((long) chunkSize) - .bufferSizeInBytes((long) chunkSize * 4) - .build(); + SplittingPublisher splittingPublisher = new SplittingPublisher(asyncRequestBody, + AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes((long) chunkSize) + .bufferSizeInBytes((long) chunkSize * 4) + .build()); - List> futures = new ArrayList<>(); - - splittingPublisher.subscribe(requestBody -> { - CompletableFuture baosFuture = new CompletableFuture<>(); - BaosSubscriber subscriber = new BaosSubscriber(baosFuture); - futures.add(baosFuture); - requestBody.subscribe(subscriber); - }).get(5, TimeUnit.SECONDS); - - assertThat(futures.size()).isEqualTo((int) Math.ceil(CONTENT_SIZE / (double) chunkSize)); - - for (int i = 0; i < futures.size(); i++) { - try (FileInputStream fileInputStream = new FileInputStream(testFile)) { - byte[] expected; - if (i == futures.size() - 1) { - int lastChunk = CONTENT_SIZE % chunkSize == 0 ? chunkSize : (CONTENT_SIZE % chunkSize); - expected = new byte[lastChunk]; - } else { - expected = new byte[chunkSize]; - } - fileInputStream.skip(i * chunkSize); - fileInputStream.read(expected); - byte[] actualBytes = futures.get(i).join(); - assertThat(actualBytes).isEqualTo(expected); - }; - } + verifyIndividualAsyncRequestBody(splittingPublisher, testFile.toPath(), chunkSize); } private static class TestAsyncRequestBody implements AsyncRequestBody { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java new file mode 100644 index 000000000000..04da97adbf42 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.assertj.core.api.Assertions; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; +import software.amazon.awssdk.core.internal.async.SplittingPublisherTest; + +public final class SplittingPublisherTestUtils { + + public static void verifyIndividualAsyncRequestBody(SdkPublisher publisher, + Path file, + int chunkSize) throws Exception { + + List> futures = new ArrayList<>(); + publisher.subscribe(requestBody -> { + CompletableFuture baosFuture = new CompletableFuture<>(); + ByteArrayAsyncResponseTransformer.BaosSubscriber subscriber = + new ByteArrayAsyncResponseTransformer.BaosSubscriber(baosFuture); + requestBody.subscribe(subscriber); + futures.add(baosFuture); + }).get(5, TimeUnit.SECONDS); + + long contentLength = file.toFile().length(); + Assertions.assertThat(futures.size()).isEqualTo((int) Math.ceil(contentLength / (double) chunkSize)); + + for (int i = 0; i < futures.size(); i++) { + try (FileInputStream fileInputStream = new FileInputStream(file.toFile())) { + byte[] expected; + if (i == futures.size() - 1) { + int lastChunk = contentLength % chunkSize == 0 ? chunkSize : (int) (contentLength % chunkSize); + expected = new byte[lastChunk]; + } else { + expected = new byte[chunkSize]; + } + fileInputStream.skip(i * chunkSize); + fileInputStream.read(expected); + byte[] actualBytes = futures.get(i).join(); + Assertions.assertThat(actualBytes).isEqualTo(expected); + } + } + } +} diff --git a/services/s3/pom.xml b/services/s3/pom.xml index b044265be1bb..692b1c08f790 100644 --- a/services/s3/pom.xml +++ b/services/s3/pom.xml @@ -153,6 +153,11 @@ equalsverifier test + + com.google.jimfs + jimfs + test + diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java index f7d199ac3aa6..46caefca8d61 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java @@ -196,7 +196,9 @@ public void onSubscribe(Subscription s) { returnFuture.whenComplete((r, t) -> { if (t != null) { s.cancel(); - multipartUploadHelper.cancelingOtherOngoingRequests(futures, t); + if (failureActionInitiated.compareAndSet(false, true)) { + multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest); + } } }); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java index b287270515b0..9758b77a9d84 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -193,7 +194,8 @@ void mpu_onePartFailed_shouldFailOtherPartsAndAbort(AsyncRequestBody asyncReques CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, asyncRequestBody); - assertThatThrownBy(future::join).hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); + assertThatThrownBy(() -> future.get(100, TimeUnit.MILLISECONDS)) + .hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); verify(s3AsyncClient, never()).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); @@ -203,10 +205,10 @@ void mpu_onePartFailed_shouldFailOtherPartsAndAbort(AsyncRequestBody asyncReques assertThat(actualRequest.uploadId()).isEqualTo(UPLOAD_ID); try { - ongoingRequest.get(1, TimeUnit.MILLISECONDS); + ongoingRequest.get(100, TimeUnit.MILLISECONDS); fail("no exception thrown"); } catch (Exception e) { - assertThat(e.getCause()).hasMessageContaining("Failed to send multipart upload requests").hasRootCause(exception); + assertThat(e.getCause()).isEqualTo(exception); } } @@ -247,9 +249,17 @@ void upload_knownContentLengthCancelResponseFuture_shouldCancelUploadPart() { CompletableFuture future = uploadHelper.uploadObject(putObjectRequest, AsyncRequestBody.fromFile(testFile)); + when(s3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) + .thenReturn(CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build())); + future.cancel(true); - assertThat(ongoingRequest).isCancelled(); + try { + ongoingRequest.join(); + fail("no exception"); + } catch (Exception exception) { + assertThat(ongoingRequest).isCancelled(); + } } @ParameterizedTest diff --git a/test/s3-benchmarks/src/main/resources/log4j2.properties b/test/s3-benchmarks/src/main/resources/log4j2.properties index 58a399c44f10..e4d18ecc6eac 100644 --- a/test/s3-benchmarks/src/main/resources/log4j2.properties +++ b/test/s3-benchmarks/src/main/resources/log4j2.properties @@ -43,3 +43,6 @@ rootLogger.appenderRef.file.ref = FileAppender # #logger.netty.name = io.netty.handler.logging #logger.netty.level = debug + +#logger.s3mpu.name = software.amazon.awssdk.services.s3.internal.multipart +#logger.s3mpu.level = debug \ No newline at end of file diff --git a/utils/src/main/java/software/amazon/awssdk/utils/Validate.java b/utils/src/main/java/software/amazon/awssdk/utils/Validate.java index 7890c3ee14cf..6941ad9a2527 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/Validate.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/Validate.java @@ -656,6 +656,19 @@ public static int isNotNegative(int num, String fieldName) { return num; } + public static Long isNotNegativeOrNull(Long num, String fieldName) { + + if (num == null) { + return null; + } + + if (num < 0) { + throw new IllegalArgumentException(String.format("%s must not be negative", fieldName)); + } + + return num; + } + public static long isNotNegative(long num, String fieldName) { if (num < 0) { diff --git a/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java b/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java index 2983398f83d9..29bc80edbe83 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/ValidateTest.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.utils; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -610,6 +611,19 @@ public void isNull_notNull_shouldThrow() { Validate.isNull("string", "not null"); } + @Test + public void isNotNegativeOrNull_negative_throws() { + expected.expect(IllegalArgumentException.class); + expected.expectMessage("foo"); + Validate.isNotNegativeOrNull(-1L, "foo"); + } + + @Test + public void isNotNegativeOrNull_notNegative_notThrow() { + assertThat(Validate.isNotNegativeOrNull(5L, "foo")).isEqualTo(5L); + assertThat(Validate.isNotNegativeOrNull(0L, "foo")).isEqualTo(0L); + } + @Test public void isNull_null_shouldPass() { Validate.isNull(null, "not null");