Skip to content

Commit 995ca7d

Browse files
author
Bennett Lynch
committed
Refactor S3TransferManager to support non-file-based transfers
## Motivation We would like for S3TransferManager to support non-file-based uploads and downloads, such as reactive streams or in-memory byte buffers, but the current implementation is directly coupled to file/Path-based transfers. While the underlying S3AsyncClient already has support for AsyncRequestBodies and AsyncResponseTransformers, these constructs are not exposed through the current S3TransferManager interface. Additionally, the existing S3TransferManager interfaces have made some assumptions that make it difficult to introduce this support, such as UploadRequest/DownloadRequest having a Path attribute, and not having interfaces that allow us to distinguish between transfer types, e.g., single-object and multi-object (directory) transfers. Likewise, the current interface hierarchy also makes it difficult to distinguish between file-based and non-file-based. Therefore, this refactoring requires backwards-incompatible changes, which are permissible while S3TransferManager is still in PREVIEW. ## Modifications The diff here is very large, but this change set primarily introduces the following interface hierarchy: * TransferRequest * TransferObjectRequest * UploadFileRequest * DownloadFileRequest * UploadRequest * DownloadRequest<T> * TransferDirectoryRequest * UploadDirectoryRequest * Transfer * ObjectTransfer * FileUpload * FileDownload * Upload * Download<T> * DirectoryTransfer * DirectoryUpload * CompletedTransfer * CompletedObjectTransfer * CompletedFileUpload * CompletedFileDownload * CompletedUpload * CompletedDownload<T> * CompletedDirectoryTransfer * CompletedDirectoryUpload * FailedObjectTransfer * FailedFileUpload These interfaces allow us to more selectively choose which attributes will be shared between different data types. Additionally, we take this opportunity to make the naming convention and class-vs-interface distinction consistent across all the above data types. It is important that we distinguish between file-based-requests and non-file-based-requests so that the Collection<> of failed file transfers from a directory upload can be re-driven, if needed. Note that the above Download-oriented data types are all generic. This is because of how AsyncResponseTransformer is designed to be generic, being parameterized with the type of data that a given AsyncResponseTransformer produces. Therefore, a user must declare the type at the time of instantiating a DownloadRequest, and the type will be consistent throughout the transfer lifecycle, including the Download and CompletedDownload interfaces.
1 parent b47d247 commit 995ca7d

File tree

66 files changed

+2758
-994
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2758
-994
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Refactor S3TransferManager to support non-file-based transfers"
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/CrtExceptionTransformationIntegrationTest.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515

1616
package software.amazon.awssdk.transfer.s3;
1717

18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
20+
21+
import java.io.IOException;
22+
import java.nio.file.Files;
23+
import java.nio.file.Paths;
1824
import org.junit.AfterClass;
1925
import org.junit.BeforeClass;
2026
import org.junit.Test;
@@ -25,13 +31,6 @@
2531
import software.amazon.awssdk.testutils.RandomTempFile;
2632
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
2733

