diff --git a/.changes/next-release/feature-S3TransferManager-acedb56.json b/.changes/next-release/feature-S3TransferManager-acedb56.json new file mode 100644 index 000000000000..cbec6af6edd7 --- /dev/null +++ b/.changes/next-release/feature-S3TransferManager-acedb56.json @@ -0,0 +1,6 @@ +{ + "category": "S3TransferManager", + "contributor": "", + "type": "feature", + "description": "Add support for S3TransferManager TransferListeners" +} diff --git a/core/annotations/src/main/java/software/amazon/awssdk/annotations/Immutable.java b/core/annotations/src/main/java/software/amazon/awssdk/annotations/Immutable.java index cf172c06a6d8..8d3cb250f81e 100644 --- a/core/annotations/src/main/java/software/amazon/awssdk/annotations/Immutable.java +++ b/core/annotations/src/main/java/software/amazon/awssdk/annotations/Immutable.java @@ -40,6 +40,8 @@ * Based on code developed by Brian Goetz and Tim Peierls and concepts * published in 'Java Concurrency in Practice' by Brian Goetz, Tim Peierls, * Joshua Bloch, Joseph Bowbeer, David Holmes and Doug Lea. + * + * @see Mutable */ @Documented @Target(ElementType.TYPE) diff --git a/core/annotations/src/main/java/software/amazon/awssdk/annotations/Mutable.java b/core/annotations/src/main/java/software/amazon/awssdk/annotations/Mutable.java new file mode 100644 index 000000000000..f7d09bcc65a4 --- /dev/null +++ b/core/annotations/src/main/java/software/amazon/awssdk/annotations/Mutable.java @@ -0,0 +1,38 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * The class to which this annotation is applied is explicitly mutable, + * meaning that its state is subject to change between calls. Mutable + * classes offer no inherent guarantees on thread-safety. Where possible, + * classes may be further annotated as either {@link ThreadSafe} or + * {@link NotThreadSafe}. + * + * @see Immutable + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.CLASS) +@SdkProtectedApi +public @interface Mutable { +} diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java index 52e49e587807..4b18c8231f25 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java @@ -26,40 +26,45 @@ import org.junit.Test; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import software.amazon.awssdk.utils.Md5Utils; public class S3TransferManagerDownloadIntegrationTest extends S3IntegrationTestBase { private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadIntegrationTest.class); private static final String KEY = "key"; - private static S3TransferManager transferManager; + private static final int OBJ_SIZE = 16 * 1024 * 1024; + private static S3TransferManager tm; private static File file; @BeforeClass public static void setup() throws IOException { createBucket(BUCKET); - file = new RandomTempFile(10_000); + file = new RandomTempFile(OBJ_SIZE); s3.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .build(), file.toPath()); - transferManager = S3TransferManager.builder() - .s3ClientConfiguration(b -> b.region(DEFAULT_REGION) + tm = S3TransferManager.builder() + .s3ClientConfiguration(b -> b.region(DEFAULT_REGION) .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)) - .build(); + .build(); } @AfterClass public static void cleanup() { deleteBucketAndAllContents(BUCKET); - transferManager.close(); + tm.close(); S3IntegrationTestBase.cleanUp(); } @Test public void download_shouldWork() throws IOException { Path path = RandomTempFile.randomUncreatedFile().toPath(); - Download download = transferManager.download(b -> b.getObjectRequest(r -> r.bucket(BUCKET).key(KEY)) - .destination(path)); + Download download = tm.download(DownloadRequest.builder() + .getObjectRequest(b -> b.bucket(BUCKET).key(KEY)) + .destination(path) + .overrideConfiguration(b -> b.addListener(LoggingTransferListener.create())) + .build()); CompletedDownload completedDownload = download.completionFuture().join(); assertThat(Md5Utils.md5AsBase64(path.toFile())).isEqualTo(Md5Utils.md5AsBase64(file)); assertThat(completedDownload.response().responseMetadata().requestId()).isNotNull(); diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index bd006f9511e5..f0165db0f67d 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + import java.io.IOException; import java.nio.file.Files; import org.junit.AfterClass; @@ -24,14 +25,15 @@ import org.junit.Test; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.sync.ResponseTransformer; -import software.amazon.awssdk.transfer.s3.util.ChecksumUtils; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; +import software.amazon.awssdk.transfer.s3.util.ChecksumUtils; public class S3TransferManagerUploadIntegrationTest extends S3IntegrationTestBase { private static final String TEST_BUCKET = temporaryBucketName(S3TransferManagerUploadIntegrationTest.class); - private static final String TEST_KEY = "8mib_file.dat"; - private static final int OBJ_SIZE = 8 * 1024 * 1024; + private static final String TEST_KEY = "16mib_file.dat"; + private static final int OBJ_SIZE = 16 * 1024 * 1024; private static RandomTempFile testFile; private static S3TransferManager tm; @@ -64,6 +66,7 @@ public void upload_fileSentCorrectly() throws IOException { Upload upload = tm.upload(UploadRequest.builder() .putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY)) .source(testFile.toPath()) + .overrideConfiguration(b -> b.addListener(LoggingTransferListener.create())) .build()); CompletedUpload completedUpload = upload.completionFuture().join(); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Download.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Download.java index 69319fd196b6..a9fd3e498e06 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Download.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Download.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; /** * A download transfer of a single object from S3. @@ -28,4 +29,9 @@ public interface Download extends Transfer { @Override CompletableFuture completionFuture(); + + /** + * The stateful {@link TransferProgress} associated with this transfer. + */ + TransferProgress progress(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/DownloadRequest.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/DownloadRequest.java index 5137827628df..6b85372b8e66 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/DownloadRequest.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/DownloadRequest.java @@ -18,11 +18,13 @@ import java.io.File; import java.nio.file.Path; import java.util.Objects; +import java.util.Optional; import java.util.function.Consumer; import software.amazon.awssdk.annotations.NotThreadSafe; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.utils.ToString; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.CopyableBuilder; import software.amazon.awssdk.utils.builder.ToCopyableBuilder; @@ -35,10 +37,12 @@ public final class DownloadRequest implements TransferRequest, ToCopyableBuilder { private final Path destination; private final GetObjectRequest getObjectRequest; + private final TransferRequestOverrideConfiguration overrideConfiguration; private DownloadRequest(BuilderImpl builder) { this.destination = Validate.paramNotNull(builder.destination, "destination"); this.getObjectRequest = Validate.paramNotNull(builder.getObjectRequest, "getObjectRequest"); + this.overrideConfiguration = builder.configuration; } /** @@ -72,6 +76,23 @@ public GetObjectRequest getObjectRequest() { return getObjectRequest; } + /** + * @return the optional override configuration + * @see Builder#overrideConfiguration(TransferRequestOverrideConfiguration) + */ + public Optional overrideConfiguration() { + return Optional.ofNullable(overrideConfiguration); + } + + @Override + public String toString() { + return ToString.builder("DownloadRequest") + .add("destination", destination) + .add("getObjectRequest", getObjectRequest) + .add("overrideConfiguration", overrideConfiguration) + .build(); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -86,13 +107,17 @@ public boolean equals(Object o) { if (!Objects.equals(destination, that.destination)) { return false; } - return Objects.equals(getObjectRequest, that.getObjectRequest); + if (!Objects.equals(getObjectRequest, that.getObjectRequest)) { + return false; + } + return Objects.equals(overrideConfiguration, that.overrideConfiguration); } @Override public int hashCode() { int result = destination != null ? destination.hashCode() : 0; result = 31 * result + (getObjectRequest != null ? getObjectRequest.hashCode() : 0); + result = 31 * result + (overrideConfiguration != null ? overrideConfiguration.hashCode() : 0); return result; } @@ -157,6 +182,31 @@ default Builder getObjectRequest(Consumer getObjectReq return this; } + /** + * Add an optional request override configuration. + * + * @param configuration The override configuration. + * @return This builder for method chaining. + */ + Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration); + + /** + * Similar to {@link #overrideConfiguration(TransferRequestOverrideConfiguration)}, but takes a lambda to configure a new + * {@link TransferRequestOverrideConfiguration.Builder}. This removes the need to call + * {@link TransferRequestOverrideConfiguration#builder()} and + * {@link TransferRequestOverrideConfiguration.Builder#build()}. + * + * @param configurationBuilder the upload configuration + * @return this builder for method chaining. + * @see #overrideConfiguration(TransferRequestOverrideConfiguration) + */ + default Builder overrideConfiguration(Consumer configurationBuilder) { + Validate.paramNotNull(configurationBuilder, "configurationBuilder"); + return overrideConfiguration(TransferRequestOverrideConfiguration.builder() + .applyMutation(configurationBuilder) + .build()); + } + /** * @return The built request. */ @@ -167,6 +217,7 @@ default Builder getObjectRequest(Consumer getObjectReq private static final class BuilderImpl implements Builder { private Path destination; private GetObjectRequest getObjectRequest; + private TransferRequestOverrideConfiguration configuration; private BuilderImpl() { } @@ -199,6 +250,20 @@ public void setGetObjectRequest(GetObjectRequest getObjectRequest) { getObjectRequest(getObjectRequest); } + @Override + public Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration) { + this.configuration = configuration; + return this; + } + + public void setOverrideConfiguration(TransferRequestOverrideConfiguration configuration) { + overrideConfiguration(configuration); + } + + public TransferRequestOverrideConfiguration getOverrideConfiguration() { + return configuration; + } + @Override public DownloadRequest build() { return new DownloadRequest(this); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/TransferRequestOverrideConfiguration.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/TransferRequestOverrideConfiguration.java new file mode 100644 index 000000000000..bbbdacaaeeae --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/TransferRequestOverrideConfiguration.java @@ -0,0 +1,158 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * Configuration options for {@link UploadRequest} and {@link DownloadRequest}. All values are optional, and not specifying them + * will use the SDK default values. + * + *

Use {@link #builder()} to create a set of options. + */ +@SdkPublicApi +@SdkPreviewApi +public final class TransferRequestOverrideConfiguration + implements ToCopyableBuilder { + + private final List listeners; + + public TransferRequestOverrideConfiguration(DefaultBuilder builder) { + this.listeners = builder.listeners != null ? Collections.unmodifiableList(builder.listeners) : Collections.emptyList(); + } + + /** + * @return The {@link TransferListener}s that will be notified as part of this request. + */ + public List listeners() { + return listeners; + } + + @Override + public Builder toBuilder() { + return new DefaultBuilder(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TransferRequestOverrideConfiguration that = (TransferRequestOverrideConfiguration) o; + + return Objects.equals(listeners, that.listeners); + } + + @Override + public int hashCode() { + return listeners != null ? listeners.hashCode() : 0; + } + + @Override + public String toString() { + return ToString.builder("TransferRequestOverrideConfiguration") + .add("listeners", listeners) + .build(); + } + + public static Builder builder() { + return new DefaultBuilder(); + } + + public static Class serializableBuilderClass() { + return DefaultBuilder.class; + } + + public interface Builder extends CopyableBuilder { + + /** + * The {@link TransferListener}s that will be notified as part of this request. This method overrides and replaces any + * listeners that have already been set. Add an optional request override configuration. + * + * @param listeners the collection of listeners + * @param configuration The override configuration. + * @return Returns a reference to this object so that method calls can be chained together. + * @return This builder for method chaining. + * @see TransferListener + */ + Builder listeners(Collection listeners); + + /** + * Add a {@link TransferListener} that will be notified as part of this request. + * + * @param listener the listener to add + * @return Returns a reference to this object so that method calls can be chained together. + * @see TransferListener + */ + Builder addListener(TransferListener listener); + + @Override + TransferRequestOverrideConfiguration build(); + } + + private static final class DefaultBuilder implements Builder { + private List listeners; + + private DefaultBuilder(TransferRequestOverrideConfiguration configuration) { + this.listeners = configuration.listeners; + } + + private DefaultBuilder() { + } + + @Override + public Builder listeners(Collection listeners) { + this.listeners = new ArrayList<>(listeners); + return this; + } + + @Override + public Builder addListener(TransferListener listener) { + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(listener); + return this; + } + + public List getListeners() { + return listeners; + } + + public void setListeners(Collection listeners) { + listeners(listeners); + } + + @Override + public TransferRequestOverrideConfiguration build() { + return new TransferRequestOverrideConfiguration(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Upload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Upload.java index ba862ea3ba36..1c7b56c17d2f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Upload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/Upload.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPreviewApi; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; /** * An upload transfer of a single object to S3. @@ -28,4 +29,8 @@ public interface Upload extends Transfer { @Override CompletableFuture completionFuture(); + /** + * The stateful {@link TransferProgress} associated with this transfer. + */ + TransferProgress progress(); } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/UploadRequest.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/UploadRequest.java index d475f38f20f7..c77951eca233 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/UploadRequest.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/UploadRequest.java @@ -20,6 +20,7 @@ import java.io.File; import java.nio.file.Path; import java.util.Objects; +import java.util.Optional; import java.util.function.Consumer; import software.amazon.awssdk.annotations.NotThreadSafe; import software.amazon.awssdk.annotations.SdkPreviewApi; @@ -39,10 +40,12 @@ public final class UploadRequest implements TransferRequest, ToCopyableBuilder { private final PutObjectRequest putObjectRequest; private final Path source; + private final TransferRequestOverrideConfiguration overrideConfiguration; private UploadRequest(BuilderImpl builder) { this.putObjectRequest = paramNotNull(builder.putObjectRequest, "putObjectRequest"); this.source = paramNotNull(builder.source, "source"); + this.overrideConfiguration = builder.configuration; } /** @@ -61,6 +64,14 @@ public Path source() { return source; } + /** + * @return the optional override configuration + * @see Builder#overrideConfiguration(TransferRequestOverrideConfiguration) + */ + public Optional overrideConfiguration() { + return Optional.ofNullable(overrideConfiguration); + } + /** * Create a builder that can be used to create a {@link UploadRequest}. * @@ -79,6 +90,15 @@ public Builder toBuilder() { return new BuilderImpl(); } + @Override + public String toString() { + return ToString.builder("UploadRequest") + .add("putObjectRequest", putObjectRequest) + .add("source", source) + .add("overrideConfiguration", overrideConfiguration) + .build(); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -93,24 +113,20 @@ public boolean equals(Object o) { if (!Objects.equals(putObjectRequest, that.putObjectRequest)) { return false; } - return Objects.equals(source, that.source); + if (!Objects.equals(source, that.source)) { + return false; + } + return Objects.equals(overrideConfiguration, that.overrideConfiguration); } @Override public int hashCode() { int result = putObjectRequest != null ? putObjectRequest.hashCode() : 0; result = 31 * result + (source != null ? source.hashCode() : 0); + result = 31 * result + (overrideConfiguration != null ? overrideConfiguration.hashCode() : 0); return result; } - @Override - public String toString() { - return ToString.builder("UploadRequest") - .add("putObjectRequest", putObjectRequest) - .add("source", source) - .build(); - } - /** * A builder for a {@link UploadRequest}, created with {@link #builder()} */ @@ -167,6 +183,30 @@ default Builder putObjectRequest(Consumer putObjectReq .build()); } + /** + * Add an optional request override configuration. + * + * @param configuration The override configuration. + * @return This builder for method chaining. + */ + Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration); + + /** + * Similar to {@link #overrideConfiguration(TransferRequestOverrideConfiguration)}, but takes a lambda to configure a new + * {@link TransferRequestOverrideConfiguration.Builder}. This removes the need to call {@link + * TransferRequestOverrideConfiguration#builder()} and {@link TransferRequestOverrideConfiguration.Builder#build()}. + * + * @param configurationBuilder the upload configuration + * @return this builder for method chaining. + * @see #overrideConfiguration(TransferRequestOverrideConfiguration) + */ + default Builder overrideConfiguration(Consumer configurationBuilder) { + Validate.paramNotNull(configurationBuilder, "configurationBuilder"); + return overrideConfiguration(TransferRequestOverrideConfiguration.builder() + .applyMutation(configurationBuilder) + .build()); + } + /** * @return The built request. */ @@ -177,6 +217,7 @@ default Builder putObjectRequest(Consumer putObjectReq private static class BuilderImpl implements Builder { private PutObjectRequest putObjectRequest; private Path source; + private TransferRequestOverrideConfiguration configuration; @Override public Builder source(Path source) { @@ -206,6 +247,20 @@ public void setPutObjectRequest(PutObjectRequest putObjectRequest) { putObjectRequest(putObjectRequest); } + @Override + public Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration) { + this.configuration = configuration; + return this; + } + + public void setOverrideConfiguration(TransferRequestOverrideConfiguration configuration) { + overrideConfiguration(configuration); + } + + public TransferRequestOverrideConfiguration getOverrideConfiguration() { + return configuration; + } + @Override public UploadRequest build() { return new UploadRequest(this); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultDownload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultDownload.java index 8d112c4a3190..0c5611759f0a 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultDownload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultDownload.java @@ -19,17 +19,25 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.transfer.s3.CompletedDownload; import software.amazon.awssdk.transfer.s3.Download; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; @SdkInternalApi public final class DefaultDownload implements Download { private final CompletableFuture completionFuture; + private final TransferProgress progress; - public DefaultDownload(CompletableFuture completionFuture) { + public DefaultDownload(CompletableFuture completionFuture, TransferProgress progress) { this.completionFuture = completionFuture; + this.progress = progress; } @Override public CompletableFuture completionFuture() { return completionFuture; } + + @Override + public TransferProgress progress() { + return progress; + } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java index 4f33172bc1b4..dd9e83dfe30c 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java @@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.internal.resource.S3ArnConverter; import software.amazon.awssdk.services.s3.internal.resource.S3Resource; import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedDownload; import software.amazon.awssdk.transfer.s3.CompletedUpload; @@ -40,6 +39,7 @@ import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.UploadDirectoryTransfer; import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Validate; @@ -91,28 +91,41 @@ private S3CrtAsyncClient initializeS3CrtClient(DefaultBuilder tmBuilder) { @Override public Upload upload(UploadRequest uploadRequest) { + Validate.paramNotNull(uploadRequest, "uploadRequest"); + + AsyncRequestBody requestBody = requestBodyFor(uploadRequest); + + CompletableFuture uploadFuture = new CompletableFuture<>(); + + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody); + progressUpdater.transferInitiated(); + requestBody = progressUpdater.wrapRequestBody(requestBody); + progressUpdater.registerCompletion(uploadFuture); + try { - Validate.paramNotNull(uploadRequest, "uploadRequest"); assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload"); - PutObjectRequest putObjectRequest = uploadRequest.putObjectRequest(); - AsyncRequestBody requestBody = requestBodyFor(uploadRequest); - - CompletableFuture putObjFuture = s3CrtAsyncClient.putObject(putObjectRequest, requestBody); + CompletableFuture putObjFuture = + s3CrtAsyncClient.putObject(uploadRequest.putObjectRequest(), requestBody); + + // Forward upload cancellation to CRT future + CompletableFutureUtils.forwardExceptionTo(uploadFuture, putObjFuture); - CompletableFuture future = putObjFuture.thenApply(r -> CompletedUpload.builder() - .response(r) - .build()); - return new DefaultUpload(CompletableFutureUtils.forwardExceptionTo(future, putObjFuture)); + CompletableFutureUtils.forwardTransformedResultTo(putObjFuture, uploadFuture, r -> CompletedUpload.builder() + .response(r) + .build()); } catch (Throwable throwable) { - return new DefaultUpload(CompletableFutureUtils.failedFuture(throwable)); + uploadFuture.completeExceptionally(throwable); } + + return new DefaultUpload(uploadFuture, progressUpdater.progress()); } @Override public UploadDirectoryTransfer uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) { + Validate.paramNotNull(uploadDirectoryRequest, "uploadDirectoryRequest"); + try { - Validate.paramNotNull(uploadDirectoryRequest, "uploadDirectoryRequest"); assertNotUnsupportedArn(uploadDirectoryRequest.bucket(), "uploadDirectory"); return uploadDirectoryManager.uploadDirectory(uploadDirectoryRequest); @@ -123,20 +136,35 @@ public UploadDirectoryTransfer uploadDirectory(UploadDirectoryRequest uploadDire @Override public Download download(DownloadRequest downloadRequest) { + Validate.paramNotNull(downloadRequest, "downloadRequest"); + + AsyncResponseTransformer responseTransformer = + AsyncResponseTransformer.toFile(downloadRequest.destination()); + + CompletableFuture downloadFuture = new CompletableFuture<>(); + + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest); + progressUpdater.transferInitiated(); + responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer); + progressUpdater.registerCompletion(downloadFuture); + try { - Validate.paramNotNull(downloadRequest, "downloadRequest"); assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download"); CompletableFuture getObjectFuture = - s3CrtAsyncClient.getObject(downloadRequest.getObjectRequest(), - AsyncResponseTransformer.toFile(downloadRequest.destination())); - CompletableFuture future = - getObjectFuture.thenApply(r -> CompletedDownload.builder().response(r).build()); + s3CrtAsyncClient.getObject(downloadRequest.getObjectRequest(), responseTransformer); + + // Forward download cancellation to CRT future + CompletableFutureUtils.forwardExceptionTo(downloadFuture, getObjectFuture); - return new DefaultDownload(CompletableFutureUtils.forwardExceptionTo(future, getObjectFuture)); + CompletableFutureUtils.forwardTransformedResultTo(getObjectFuture, downloadFuture, r -> CompletedDownload.builder() + .response(r) + .build()); } catch (Throwable throwable) { - return new DefaultDownload(CompletableFutureUtils.failedFuture(throwable)); + downloadFuture.completeExceptionally(throwable); } + + return new DefaultDownload(downloadFuture, progressUpdater.progress()); } @Override diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultUpload.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultUpload.java index 4ac6dbe7e78f..f1423adf0b37 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultUpload.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultUpload.java @@ -19,17 +19,25 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.transfer.s3.CompletedUpload; import software.amazon.awssdk.transfer.s3.Upload; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; @SdkInternalApi public final class DefaultUpload implements Upload { - private final CompletableFuture completionFeature; + private final CompletableFuture completionFuture; + private final TransferProgress progress; - public DefaultUpload(CompletableFuture completionFeature) { - this.completionFeature = completionFeature; + public DefaultUpload(CompletableFuture completionFuture, TransferProgress progress) { + this.completionFuture = completionFuture; + this.progress = progress; } @Override public CompletableFuture completionFuture() { - return completionFeature; + return completionFuture; + } + + @Override + public TransferProgress progress() { + return progress; } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgress.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgress.java new file mode 100644 index 000000000000..fd372fcaf409 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgress.java @@ -0,0 +1,65 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import software.amazon.awssdk.annotations.Mutable; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot.Builder; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.ToString; + +/** + * An SDK-internal implementation of {@link TransferProgress}. This implementation acts as a thin wrapper around {@link + * AtomicReference}, where calls to get the latest {@link #snapshot()} simply return the latest reference, while {@link + * TransferProgressUpdater} is responsible for continuously updating the latest reference. + * + * @see TransferProgress + */ +@Mutable +@ThreadSafe +@SdkInternalApi +public final class DefaultTransferProgress implements TransferProgress { + + private final AtomicReference snapshot; + + public DefaultTransferProgress(TransferProgressSnapshot snapshot) { + this.snapshot = new AtomicReference<>(snapshot); + } + + /** + * Atomically convert the current snapshot reference to its {@link Builder}, perform updates using the provided {@link + * Consumer}, and save the result as the latest snapshot. + */ + public TransferProgressSnapshot updateAndGet(Consumer updater) { + return this.snapshot.updateAndGet(s -> ((DefaultTransferProgressSnapshot) s).copy(updater)); + } + + @Override + public TransferProgressSnapshot snapshot() { + return snapshot.get(); + } + + @Override + public String toString() { + return ToString.builder("TransferProgress") + .add("snapshot", snapshot) + .build(); + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java new file mode 100644 index 000000000000..0a713376a3df --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java @@ -0,0 +1,123 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.util.Optional; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * An SDK-internal implementation of {@link TransferProgressSnapshot}. + */ +@SdkInternalApi +public final class DefaultTransferProgressSnapshot + implements ToCopyableBuilder, + TransferProgressSnapshot { + + private final long bytesTransferred; + private final Long transferSizeInBytes; + + private DefaultTransferProgressSnapshot(Builder builder) { + if (builder.transferSizeInBytes != null) { + Validate.isNotNegative(builder.transferSizeInBytes, "transferSizeInBytes"); + Validate.isTrue(builder.bytesTransferred <= builder.transferSizeInBytes, + "bytesTransferred (%s) must not be greater than transferSizeInBytes (%s)", + builder.bytesTransferred, builder.transferSizeInBytes); + } + this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred"); + this.transferSizeInBytes = builder.transferSizeInBytes; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public long bytesTransferred() { + return bytesTransferred; + } + + @Override + public Optional transferSizeInBytes() { + return Optional.ofNullable(transferSizeInBytes); + } + + @Override + public Optional ratioTransferred() { + return transferSizeInBytes() + .map(Long::doubleValue) + .map(size -> (size == 0) ? 1.0 : (bytesTransferred / size)); + } + + @Override + public Optional bytesRemaining() { + return transferSizeInBytes().map(size -> size - bytesTransferred); + } + + @Override + public String toString() { + return ToString.builder("TransferProgressSnapshot") + .add("bytesTransferred", bytesTransferred) + .add("transferSizeInBytes", transferSizeInBytes) + .build(); + } + + public static final class Builder implements CopyableBuilder { + private long bytesTransferred = 0L; + private Long transferSizeInBytes; + + private Builder() { + super(); + } + + private Builder(DefaultTransferProgressSnapshot snapshot) { + this.bytesTransferred = snapshot.bytesTransferred; + this.transferSizeInBytes = snapshot.transferSizeInBytes; + } + + public Builder bytesTransferred(long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + return this; + } + + public long getBytesTransferred() { + return bytesTransferred; + } + + public Builder transferSizeInBytes(Long transferSizeInBytes) { + this.transferSizeInBytes = transferSizeInBytes; + return this; + } + + public Long getTransferSizeInBytes() { + return transferSizeInBytes; + } + + @Override + public DefaultTransferProgressSnapshot build() { + return new DefaultTransferProgressSnapshot(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java new file mode 100644 index 000000000000..86d4ac9f60cb --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java @@ -0,0 +1,93 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.nio.ByteBuffer; +import java.util.Optional; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; + +@SdkInternalApi +public class NotifyingAsyncRequestBody implements AsyncRequestBody { + + public interface AsyncRequestBodyListener { + default void beforeSubscribe(Subscriber subscriber) { + } + + default void beforeOnNext(ByteBuffer byteBuffer) { + } + } + + private final AsyncRequestBody delegate; + private final AsyncRequestBodyListener listener; + + public NotifyingAsyncRequestBody(AsyncRequestBody delegate, + AsyncRequestBodyListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public Optional contentLength() { + return delegate.contentLength(); + } + + @Override + public String contentType() { + return delegate.contentType(); + } + + @Override + public void subscribe(Subscriber subscriber) { + listener.beforeSubscribe(subscriber); + delegate.subscribe(new NotifyingSubscriber(subscriber, listener)); + } + + @SdkInternalApi + private static final class NotifyingSubscriber implements Subscriber { + private final Subscriber delegate; + private final AsyncRequestBodyListener listener; + + NotifyingSubscriber(Subscriber delegate, + AsyncRequestBodyListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + listener.beforeOnNext(byteBuffer); + delegate.onNext(byteBuffer); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java new file mode 100644 index 000000000000..2af6d4c6750b --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java @@ -0,0 +1,119 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; + +@SdkInternalApi +public class NotifyingAsyncResponseTransformer implements AsyncResponseTransformer { + + public interface AsyncResponseTransformerListener { + default void beforeOnResponse(ResponseT response) { + } + + default void beforeSubscribe(Subscriber subscriber) { + } + + default void beforeOnNext(ByteBuffer byteBuffer) { + } + } + + private final AsyncResponseTransformer delegate; + private final AsyncResponseTransformerListener listener; + + public NotifyingAsyncResponseTransformer(AsyncResponseTransformer delegate, + AsyncResponseTransformerListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public CompletableFuture prepare() { + return delegate.prepare(); + } + + @Override + public void onResponse(ResponseT response) { + listener.beforeOnResponse(response); + delegate.onResponse(response); + } + + public void onStream(SdkPublisher publisher) { + delegate.onStream(new NotifyingPublisher<>(publisher, listener)); + } + + @Override + public void exceptionOccurred(Throwable error) { + delegate.exceptionOccurred(error); + } + + @SdkInternalApi + private static final class NotifyingPublisher implements SdkPublisher { + private final SdkPublisher delegate; + private final AsyncResponseTransformerListener listener; + + NotifyingPublisher(SdkPublisher delegate, + AsyncResponseTransformerListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public void subscribe(Subscriber s) { + listener.beforeSubscribe(s); + delegate.subscribe(new NotifyingSubscriber<>(s, listener)); + } + } + + @SdkInternalApi + private static final class NotifyingSubscriber implements Subscriber { + private final Subscriber delegate; + private final AsyncResponseTransformerListener listener; + + NotifyingSubscriber(Subscriber delegate, + AsyncResponseTransformerListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + listener.beforeOnNext(byteBuffer); + delegate.onNext(byteBuffer); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java new file mode 100644 index 000000000000..4437bb601ae4 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerContext.java @@ -0,0 +1,117 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import software.amazon.awssdk.annotations.Immutable; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.transfer.s3.CompletedTransfer; +import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * An SDK-internal implementation of {@link TransferComplete} and its parent interfaces. + * + * @see TransferListenerFailedContext + */ +@SdkProtectedApi +@Immutable +public final class TransferListenerContext + implements TransferListener.Context.TransferComplete, + ToCopyableBuilder { + + private final TransferRequest request; + private final TransferProgressSnapshot progressSnapshot; + private final CompletedTransfer completedTransfer; + + private TransferListenerContext(Builder builder) { + this.request = builder.request; + this.progressSnapshot = builder.progressSnapshot; + this.completedTransfer = builder.completedTransfer; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public TransferRequest request() { + return request; + } + + @Override + public TransferProgressSnapshot progressSnapshot() { + return progressSnapshot; + } + + @Override + public CompletedTransfer completedTransfer() { + return completedTransfer; + } + + @Override + public String toString() { + return ToString.builder("TransferListenerContext") + .add("request", request) + .add("progressSnapshot", progressSnapshot) + .add("completedTransfer", completedTransfer) + .build(); + } + + public static final class Builder implements CopyableBuilder { + private TransferRequest request; + private TransferProgressSnapshot progressSnapshot; + private CompletedTransfer completedTransfer; + + private Builder() { + super(); + } + + private Builder(TransferListenerContext context) { + this.request = context.request; + this.progressSnapshot = context.progressSnapshot; + this.completedTransfer = context.completedTransfer; + } + + public Builder request(TransferRequest request) { + this.request = request; + return this; + } + + public Builder progressSnapshot(TransferProgressSnapshot progressSnapshot) { + this.progressSnapshot = progressSnapshot; + return this; + } + + public Builder completedTransfer(CompletedTransfer completedTransfer) { + this.completedTransfer = completedTransfer; + return this; + } + + @Override + public TransferListenerContext build() { + return new TransferListenerContext(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerFailedContext.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerFailedContext.java new file mode 100644 index 000000000000..46a55cb5402a --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerFailedContext.java @@ -0,0 +1,114 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.util.concurrent.CompletionException; +import software.amazon.awssdk.annotations.Immutable; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.CopyableBuilder; +import software.amazon.awssdk.utils.builder.ToCopyableBuilder; + +/** + * An SDK-internal implementation of {@link TransferListener.Context.TransferFailed}. + * + * @see TransferListenerContext + */ +@SdkInternalApi +@Immutable +public class TransferListenerFailedContext + implements TransferListener.Context.TransferFailed, + ToCopyableBuilder { + + private final TransferListenerContext transferContext; + private final Throwable exception; + + private TransferListenerFailedContext(Builder builder) { + this.exception = unwrap(Validate.paramNotNull(builder.exception, "exception")); + this.transferContext = Validate.paramNotNull(builder.transferContext, "transferContext"); + } + + private Throwable unwrap(Throwable exception) { + while (exception instanceof CompletionException) { + exception = exception.getCause(); + } + return exception; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public TransferRequest request() { + return transferContext.request(); + } + + @Override + public TransferProgressSnapshot progressSnapshot() { + return transferContext.progressSnapshot(); + } + + @Override + public Throwable exception() { + return exception; + } + + @Override + public String toString() { + return ToString.builder("TransferListenerFailedContext") + .add("transferContext", transferContext) + .add("exception", exception) + .build(); + } + + public static final class Builder implements CopyableBuilder { + private TransferListenerContext transferContext; + private Throwable exception; + + private Builder() { + } + + public Builder(TransferListenerFailedContext failedContext) { + this.exception = failedContext.exception; + this.transferContext = failedContext.transferContext; + } + + public Builder exception(Throwable exception) { + this.exception = exception; + return this; + } + + public Builder transferContext(TransferListenerContext transferContext) { + this.transferContext = transferContext; + return this; + } + + @Override + public TransferListenerFailedContext build() { + return new TransferListenerFailedContext(this); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java new file mode 100644 index 000000000000..183592bb54de --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java @@ -0,0 +1,67 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError; + +import java.util.List; +import java.util.function.Consumer; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * An SDK-internal helper class that composes multiple provided {@link TransferListener}s together into a single logical chain. + * Invocations on {@link TransferListenerInvoker} will be delegated to the underlying chain, while suppressing (and logging) any + * exceptions that are thrown. + */ +@SdkInternalApi +public class TransferListenerInvoker implements TransferListener { + private static final Logger log = Logger.loggerFor(TransferListener.class); + private final List listeners; + + public TransferListenerInvoker(List listeners) { + this.listeners = Validate.paramNotNull(listeners, "listeners"); + } + + @Override + public void transferInitiated(Context.TransferInitiated context) { + forEach(listener -> listener.transferInitiated(context)); + } + + @Override + public void bytesTransferred(Context.BytesTransferred context) { + forEach(listener -> listener.bytesTransferred(context)); + } + + @Override + public void transferComplete(Context.TransferComplete context) { + forEach(listener -> listener.transferComplete(context)); + } + + @Override + public void transferFailed(Context.TransferFailed context) { + forEach(listener -> listener.transferFailed(context)); + } + + private void forEach(Consumer action) { + for (TransferListener listener : listeners) { + runAndLogError(log.logger(), "Exception thrown in TransferListener, ignoring", + () -> action.accept(listener)); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java new file mode 100644 index 000000000000..cfc7469a35ca --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -0,0 +1,155 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal.progress; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.transfer.s3.CompletedTransfer; +import software.amazon.awssdk.transfer.s3.DownloadRequest; +import software.amazon.awssdk.transfer.s3.TransferRequestOverrideConfiguration; +import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.NotifyingAsyncRequestBody.AsyncRequestBodyListener; +import software.amazon.awssdk.transfer.s3.internal.progress.NotifyingAsyncResponseTransformer.AsyncResponseTransformerListener; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.transfer.s3.progress.TransferProgress; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; + +/** + * An SDK-internal helper class that facilitates updating a {@link TransferProgress} and invoking {@link TransferListener}s. + */ +@SdkInternalApi +public class TransferProgressUpdater { + + private final DefaultTransferProgress progress; + private final TransferListenerContext context; + private final TransferListenerInvoker listeners; + + public TransferProgressUpdater(UploadRequest request, AsyncRequestBody requestBody) { + DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); + getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes); + TransferProgressSnapshot snapshot = snapshotBuilder.build(); + progress = new DefaultTransferProgress(snapshot); + context = TransferListenerContext.builder() + .request(request) + .progressSnapshot(snapshot) + .build(); + listeners = new TransferListenerInvoker(request.overrideConfiguration() + .map(TransferRequestOverrideConfiguration::listeners) + .orElseGet(Collections::emptyList)); + } + + public TransferProgressUpdater(DownloadRequest request) { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder().build(); + progress = new DefaultTransferProgress(snapshot); + context = TransferListenerContext.builder() + .request(request) + .progressSnapshot(snapshot) + .build(); + listeners = new TransferListenerInvoker(request.overrideConfiguration() + .map(TransferRequestOverrideConfiguration::listeners) + .orElseGet(Collections::emptyList)); + } + + public TransferProgress progress() { + return progress; + } + + public void transferInitiated() { + listeners.transferInitiated(context); + } + + public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody) { + return new NotifyingAsyncRequestBody( + requestBody, + new AsyncRequestBodyListener() { + @Override + public void beforeSubscribe(Subscriber subscriber) { + progress.updateAndGet(b -> b.bytesTransferred(0)); + } + + @Override + public void beforeOnNext(ByteBuffer byteBuffer) { + TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { + b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit()); + }); + listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + } + }); + } + + public AsyncResponseTransformer wrapResponseTransformer( + AsyncResponseTransformer responseTransformer) { + return new NotifyingAsyncResponseTransformer<>( + responseTransformer, + new AsyncResponseTransformerListener() { + @Override + public void beforeOnResponse(GetObjectResponse response) { + if (response.contentLength() != null) { + progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength())); + } + } + + @Override + public void beforeSubscribe(Subscriber subscriber) { + progress.updateAndGet(b -> b.bytesTransferred(0)); + } + + @Override + public void beforeOnNext(ByteBuffer byteBuffer) { + TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { + b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit()); + }); + listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + } + }); + } + + public void registerCompletion(CompletableFuture future) { + future.whenComplete((r, t) -> { + if (t == null) { + listeners.transferComplete(context.copy(b -> { + b.progressSnapshot(progress.snapshot()); + b.completedTransfer(r); + })); + } else { + listeners.transferFailed(TransferListenerFailedContext.builder() + .transferContext(context.copy(b -> { + b.progressSnapshot(progress.snapshot()); + })) + .exception(t) + .build()); + } + }); + } + + private static Optional getContentLengthSafe(AsyncRequestBody requestBody) { + // requestBody.contentLength() may throw if the file does not exist. + // We ignore any potential exception here to defer failure + // to the s3CrtAsyncClient call and its associated future. + try { + return requestBody.contentLength(); + } catch (Exception ignored) { + return Optional.empty(); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListener.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListener.java new file mode 100644 index 000000000000..10da1487f376 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListener.java @@ -0,0 +1,105 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.progress; + +import static software.amazon.awssdk.utils.StringUtils.repeat; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.concurrent.atomic.AtomicInteger; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.utils.Logger; + +/** + * An example implementation of {@link TransferListener} that logs a progress bar at the {@code INFO} level. This implementation + * effectively rate-limits how frequently updates are logged by only logging when a new "tick" advances in the progress bar. By + * default, the progress bar has {@value #DEFAULT_MAX_TICKS} ticks, meaning an update is only logged, at most, once every 5%. + */ +@SdkPublicApi +@SdkPreviewApi +public class LoggingTransferListener implements TransferListener { + private static final Logger log = Logger.loggerFor(LoggingTransferListener.class); + private static final int DEFAULT_MAX_TICKS = 20; + private final ProgressBar progressBar; + + private LoggingTransferListener(int maxTicks) { + progressBar = new ProgressBar(maxTicks); + } + + /** + * Create an instance of {@link LoggingTransferListener} with a custom {@code maxTicks} value. + * + * @param maxTicks the number of ticks in the logged progress bar + */ + public static LoggingTransferListener create(int maxTicks) { + return new LoggingTransferListener(maxTicks); + } + + /** + * Create an instance of {@link LoggingTransferListener} with the default configuration. + */ + public static LoggingTransferListener create() { + return new LoggingTransferListener(DEFAULT_MAX_TICKS); + } + + @Override + public void transferInitiated(Context.TransferInitiated context) { + log.info(() -> "Transfer initiated..."); + context.progressSnapshot().ratioTransferred().ifPresent(progressBar::update); + } + + @Override + public void bytesTransferred(Context.BytesTransferred context) { + context.progressSnapshot().ratioTransferred().ifPresent(progressBar::update); + } + + @Override + public void transferComplete(Context.TransferComplete context) { + context.progressSnapshot().ratioTransferred().ifPresent(progressBar::update); + log.info(() -> "Transfer complete!"); + } + + @Override + public void transferFailed(Context.TransferFailed context) { + log.warn(() -> "Transfer failed.", context.exception()); + } + + private static class ProgressBar { + private final int maxTicks; + private final AtomicInteger prevTicks = new AtomicInteger(-1); + + ProgressBar(int maxTicks) { + this.maxTicks = maxTicks; + } + + void update(double ratio) { + int ticks = (int) Math.floor(ratio * maxTicks); + if (prevTicks.getAndSet(ticks) != ticks) { + log.info(() -> String.format("|%s%s| %s", + repeat("=", ticks), + repeat(" ", maxTicks - ticks), + round(ratio * 100, 1) + "%")); + } + } + + private static double round(double value, int places) { + BigDecimal bd = BigDecimal.valueOf(value); + bd = bd.setScale(places, RoundingMode.FLOOR); + return bd.doubleValue(); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java new file mode 100644 index 000000000000..8fcfd15c5103 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferListener.java @@ -0,0 +1,273 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.progress; + +import java.util.concurrent.CompletionException; +import software.amazon.awssdk.annotations.Immutable; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.transfer.s3.CompletedDownload; +import software.amazon.awssdk.transfer.s3.CompletedTransfer; +import software.amazon.awssdk.transfer.s3.CompletedUpload; +import software.amazon.awssdk.transfer.s3.DownloadRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.UploadRequest; + +/** + * The {@link TransferListener} interface may be implemented by your application in order to receive event-driven updates on the + * progress of a transfer initiated by {@link S3TransferManager}. When you construct an {@link UploadRequest} or {@link + * DownloadRequest} request to submit to {@link S3TransferManager}, you may provide a variable number of {@link TransferListener}s + * to be associated with that request. Then, throughout the lifecycle of the request, {@link S3TransferManager} will invoke the + * provided {@link TransferListener}s when important events occur, like additional bytes being transferred, allowing you to + * monitor the ongoing progress of the transfer. + *

+ * Each {@link TransferListener} callback is invoked with an immutable {@link Context} object. Depending on the current lifecycle + * of the request, different {@link Context} objects have different attributes available (indicated by the provided context + * interface). Most notably, every callback is given access to the current {@link TransferProgressSnapshot}, which contains + * helpful progress-related methods like {@link TransferProgressSnapshot#bytesTransferred()} and {@link + * TransferProgressSnapshot#ratioTransferred()}. + *

+ * A successful transfer callback lifecycle is sequenced as follows: + *

    + *
  1. {@link #transferInitiated(Context.TransferInitiated)} - A new transfer has been initiated. This method is called + * exactly once per transfer.
  2. + *
      Available context attributes: + *
    • {@link Context.TransferInitiated#request()}
    • + *
    • {@link Context.TransferInitiated#progressSnapshot()}
    • + *
    + *
  3. {@link #bytesTransferred(Context.BytesTransferred)} - Additional bytes have been submitted or received. This method + * may be called many times per transfer, depending on the transfer size and I/O buffer sizes. + *
  4. {@link #transferComplete(Context.TransferComplete)} - The transfer has completed successfully. This method is called + * exactly once for a successful transfer.
  5. + *
      Additional available context attributes: + *
    • {@link Context.TransferComplete#completedTransfer()}
    • + *
    + *
+ * In the case of a failed transfer, both {@link #transferInitiated(Context.TransferInitiated)} and + * {@link #transferFailed(Context.TransferFailed)} will be called exactly once. There are no guarantees on whether any other + * callbacks are invoked. + *

+ * There are a few important rules and best practices that govern the usage of {@link TransferListener}s: + *

    + *
  1. {@link TransferListener} implementations should not block, sleep, or otherwise delay the calling thread. If you need + * to perform blocking operations, you should schedule them in a separate thread or executor that you control.
  2. + *
  3. Be mindful that {@link #bytesTransferred(Context.BytesTransferred)} may be called extremely often (subject to I/O + * buffer sizes). Be careful in implementing expensive operations as a side effect. Consider rate-limiting your side + * effect operations, if needed.
  4. + *
  5. In the case of uploads, there may be some delay between the bytes being fully transferred and the transfer + * successfully completing. Internally, {@link S3TransferManager} uses the Amazon S3 + * multipart upload API + * and must finalize uploads with a {@link CompleteMultipartUploadRequest}.
  6. + *
  7. {@link TransferListener}s may be invoked by different threads. If your {@link TransferListener} is stateful, + * ensure that it is also thread-safe.
  8. + *
  9. {@link TransferListener}s are not intended to be used for control flow, and therefore your implementation + * should not throw. Any thrown exceptions will be suppressed and logged as an error.
  10. + *
+ *

+ * A classical use case of {@link TransferListener} is to create a progress bar to monitor an ongoing transfer's progress. + * Refer to the implementation of {@link LoggingTransferListener} for a basic example, or test it in your application by providing + * the listener as part of your {@link TransferRequest}. E.g., + *

{@code
+ * Upload upload = tm.upload(UploadRequest.builder()
+ *                                        .putObjectRequest(b -> b.bucket("bucket").key("key"))
+ *                                        .source(Paths.get(...))
+ *                                        .overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
+ *                                        .build());
+ * }
+ * And then a successful transfer may output something similar to: + *
+ * Transfer initiated...
+ * |                    | 0.0%
+ * |==                  | 12.5%
+ * |=====               | 25.0%
+ * |=======             | 37.5%
+ * |==========          | 50.0%
+ * |============        | 62.5%
+ * |===============     | 75.0%
+ * |=================   | 87.5%
+ * |====================| 100.0%
+ * Transfer complete!
+ * 
+ */ +@SdkPublicApi +@SdkPreviewApi +public interface TransferListener { + + /** + * A new transfer has been initiated. This method is called exactly once per transfer. + *

+ * Available context attributes: + *

    + *
  1. {@link Context.TransferInitiated#request()}
  2. + *
  3. {@link Context.TransferInitiated#progressSnapshot()}
  4. + *
+ */ + default void transferInitiated(Context.TransferInitiated context) { + } + + /** + * Additional bytes have been submitted or received. This method may be called many times per transfer, depending on the + * transfer size and I/O buffer sizes. + *

+ * Available context attributes: + *

    + *
  1. {@link Context.BytesTransferred#request()}
  2. + *
  3. {@link Context.BytesTransferred#progressSnapshot()}
  4. + *
+ */ + default void bytesTransferred(Context.BytesTransferred context) { + } + + /** + * The transfer has completed successfully. This method is called exactly once for a successful transfer. + *

+ * Available context attributes: + *

    + *
  1. {@link Context.TransferComplete#request()}
  2. + *
  3. {@link Context.TransferComplete#progressSnapshot()}
  4. + *
  5. {@link Context.TransferComplete#completedTransfer()}
  6. + *
+ */ + default void transferComplete(Context.TransferComplete context) { + } + + /** + * The transfer failed. This method is called exactly once for a failed transfer. + *

+ * Available context attributes: + *

    + *
  1. {@link Context.TransferFailed#request()}
  2. + *
  3. {@link Context.TransferFailed#progressSnapshot()}
  4. + *
  5. {@link Context.TransferFailed#exception()}
  6. + *
+ */ + default void transferFailed(Context.TransferFailed context) { + } + + /** + * A wrapper class that groups together the different context interfaces that are exposed to {@link TransferListener}s. + *

+ * Successful transfer interface hierarchy: + *

    + *
  1. {@link TransferInitiated}
  2. + *
  3. {@link BytesTransferred}
  4. + *
  5. {@link TransferComplete}
  6. + *
+ * Failed transfer interface hierarchy: + *
    + *
  1. {@link TransferInitiated}
  2. + *
  3. {@link TransferFailed}
  4. + *
+ * + * @see TransferListener + */ + @SdkProtectedApi + final class Context { + private Context() { + } + + /** + * A new transfer has been initiated. + *

+ * Available context attributes: + *

    + *
  1. {@link TransferInitiated#request()}
  2. + *
  3. {@link TransferInitiated#progressSnapshot()}
  4. + *
+ */ + @Immutable + @ThreadSafe + @SdkPublicApi + @SdkPreviewApi + public interface TransferInitiated { + /** + * The {@link TransferRequest} that was submitted to {@link S3TransferManager}, i.e., the {@link UploadRequest} or + * {@link DownloadRequest}. + */ + TransferRequest request(); + + /** + * The immutable {@link TransferProgressSnapshot} for this specific update. + */ + TransferProgressSnapshot progressSnapshot(); + } + + /** + * Additional bytes have been submitted or received. + *

+ * Available context attributes: + *

    + *
  1. {@link BytesTransferred#request()}
  2. + *
  3. {@link BytesTransferred#progressSnapshot()}
  4. + *
+ */ + @Immutable + @ThreadSafe + @SdkPublicApi + @SdkPreviewApi + public interface BytesTransferred extends TransferInitiated { + } + + /** + * The transfer has completed successfully. + *

+ * Available context attributes: + *

    + *
  1. {@link TransferComplete#request()}
  2. + *
  3. {@link TransferComplete#progressSnapshot()}
  4. + *
  5. {@link TransferComplete#completedTransfer()}
  6. + *
+ */ + @Immutable + @ThreadSafe + @SdkPublicApi + @SdkPreviewApi + public interface TransferComplete extends BytesTransferred { + /** + * The completed transfer, i.e., the {@link CompletedUpload} or {@link CompletedDownload}. + */ + CompletedTransfer completedTransfer(); + } + + /** + * The transfer failed. + *

+ * Available context attributes: + *

    + *
  1. {@link TransferFailed#request()}
  2. + *
  3. {@link TransferFailed#progressSnapshot()}
  4. + *
  5. {@link TransferFailed#exception()}
  6. + *
+ */ + @Immutable + @ThreadSafe + @SdkPublicApi + @SdkPreviewApi + public interface TransferFailed extends TransferInitiated { + /** + * The exception associated with the failed transfer. + *

+ * Note that this would be the cause> of a {@link CompletionException}, and not a {@link CompletionException} + * itself. + */ + Throwable exception(); + } + } +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgress.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgress.java new file mode 100644 index 000000000000..71e39ea3f739 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgress.java @@ -0,0 +1,60 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.progress; + +import software.amazon.awssdk.annotations.Mutable; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.transfer.s3.Download; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.Transfer; +import software.amazon.awssdk.transfer.s3.Upload; + +/** + * {@link TransferProgress} is a stateful representation of the progress of a transfer initiated by {@link + * S3TransferManager}. {@link TransferProgress} offers the ability to take a {@link #snapshot()} of the current progress, + * represented by an immutable {@link TransferProgressSnapshot}, which contains helpful progress-related methods like {@link + * TransferProgressSnapshot#bytesTransferred()} and {@link TransferProgressSnapshot#ratioTransferred()}. {@link TransferProgress} + * is attached to {@link Transfer} objects, namely {@link Upload} and {@link Download}. + *

+ * Where possible, it is typically recommended to avoid directly querying {@link TransferProgress} and to instead leverage + * the {@link TransferListener} interface to receive event-driven updates of the latest {@link TransferProgressSnapshot}. See the + * {@link TransferListener} documentation for usage instructions. However, if desired, {@link TransferProgress} can be used for + * poll-like checking of the current progress. E.g., + *

{@code
+ * Upload upload = tm.upload(...);
+ * while (!upload.completionFuture().isDone()) {
+ *     upload.progress().snapshot().ratioTransferred().ifPresent(System.out::println);
+ *     Thread.sleep(1000);
+ * }
+ * }
+ * + * @see TransferProgressSnapshot + * @see TransferListener + * @see S3TransferManager + */ +@Mutable +@ThreadSafe +@SdkPublicApi +@SdkPreviewApi +public interface TransferProgress { + + /** + * Take a snapshot of the current progress, represented by an immutable {@link TransferProgressSnapshot}. + */ + TransferProgressSnapshot snapshot(); +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java new file mode 100644 index 000000000000..4299fb641e06 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/progress/TransferProgressSnapshot.java @@ -0,0 +1,86 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.progress; + +import java.util.Optional; +import software.amazon.awssdk.annotations.Immutable; +import software.amazon.awssdk.annotations.SdkPreviewApi; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.annotations.ThreadSafe; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.transfer.s3.Download; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.Upload; + +/** + * {@link TransferProgressSnapshot} is an immutable, point-in-time representation of the progress of a given transfer + * initiated by {@link S3TransferManager}. {@link TransferProgressSnapshot} offers several helpful methods for checking the + * progress of a transfer, like {@link #bytesTransferred()} and {@link #ratioTransferred()}. + *

+ * {@link TransferProgressSnapshot}'s methods that return {@link Optional} are dependent upon the size of a transfer (i.e., the + * {@code Content-Length}) being known. In the case of file-based {@link Upload}s, transfer sizes are known up front and + * immediately available. In the case of {@link Download}s, the transfer size is not known until {@link S3TransferManager} + * receives a {@link GetObjectResponse} from Amazon S3. + *

+ * The recommended way to receive updates of the latest {@link TransferProgressSnapshot} is to implement the {@link + * TransferListener} interface. See the {@link TransferListener} documentation for usage instructions. A {@link + * TransferProgressSnapshot} can also be obtained from a {@link TransferProgress} returned as part of the result from a {@link + * TransferRequest}. + * + * @see TransferProgress + * @see TransferListener + * @see S3TransferManager + */ +@Immutable +@ThreadSafe +@SdkPublicApi +@SdkPreviewApi +public interface TransferProgressSnapshot { + + /** + * The total number of bytes that have been transferred so far. + */ + long bytesTransferred(); + + /** + * The total size of the transfer, in bytes, or {@link Optional#empty()} if unknown. + *

+ * In the case of file-based {@link Upload}s, transfer sizes are known up front and immediately available. In the case of + * {@link Download}s, the transfer size is not known until {@link S3TransferManager} receives a {@link GetObjectResponse} from + * Amazon S3. + */ + Optional transferSizeInBytes(); + + /** + * The ratio of the {@link #transferSizeInBytes()} that has been transferred so far, or {@link Optional#empty()} if unknown. + * This method depends on the {@link #transferSizeInBytes()} being known in order to return non-empty. + *

+ * Ratio is computed as {@link #bytesTransferred()} {@code /} {@link #transferSizeInBytes()}, where a transfer that is + * half-complete would return {@code 0.5}. + * + * @see #transferSizeInBytes() + */ + Optional ratioTransferred(); + + /** + * The total number of bytes that are remaining to be transferred, or {@link Optional#empty()} if unknown. This method depends + * on the {@link #transferSizeInBytes()} being known in order to return non-empty. + * + * @see #transferSizeInBytes() + */ + Optional bytesRemaining(); +} diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java new file mode 100644 index 000000000000..58713ca19e98 --- /dev/null +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultTransferProgressSnapshotTest.java @@ -0,0 +1,69 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.Test; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; + +public class DefaultTransferProgressSnapshotTest { + @Test + public void bytesTransferred_greaterThan_transferSize_shouldThrow() { + DefaultTransferProgressSnapshot.Builder builder = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(2) + .transferSizeInBytes(1L); + assertThatThrownBy(builder::build) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("bytesTransferred (2) must not be greater than transferSizeInBytes (1)"); + } + + @Test + public void ratioTransferred_withoutTransferSize_isEmpty() { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1) + .build(); + assertThat(snapshot.ratioTransferred()).isNotPresent(); + } + + @Test + public void ratioTransferred_withTransferSize_isCorrect() { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1) + .transferSizeInBytes(2L) + .build(); + assertThat(snapshot.ratioTransferred()).hasValue(0.5); + } + + @Test + public void bytesRemainingTransferred_withoutTransferSize_isEmpty() { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1) + .build(); + assertThat(snapshot.bytesRemaining()).isNotPresent(); + } + + @Test + public void bytesRemainingTransferred_withTransferSize_isCorrect() { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .bytesTransferred(1) + .transferSizeInBytes(3L) + .build(); + assertThat(snapshot.bytesRemaining()).hasValue(2L); + } +} \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerListenerTest.java new file mode 100644 index 000000000000..b263b1b54d5c --- /dev/null +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerListenerTest.java @@ -0,0 +1,283 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import java.nio.ByteBuffer; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.DrainingSubscriber; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.transfer.s3.CompletedUpload; +import software.amazon.awssdk.transfer.s3.Download; +import software.amazon.awssdk.transfer.s3.DownloadRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.Upload; +import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +public class S3TransferManagerListenerTest { + private final FileSystem fs = Jimfs.newFileSystem(Configuration.unix()); + private S3CrtAsyncClient s3Crt; + private S3TransferManager tm; + private long contentLength; + + @Before + public void methodSetup() { + s3Crt = mock(S3CrtAsyncClient.class); + tm = new DefaultS3TransferManager(s3Crt, mock(UploadDirectoryHelper.class), mock(TransferManagerConfiguration.class)); + contentLength = 1024L; + when(s3Crt.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + .thenAnswer(drainPutRequestBody()); + when(s3Crt.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))) + .thenAnswer(randomGetResponseBody(contentLength)); + } + + @After + public void methodTeardown() { + tm.close(); + } + + @Test + public void upload_success_shouldInvokeListener() throws Exception { + TransferListener listener = mock(TransferListener.class); + + Path path = newTempFile(); + Files.write(path, randomBytes(contentLength)); + + UploadRequest uploadRequest = UploadRequest.builder() + .putObjectRequest(r -> r.bucket("bucket") + .key("key")) + .source(path) + .overrideConfiguration(b -> b.addListener(listener)) + .build(); + Upload upload = tm.upload(uploadRequest); + upload.completionFuture().join(); + + ArgumentCaptor captor1 = + ArgumentCaptor.forClass(TransferListener.Context.TransferInitiated.class); + verify(listener, times(1)).transferInitiated(captor1.capture()); + TransferListener.Context.TransferInitiated ctx1 = captor1.getValue(); + assertThat(ctx1.request()).isSameAs(uploadRequest); + assertThat(ctx1.progressSnapshot().transferSizeInBytes()).hasValue(contentLength); + assertThat(ctx1.progressSnapshot().bytesTransferred()).isZero(); + + ArgumentCaptor captor2 = + ArgumentCaptor.forClass(TransferListener.Context.BytesTransferred.class); + verify(listener, times(1)).bytesTransferred(captor2.capture()); + TransferListener.Context.BytesTransferred ctx2 = captor2.getValue(); + assertThat(ctx2.request()).isSameAs(uploadRequest); + assertThat(ctx2.progressSnapshot().transferSizeInBytes()).hasValue(contentLength); + assertThat(ctx2.progressSnapshot().bytesTransferred()).isPositive(); + + ArgumentCaptor captor3 = + ArgumentCaptor.forClass(TransferListener.Context.TransferComplete.class); + verify(listener, times(1)).transferComplete(captor3.capture()); + TransferListener.Context.TransferComplete ctx3 = captor3.getValue(); + assertThat(ctx3.request()).isSameAs(uploadRequest); + assertThat(ctx3.progressSnapshot().transferSizeInBytes()).hasValue(contentLength); + assertThat(ctx3.progressSnapshot().bytesTransferred()).isEqualTo(contentLength); + assertThat(ctx3.completedTransfer()).isSameAs(upload.completionFuture().get()); + + verifyNoMoreInteractions(listener); + } + + @Test + public void download_success_shouldInvokeListener() throws Exception { + TransferListener listener = mock(TransferListener.class); + + DownloadRequest downloadRequest = DownloadRequest.builder() + .getObjectRequest(r -> r.bucket("bucket") + .key("key")) + .destination(newTempFile()) + .overrideConfiguration(b -> b.addListener(listener)) + .build(); + Download download = tm.download(downloadRequest); + download.completionFuture().join(); + + ArgumentCaptor captor1 = + ArgumentCaptor.forClass(TransferListener.Context.TransferInitiated.class); + verify(listener, times(1)).transferInitiated(captor1.capture()); + TransferListener.Context.TransferInitiated ctx1 = captor1.getValue(); + assertThat(ctx1.request()).isSameAs(downloadRequest); + // transferSize is not known until we receive GetObjectResponse header + assertThat(ctx1.progressSnapshot().transferSizeInBytes()).isNotPresent(); + assertThat(ctx1.progressSnapshot().bytesTransferred()).isZero(); + + ArgumentCaptor captor2 = + ArgumentCaptor.forClass(TransferListener.Context.BytesTransferred.class); + verify(listener, times(1)).bytesTransferred(captor2.capture()); + TransferListener.Context.BytesTransferred ctx2 = captor2.getValue(); + assertThat(ctx2.request()).isSameAs(downloadRequest); + // transferSize should now be known + assertThat(ctx2.progressSnapshot().transferSizeInBytes()).hasValue(contentLength); + assertThat(ctx2.progressSnapshot().bytesTransferred()).isPositive(); + + ArgumentCaptor captor3 = + ArgumentCaptor.forClass(TransferListener.Context.TransferComplete.class); + verify(listener, times(1)).transferComplete(captor3.capture()); + TransferListener.Context.TransferComplete ctx3 = captor3.getValue(); + assertThat(ctx3.request()).isSameAs(downloadRequest); + assertThat(ctx3.progressSnapshot().transferSizeInBytes()).hasValue(contentLength); + assertThat(ctx3.progressSnapshot().bytesTransferred()).isEqualTo(contentLength); + assertThat(ctx3.completedTransfer()).isSameAs(download.completionFuture().get()); + + verifyNoMoreInteractions(listener); + } + + @Test + public void upload_failure_shouldInvokeListener() throws Exception { + TransferListener listener = mock(TransferListener.class); + + Path path = newTempFile(); + Files.write(path, randomBytes(contentLength)); + + UploadRequest uploadRequest = UploadRequest.builder() + .putObjectRequest(r -> r.bucket("bucket") + .key("key")) + .source(Paths.get("/some/nonexistent/path")) + .overrideConfiguration(b -> b.addListener(listener)) + .build(); + Upload upload = tm.upload(uploadRequest); + + CompletableFuture future = upload.completionFuture(); + assertThatThrownBy(future::join) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(NoSuchFileException.class); + + ArgumentCaptor captor1 = + ArgumentCaptor.forClass(TransferListener.Context.TransferInitiated.class); + verify(listener, times(1)).transferInitiated(captor1.capture()); + TransferListener.Context.TransferInitiated ctx1 = captor1.getValue(); + assertThat(ctx1.request()).isSameAs(uploadRequest); + // transferSize is not known since file did not exist + assertThat(ctx1.progressSnapshot().transferSizeInBytes()).isNotPresent(); + assertThat(ctx1.progressSnapshot().bytesTransferred()).isZero(); + + ArgumentCaptor captor2 = + ArgumentCaptor.forClass(TransferListener.Context.TransferFailed.class); + verify(listener, times(1)).transferFailed(captor2.capture()); + TransferListener.Context.TransferFailed ctx2 = captor2.getValue(); + assertThat(ctx2.request()).isSameAs(uploadRequest); + assertThat(ctx2.progressSnapshot().transferSizeInBytes()).isNotPresent(); + assertThat(ctx2.progressSnapshot().bytesTransferred()).isZero(); + assertThat(ctx2.exception()).isInstanceOf(NoSuchFileException.class); + + verifyNoMoreInteractions(listener); + } + + @Test + public void listener_exception_shouldBeSuppressed() throws Exception { + TransferListener listener = throwingListener(); + + Path path = newTempFile(); + Files.write(path, randomBytes(contentLength)); + + UploadRequest uploadRequest = UploadRequest.builder() + .putObjectRequest(r -> r.bucket("bucket") + .key("key")) + .source(path) + .overrideConfiguration(b -> b.addListener(listener)) + .build(); + Upload upload = tm.upload(uploadRequest); + upload.completionFuture().join(); + + verify(listener, times(1)).transferInitiated(any()); + verify(listener, times(1)).bytesTransferred(any()); + verify(listener, times(1)).transferComplete(any()); + verifyNoMoreInteractions(listener); + } + + private static TransferListener throwingListener() { + TransferListener listener = mock(TransferListener.class); + RuntimeException e = new RuntimeException("Intentional exception for testing purposes"); + doThrow(e).when(listener).transferInitiated(any()); + doThrow(e).when(listener).bytesTransferred(any()); + doThrow(e).when(listener).transferComplete(any()); + doThrow(e).when(listener).transferFailed(any()); + return listener; + } + + private static Answer> drainPutRequestBody() { + return invocationOnMock -> { + AsyncRequestBody requestBody = invocationOnMock.getArgumentAt(1, AsyncRequestBody.class); + CompletableFuture cf = new CompletableFuture<>(); + requestBody.subscribe(new DrainingSubscriber() { + @Override + public void onError(Throwable t) { + cf.completeExceptionally(t); + } + + @Override + public void onComplete() { + cf.complete(PutObjectResponse.builder().build()); + } + }); + return cf; + }; + } + + private static Answer> randomGetResponseBody(long contentLength) { + return invocationOnMock -> { + AsyncResponseTransformer responseTransformer = + invocationOnMock.getArgumentAt(1, AsyncResponseTransformer.class); + CompletableFuture cf = responseTransformer.prepare(); + responseTransformer.onResponse(GetObjectResponse.builder() + .contentLength(contentLength) + .build()); + responseTransformer.onStream(AsyncRequestBody.fromBytes(randomBytes(contentLength))); + return cf; + }; + } + + private Path newTempFile() { + return fs.getPath("/", UUID.randomUUID().toString()); + } + + private static byte[] randomBytes(long size) { + byte[] bytes = new byte[Math.toIntExact(size)]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } +} diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerTest.java index a1bad44562d4..e018b645816e 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerTest.java @@ -185,21 +185,21 @@ public void close_shouldCloseUnderlyingResources() { public void uploadDirectory_requestNull_shouldThrowException() { UploadDirectoryRequest request = null; assertThatThrownBy(() -> tm.uploadDirectory(request).completionFuture().join()) - .hasCauseInstanceOf(NullPointerException.class) + .isInstanceOf(NullPointerException.class) .hasMessageContaining("must not be null"); } @Test public void upload_requestNull_shouldThrowException() { UploadRequest request = null; - assertThatThrownBy(() -> tm.upload(request).completionFuture().join()).hasCauseInstanceOf(NullPointerException.class) + assertThatThrownBy(() -> tm.upload(request).completionFuture().join()).isInstanceOf(NullPointerException.class) .hasMessageContaining("must not be null"); } @Test public void download_requestNull_shouldThrowException() { DownloadRequest request = null; - assertThatThrownBy(() -> tm.download(request).completionFuture().join()).hasCauseInstanceOf(NullPointerException.class) + assertThatThrownBy(() -> tm.download(request).completionFuture().join()).isInstanceOf(NullPointerException.class) .hasMessageContaining("must not be null"); } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java index a41061c2c14c..25538052a3a8 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperParameterizedTest.java @@ -50,6 +50,8 @@ import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.UploadDirectoryTransfer; import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; import software.amazon.awssdk.utils.IoUtils; /** @@ -311,7 +313,8 @@ public void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessf private DefaultUpload completedUpload() { return new DefaultUpload(CompletableFuture.completedFuture(CompletedUpload.builder() .response(PutObjectResponse.builder().build()) - .build())); + .build()), + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())); } private Path createTestDirectory() throws IOException { diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java index 205bbb54dd0d..e95fca2288f3 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java @@ -44,6 +44,8 @@ import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest; import software.amazon.awssdk.transfer.s3.UploadDirectoryTransfer; import software.amazon.awssdk.transfer.s3.UploadRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; public class UploadDirectoryHelperTest { private static FileSystem jimfs; @@ -74,10 +76,10 @@ public void methodSetup() { @Test public void uploadDirectory_cancel_shouldCancelAllFutures() { CompletableFuture future = new CompletableFuture<>(); - Upload upload = new DefaultUpload(future); + Upload upload = newUpload(future); CompletableFuture future2 = new CompletableFuture<>(); - Upload upload2 = new DefaultUpload(future2); + Upload upload2 = newUpload(future2); when(singleUploadFunction.apply(any(UploadRequest.class))).thenReturn(upload, upload2); @@ -103,13 +105,13 @@ public void uploadDirectory_allUploadsSucceed_failedUploadsShouldBeEmpty() throw CompletedUpload completedUpload = CompletedUpload.builder().response(putObjectResponse).build(); CompletableFuture successfulFuture = new CompletableFuture<>(); - Upload upload = new DefaultUpload(successfulFuture); + Upload upload = newUpload(successfulFuture); successfulFuture.complete(completedUpload); PutObjectResponse putObjectResponse2 = PutObjectResponse.builder().eTag("5678").build(); CompletedUpload completedUpload2 = CompletedUpload.builder().response(putObjectResponse2).build(); CompletableFuture failedFuture = new CompletableFuture<>(); - Upload upload2 = new DefaultUpload(failedFuture); + Upload upload2 = newUpload(failedFuture); failedFuture.complete(completedUpload2); when(singleUploadFunction.apply(any(UploadRequest.class))).thenReturn(upload, upload2); @@ -131,12 +133,12 @@ public void uploadDirectory_partialSuccess_shouldProvideFailedUploads() throws E PutObjectResponse putObjectResponse = PutObjectResponse.builder().eTag("1234").build(); CompletedUpload completedUpload = CompletedUpload.builder().response(putObjectResponse).build(); CompletableFuture successfulFuture = new CompletableFuture<>(); - Upload upload = new DefaultUpload(successfulFuture); + Upload upload = newUpload(successfulFuture); successfulFuture.complete(completedUpload); SdkClientException exception = SdkClientException.create("failed"); CompletableFuture failedFuture = new CompletableFuture<>(); - Upload upload2 = new DefaultUpload(failedFuture); + Upload upload2 = newUpload(failedFuture); failedFuture.completeExceptionally(exception); when(singleUploadFunction.apply(any(UploadRequest.class))).thenReturn(upload, upload2); @@ -153,4 +155,10 @@ public void uploadDirectory_partialSuccess_shouldProvideFailedUploads() throws E assertThat(completedUploadDirectory.failedUploads().iterator().next().exception()).isEqualTo(exception); assertThat(completedUploadDirectory.failedUploads().iterator().next().request().source().toString()).isEqualTo("test/2"); } + + private Upload newUpload(CompletableFuture future) { + return new DefaultUpload(future, + new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build()) + ); + } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java new file mode 100644 index 000000000000..930b6094bdca --- /dev/null +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/progress/LoggingTransferListenerTest.java @@ -0,0 +1,122 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.progress; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.util.List; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Before; +import org.junit.Test; +import software.amazon.awssdk.testutils.LogCaptor; +import software.amazon.awssdk.transfer.s3.CompletedTransfer; +import software.amazon.awssdk.transfer.s3.TransferRequest; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress; +import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot; +import software.amazon.awssdk.transfer.s3.internal.progress.TransferListenerContext; + +public class LoggingTransferListenerTest { + + private static final long TRANSFER_SIZE_IN_BYTES = 1024L; + + private DefaultTransferProgress progress; + private TransferListenerContext context; + private LoggingTransferListener listener; + + @Before + public void setUp() throws Exception { + TransferProgressSnapshot snapshot = DefaultTransferProgressSnapshot.builder() + .transferSizeInBytes(TRANSFER_SIZE_IN_BYTES) + .build(); + progress = new DefaultTransferProgress(snapshot); + context = TransferListenerContext.builder() + .request(mock(TransferRequest.class)) + .progressSnapshot(snapshot) + .build(); + listener = LoggingTransferListener.create(); + } + + @Test + public void test_defaultListener_successfulTransfer() { + try (LogCaptor logCaptor = new LogCaptor.DefaultLogCaptor(Level.ALL)) { + invokeSuccessfulLifecycle(); + List events = logCaptor.loggedEvents(); + assertLogged(events, Level.INFO, "Transfer initiated..."); + assertLogged(events, Level.INFO, "| | 0.0%"); + assertLogged(events, Level.INFO, "|= | 5.0%"); + assertLogged(events, Level.INFO, "|== | 10.0%"); + assertLogged(events, Level.INFO, "|=== | 15.0%"); + assertLogged(events, Level.INFO, "|==== | 20.0%"); + assertLogged(events, Level.INFO, "|===== | 25.0%"); + assertLogged(events, Level.INFO, "|====== | 30.0%"); + assertLogged(events, Level.INFO, "|======= | 35.0%"); + assertLogged(events, Level.INFO, "|======== | 40.0%"); + assertLogged(events, Level.INFO, "|========= | 45.0%"); + assertLogged(events, Level.INFO, "|========== | 50.0%"); + assertLogged(events, Level.INFO, "|=========== | 55.0%"); + assertLogged(events, Level.INFO, "|============ | 60.0%"); + assertLogged(events, Level.INFO, "|============= | 65.0%"); + assertLogged(events, Level.INFO, "|============== | 70.0%"); + assertLogged(events, Level.INFO, "|=============== | 75.0%"); + assertLogged(events, Level.INFO, "|================ | 80.0%"); + assertLogged(events, Level.INFO, "|================= | 85.0%"); + assertLogged(events, Level.INFO, "|================== | 90.0%"); + assertLogged(events, Level.INFO, "|=================== | 95.0%"); + assertLogged(events, Level.INFO, "|====================| 100.0%"); + assertLogged(events, Level.INFO, "Transfer complete!"); + assertThat(events).isEmpty(); + } + } + + @Test + public void test_customTicksListener_successfulTransfer() { + try (LogCaptor logCaptor = new LogCaptor.DefaultLogCaptor(Level.ALL)) { + listener = LoggingTransferListener.create(5); + invokeSuccessfulLifecycle(); + List events = logCaptor.loggedEvents(); + assertLogged(events, Level.INFO, "Transfer initiated..."); + assertLogged(events, Level.INFO, "| | 0.0%"); + assertLogged(events, Level.INFO, "|= | 20.0%"); + assertLogged(events, Level.INFO, "|== | 40.0%"); + assertLogged(events, Level.INFO, "|=== | 60.0%"); + assertLogged(events, Level.INFO, "|==== | 80.0%"); + assertLogged(events, Level.INFO, "|=====| 100.0%"); + assertLogged(events, Level.INFO, "Transfer complete!"); + assertThat(events).isEmpty(); + } + } + + private void invokeSuccessfulLifecycle() { + listener.transferInitiated(context); + + for (int i = 0; i <= TRANSFER_SIZE_IN_BYTES; i++) { + int bytes = i; + listener.bytesTransferred(context.copy(c -> c.progressSnapshot( + progress.updateAndGet(p -> p.bytesTransferred(bytes))))); + } + + listener.transferComplete(context.copy(b -> b.progressSnapshot(progress.snapshot()) + .completedTransfer(mock(CompletedTransfer.class)))); + } + + private void assertLogged(List events, Level level, String message) { + LoggingEvent event = events.remove(0); + assertThat(event.getLevel()).isEqualTo(level); + assertThat(event.getMessage()).isEqualTo(message); + } +} \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/test/resources/log4j.properties b/services-custom/s3-transfer-manager/src/test/resources/log4j.properties index b821297c6731..9088e99fcdec 100644 --- a/services-custom/s3-transfer-manager/src/test/resources/log4j.properties +++ b/services-custom/s3-transfer-manager/src/test/resources/log4j.properties @@ -13,7 +13,7 @@ # permissions and limitations under the License. # -log4j.rootLogger=WARN, A1 +log4j.rootLogger=INFO, A1 log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1.layout=org.apache.log4j.PatternLayout diff --git a/utils/src/main/java/software/amazon/awssdk/utils/CompletableFutureUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/CompletableFutureUtils.java index 05f1c600d21f..ee397b26b2af 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/CompletableFutureUtils.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/CompletableFutureUtils.java @@ -122,4 +122,26 @@ public static CompletableFuture forwardResultTo(CompletableFuture src, return src; } + + /** + * Completes the {@code dst} future based on the result of the {@code src} future, synchronously, + * after applying the provided transformation {@link Function} if successful. + * + * @param src The source {@link CompletableFuture} + * @param dst The destination where the {@code Throwable} or transformed result will be forwarded to. + * @return the {@code src} future. + */ + public static CompletableFuture forwardTransformedResultTo(CompletableFuture src, + CompletableFuture dst, + Function function) { + src.whenComplete((r, e) -> { + if (e != null) { + dst.completeExceptionally(e); + } else { + dst.complete(function.apply(r)); + } + }); + + return src; + } } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/StringUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/StringUtils.java index 2d819b78438d..312dfda2a340 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/StringUtils.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/StringUtils.java @@ -753,4 +753,43 @@ public static boolean safeStringToBoolean(String value) { throw new IllegalArgumentException("Value was defined as '" + value + "', but should be 'false' or 'true'"); } + + /** + * Returns a string whose value is the concatenation of this string repeated {@code count} times. + *

+ * If this string is empty or count is zero then the empty string is returned. + *

+ * Logical clone of JDK11's {@link String#repeat(int)}. + * + * @param value the string to repeat + * @param count number of times to repeat + * @return A string composed of this string repeated {@code count} times or the empty string if this string is empty or count + * is zero + * @throws IllegalArgumentException if the {@code count} is negative. + */ + public static String repeat(String value, int count) { + if (count < 0) { + throw new IllegalArgumentException("count is negative: " + count); + } + if (value == null || value.length() == 0 || count == 1) { + return value; + } + if (count == 0) { + return ""; + } + if (value.length() > Integer.MAX_VALUE / count) { + throw new OutOfMemoryError("Repeating " + value.length() + " bytes String " + count + + " times will produce a String exceeding maximum size."); + } + int len = value.length(); + int limit = len * count; + char[] array = new char[limit]; + value.getChars(0, len, array, 0); + int copied; + for (copied = len; copied < limit - copied; copied <<= 1) { + System.arraycopy(array, 0, array, copied, copied); + } + System.arraycopy(array, 0, array, copied, limit - copied); + return new String(array); + } } \ No newline at end of file diff --git a/utils/src/test/java/software/amazon/awssdk/utils/CompletableFutureUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/CompletableFutureUtilsTest.java index e2756cbbff66..1c08021cda54 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/CompletableFutureUtilsTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/CompletableFutureUtilsTest.java @@ -15,18 +15,17 @@ package software.amazon.awssdk.utils; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.fail; + +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.util.concurrent.CompletableFuture; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.fail; - public class CompletableFutureUtilsTest { private static ExecutorService executors; @@ -82,4 +81,28 @@ public void forwardResultTo_srcCompletesExceptionally_shouldCompleteDstFuture() src.completeExceptionally(exception); assertThatThrownBy(dst::join).hasCause(exception); } + + @Test(timeout = 1000) + public void forwardTransformedResultTo_srcCompletesSuccessfully_shouldCompleteDstFuture() { + CompletableFuture src = new CompletableFuture<>(); + CompletableFuture dst = new CompletableFuture<>(); + + CompletableFuture returnedFuture = CompletableFutureUtils.forwardTransformedResultTo(src, dst, String::valueOf); + assertThat(returnedFuture).isSameAs(src); + + src.complete(123); + assertThat(dst.join()).isEqualTo("123"); + } + + @Test(timeout = 1000) + public void forwardTransformedResultTo_srcCompletesExceptionally_shouldCompleteDstFuture() { + CompletableFuture src = new CompletableFuture<>(); + CompletableFuture dst = new CompletableFuture<>(); + + RuntimeException exception = new RuntimeException("foobar"); + CompletableFutureUtils.forwardTransformedResultTo(src, dst, String::valueOf); + + src.completeExceptionally(exception); + assertThatThrownBy(dst::join).hasCause(exception); + } } diff --git a/utils/src/test/java/software/amazon/awssdk/utils/StringUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/StringUtilsTest.java index 151f3ba36f91..865cf5be430a 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/StringUtilsTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/StringUtilsTest.java @@ -151,4 +151,30 @@ public void safeStringTooBoolean_mixedSpaceFalse_shouldReturnFalse() { public void safeStringTooBoolean_invalidValue_shouldThrowException() { assertFalse(StringUtils.safeStringToBoolean("foobar")); } + + @Test + public void testRepeat() { + assertNull(StringUtils.repeat(null, 0)); + assertNull(StringUtils.repeat(null, 1)); + assertNull(StringUtils.repeat(null, 2)); + + assertEquals("", StringUtils.repeat("", 0)); + assertEquals("", StringUtils.repeat("", 1)); + assertEquals("", StringUtils.repeat("", 2)); + + assertEquals("", StringUtils.repeat("ab", 0)); + assertEquals("ab", StringUtils.repeat("ab", 1)); + assertEquals("abab", StringUtils.repeat("ab", 2)); + assertEquals("ababab", StringUtils.repeat("ab", 3)); + } + + @Test(expected = IllegalArgumentException.class) + public void repeat_negativeCount_shouldThrowIae() { + StringUtils.repeat("a", -1); + } + + @Test(expected = OutOfMemoryError.class) + public void repeat_maxCount_shouldThrowOom() { + StringUtils.repeat("a", Integer.MAX_VALUE); + } }