28-
import java.io.IOException;
29-
import java.nio.file.Files;
30-
import java.nio.file.Paths;
31-
32-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
33-
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
34-
3534
public class CrtExceptionTransformationIntegrationTest extends S3IntegrationTestBase {
3635

3736
private static final String BUCKET = temporaryBucketName(CrtExceptionTransformationIntegrationTest.class);
@@ -87,21 +86,21 @@ public void getObjectNoSuchBucket() throws IOException {
8786
@Test
8887
public void transferManagerDownloadObjectWithNoSuchKey() throws IOException {
8988
String randomBaseDirectory = Files.createTempDirectory(getClass().getSimpleName()).toString();
90-
assertThatThrownBy(() -> transferManager.download(DownloadRequest.builder()
91-
.getObjectRequest(GetObjectRequest.builder().bucket(BUCKET).key("randomKey").build())
92-
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
93-
.build()).completionFuture().join())
89+
assertThatThrownBy(() -> transferManager.downloadFile(DownloadFileRequest.builder()
90+
.getObjectRequest(GetObjectRequest.builder().bucket(BUCKET).key("randomKey").build())
91+
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
92+
.build()).completionFuture().join())
9493
.hasCauseInstanceOf(NoSuchKeyException.class)
9594
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist");
9695
}
9796

9897
@Test
9998
public void transferManagerDownloadObjectWithNoSuchBucket() throws IOException {
10099
String randomBaseDirectory = Files.createTempDirectory(getClass().getSimpleName()).toString();
101-
assertThatThrownBy(() -> transferManager.download(DownloadRequest.builder()
102-
.getObjectRequest(GetObjectRequest.builder().bucket("nonExistingTestBucket").key(KEY).build())
103-
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
104-
.build()).completionFuture().join())
100+
assertThatThrownBy(() -> transferManager.downloadFile(DownloadFileRequest.builder()
101+
.getObjectRequest(GetObjectRequest.builder().bucket("nonExistingTestBucket").key(KEY).build())
102+
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
103+
.build()).completionFuture().join())
105104
.hasCauseInstanceOf(NoSuchBucketException.class)
106105
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchBucketException: The specified bucket does not exist");
107106
}
@@ -127,10 +126,10 @@ public void putObjectNoSuchBucket() throws IOException {
127126

128127
@Test
129128
public void transferManagerUploadObjectWithNoSuchObject() throws IOException{
130-
assertThatThrownBy(() -> transferManager.upload(UploadRequest.builder()
131-
.putObjectRequest(PutObjectRequest.builder().bucket("nonExistingTestBucket").key("someKey").build())
132-
.source(testFile.toPath())
133-
.build()).completionFuture().join())
129+
assertThatThrownBy(() -> transferManager.uploadFile(UploadFileRequest.builder()
130+
.putObjectRequest(PutObjectRequest.builder().bucket("nonExistingTestBucket").key("someKey").build())
131+
.source(testFile.toPath())
132+
.build()).completionFuture().join())
134133
.hasCauseInstanceOf(NoSuchBucketException.class)
135134
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchBucketException: The specified bucket does not exist");
136135
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.junit.AfterClass;
2525
import org.junit.BeforeClass;
2626
import org.junit.Test;
27+
import software.amazon.awssdk.core.ResponseBytes;
28+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
29+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2730
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
2831
import software.amazon.awssdk.testutils.RandomTempFile;
2932
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
@@ -46,7 +49,7 @@ public static void setup() throws IOException {
4649
.build(), file.toPath());
4750
tm = S3TransferManager.builder()
4851
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
49-
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
52+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
5053
.build();
5154
}
5255

@@ -58,15 +61,30 @@ public static void cleanup() {
5861
}
5962

6063
@Test
61-
public void download_shouldWork() throws IOException {
64+
public void download_toFile() throws IOException {
6265
Path path = RandomTempFile.randomUncreatedFile().toPath();
63-
Download download = tm.download(DownloadRequest.builder()
64-
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
65-
.destination(path)
66-
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
67-
.build());
68-
CompletedDownload completedDownload = download.completionFuture().join();
66+
FileDownload download =
67+
tm.downloadFile(DownloadFileRequest.builder()
68+
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
69+
.destination(path)
70+
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
71+
.build());
72+
CompletedFileDownload completedFileDownload = download.completionFuture().join();
6973
assertThat(Md5Utils.md5AsBase64(path.toFile())).isEqualTo(Md5Utils.md5AsBase64(file));
70-
assertThat(completedDownload.response().responseMetadata().requestId()).isNotNull();
74+
assertThat(completedFileDownload.response().responseMetadata().requestId()).isNotNull();
75+
}
76+
77+
@Test
78+
public void download_toBytes() throws Exception {
79+
Download<ResponseBytes<GetObjectResponse>> download =
80+
tm.download(DownloadRequest.builder()
81+
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
82+
.responseTransformer(AsyncResponseTransformer.toBytes())
83+
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
84+
.build());
85+
CompletedDownload<ResponseBytes<GetObjectResponse>> completedDownload = download.completionFuture().join();
86+
ResponseBytes<GetObjectResponse> result = completedDownload.result();
87+
assertThat(Md5Utils.md5AsBase64(result.asByteArray())).isEqualTo(Md5Utils.md5AsBase64(file));
88+
assertThat(result.response().responseMetadata().requestId()).isNotNull();
7189
}
7290
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadDirectoryIntegrationTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ public static void teardown() {
8585
@Test
8686
public void uploadDirectory_filesSentCorrectly() {
8787
String prefix = "yolo";
88-
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
89-
.bucket(TEST_BUCKET)
90-
.prefix(prefix)
91-
.overrideConfiguration(o -> o.recursive(true)));
92-
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
93-
assertThat(completedUploadDirectory.failedUploads()).isEmpty();
88+
DirectoryUpload uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
89+
.bucket(TEST_BUCKET)
90+
.prefix(prefix)
91+
.overrideConfiguration(o -> o.recursive(true)));
92+
CompletedDirectoryUpload completedDirectoryUpload = uploadDirectory.completionFuture().join();
93+
assertThat(completedDirectoryUpload.failedTransfers()).isEmpty();
9494

9595
List<String> keys =
9696
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)
@@ -105,13 +105,13 @@ public void uploadDirectory_filesSentCorrectly() {
105105
public void uploadDirectory_withDelimiter_filesSentCorrectly() {
106106
String prefix = "hello";
107107
String delimiter = "0";
108-
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
109-
.bucket(TEST_BUCKET)
110-
.delimiter(delimiter)
111-
.prefix(prefix)
112-
.overrideConfiguration(o -> o.recursive(true)));
113-
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
114-
assertThat(completedUploadDirectory.failedUploads()).isEmpty();
108+
DirectoryUpload uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
109+
.bucket(TEST_BUCKET)
110+
.delimiter(delimiter)
111+
.prefix(prefix)
112+
.overrideConfiguration(o -> o.recursive(true)));
113+
CompletedDirectoryUpload completedDirectoryUpload = uploadDirectory.completionFuture().join();
114+
assertThat(completedDirectoryUpload.failedTransfers()).isEmpty();
115115

116116
List<String> keys =
117117
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
2020

2121
import java.io.IOException;
22+
import java.nio.charset.StandardCharsets;
2223
import java.nio.file.Files;
24+
import java.util.UUID;
2325
import org.junit.AfterClass;
2426
import org.junit.BeforeClass;
2527
import org.junit.Test;
2628
import software.amazon.awssdk.core.ResponseInputStream;
29+
import software.amazon.awssdk.core.async.AsyncRequestBody;
2730
import software.amazon.awssdk.core.sync.ResponseTransformer;
2831
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2932
import software.amazon.awssdk.testutils.RandomTempFile;
@@ -62,16 +65,17 @@ public static void teardown() throws IOException {
6265
}
6366

6467
@Test
65-
public void upload_fileSentCorrectly() throws IOException {
66-
Upload upload = tm.upload(UploadRequest.builder()
67-
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
68-
.source(testFile.toPath())
69-
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
70-
.build());
68+
public void upload_file_SentCorrectly() throws IOException {
69+
FileUpload fileUpload =
70+
tm.uploadFile(UploadFileRequest.builder()
71+
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
72+
.source(testFile.toPath())
73+
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
74+
.build());
7175

72-
CompletedUpload completedUpload = upload.completionFuture().join();
73-
assertThat(completedUpload.response().responseMetadata().requestId()).isNotNull();
74-
assertThat(completedUpload.response().sdkHttpResponse()).isNotNull();
76+
CompletedFileUpload completedFileUpload = fileUpload.completionFuture().join();
77+
assertThat(completedFileUpload.response().responseMetadata().requestId()).isNotNull();
78+
assertThat(completedFileUpload.response().sdkHttpResponse()).isNotNull();
7579

7680
ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
7781
ResponseTransformer.toInputStream());
@@ -80,4 +84,27 @@ public void upload_fileSentCorrectly() throws IOException {
8084
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
8185
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
8286
}
87+
88+
@Test
89+
public void upload_asyncRequestBody_SentCorrectly() throws IOException {
90+
String content = UUID.randomUUID().toString();
91+
92+
Upload upload =
93+
tm.upload(UploadRequest.builder()
94+
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
95+
.requestBody(AsyncRequestBody.fromString(content))
96+
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
97+
.build());
98+
99+
CompletedUpload completedUpload = upload.completionFuture().join();
100+
assertThat(completedUpload.response().responseMetadata().requestId()).isNotNull();
101+
assertThat(completedUpload.response().sdkHttpResponse()).isNotNull();
102+
103+
ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
104+
ResponseTransformer.toInputStream());
105+
106+
assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8)))
107+
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
108+
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
109+
}
83110
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3;
17+
18+
import java.util.Collection;
19+
import software.amazon.awssdk.annotations.SdkPreviewApi;
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
22+
/**
23+
* A completed directory-based transfer.
24+
*
25+
* @see CompletedDirectoryUpload
26+
*/
27+
@SdkPublicApi
28+
@SdkPreviewApi
29+
public interface CompletedDirectoryTransfer extends CompletedTransfer {
30+
31+
/**
32+
* An immutable collection of failed transfers with error details, request metadata about each file that is failed to
33+
* transfer.
34+
*
35+
* <p>
36+
* Failed single object transfers can be retried by calling {@link S3TransferManager#uploadFile(UploadFileRequest)} or
37+
* {@link S3TransferManager#downloadFile(DownloadFileRequest)}.
38+
*
39+
* <pre>
40+
* {@code
41+
* // Retrying failed uploads if the exception is retryable
42+
* List<CompletableFuture<CompletedUpload>> futures =
43+
* completedDirectoryUpload.failedTransfers()
44+
* .stream()
45+
* .filter(failedSingleFileUpload -> isRetryable(failedSingleFileUpload.exception()))
46+
* .map(failedSingleFileUpload ->
47+
* tm.upload(failedSingleFileUpload.request()).completionFuture())
48+
* .collect(Collectors.toList());
49+
* CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
50+
* }
51+
* </pre>
52+
*
53+
* @return a list of failed transfers
54+
*/
55+
Collection<? extends FailedObjectTransfer> failedTransfers();
56+
}

0 commit comments

Comments
 (0)