Skip to content

Commit cffd8ea

Browse files
authored
Refactoring - separating CRT logic from S3 Transfer Manager code (#3985)
* Refactoring - separating CRT logic from S3 Transfer Manager code * Rename S3TransferManagerFactory so that it doesn't pop up in IDE suggestions
1 parent ee31237 commit cffd8ea

18 files changed

+918
-571
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fixed the issue where S3 Transfer Manager attempted to load AWS CRT classes when Java based S3 client was used. See [#3936](https://github.com/aws/aws-sdk-java-v2/issues/3936)."
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import java.nio.charset.StandardCharsets;
2525
import java.nio.file.Files;
2626
import java.time.Duration;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.ScheduledExecutorService;
2730
import org.junit.jupiter.api.AfterAll;
2831
import org.junit.jupiter.api.BeforeAll;
2932
import org.junit.jupiter.api.Test;
3033
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
34+
import software.amazon.awssdk.core.waiters.AsyncWaiter;
3135
import software.amazon.awssdk.core.waiters.Waiter;
3236
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
3337
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
@@ -48,19 +52,22 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
4852
private static final long OBJ_SIZE = 24 * MB;
4953
private static File largeFile;
5054
private static File smallFile;
55+
private static ScheduledExecutorService executorService;
5156

5257
@BeforeAll
5358
public static void setup() throws Exception {
5459
createBucket(BUCKET);
5560
largeFile = new RandomTempFile(OBJ_SIZE);
5661
smallFile = new RandomTempFile(2 * MB);
62+
executorService = Executors.newScheduledThreadPool(3);
5763
}
5864

5965
@AfterAll
6066
public static void cleanup() {
6167
deleteBucketAndAllContents(BUCKET);
6268
largeFile.delete();
6369
smallFile.delete();
70+
executorService.shutdown();
6471
}
6572

6673
@Test
@@ -151,8 +158,13 @@ private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUplo
151158

152159
private void verifyMultipartUploadIdNotExist(ResumableFileUpload resumableFileUpload) {
153160
String multipartUploadId = resumableFileUpload.multipartUploadId().get();
154-
assertThatThrownBy(() -> s3Async.listParts(r -> r.uploadId(multipartUploadId).bucket(BUCKET).key(KEY)).join())
155-
.hasCauseInstanceOf(NoSuchUploadException.class);
161+
AsyncWaiter<ListPartsResponse> waiter = AsyncWaiter.builder(ListPartsResponse.class)
162+
.addAcceptor(WaiterAcceptor.successOnExceptionAcceptor(e -> e instanceof NoSuchUploadException))
163+
.addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(r -> true))
164+
.overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(1)))
165+
.scheduledExecutorService(executorService)
166+
.build();
167+
waiter.runAsync(() -> s3Async.listParts(r -> r.uploadId(multipartUploadId).bucket(BUCKET).key(KEY)));
156168
}
157169

158170
private static void waitUntilMultipartUploadExists() {

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3TransferManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2929
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3030
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
31-
import software.amazon.awssdk.transfer.s3.internal.DefaultS3TransferManager;
31+
import software.amazon.awssdk.transfer.s3.internal.TransferManagerFactory;
3232
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
3333
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
3434
import software.amazon.awssdk.transfer.s3.model.Copy;
@@ -678,7 +678,7 @@ static S3TransferManager create() {
678678
* Creates a default builder for {@link S3TransferManager}.
679679
*/
680680
static S3TransferManager.Builder builder() {
681-
return DefaultS3TransferManager.builder();
681+
return new TransferManagerFactory.DefaultBuilder();
682682
}
683683

684684
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal;
17+
18+
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
19+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
20+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
21+
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE;
22+
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
23+
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.function.Consumer;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
29+
import software.amazon.awssdk.core.async.AsyncRequestBody;
30+
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
31+
import software.amazon.awssdk.crt.s3.ResumeToken;
32+
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
33+
import software.amazon.awssdk.services.s3.S3AsyncClient;
34+
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
35+
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
36+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
37+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
38+
import software.amazon.awssdk.transfer.s3.S3TransferManager;
39+
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
40+
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
41+
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
42+
import software.amazon.awssdk.transfer.s3.model.FileUpload;
43+
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
44+
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
45+
import software.amazon.awssdk.utils.CompletableFutureUtils;
46+
import software.amazon.awssdk.utils.Logger;
47+
import software.amazon.awssdk.utils.Validate;
48+
49+
/**
50+
* An implementation of {@link S3TransferManager} that uses CRT-based S3 client under the hood.
51+
*/
52+
@SdkInternalApi
53+
class CrtS3TransferManager extends DelegatingS3TransferManager {
54+
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
55+
private final S3AsyncClient s3AsyncClient;
56+
57+
CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
58+
boolean isDefaultS3AsyncClient) {
59+
super(new GenericS3TransferManager(transferConfiguration, s3AsyncClient, isDefaultS3AsyncClient));
60+
this.s3AsyncClient = s3AsyncClient;
61+
}
62+
63+
@Override
64+
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
65+
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
66+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
67+
68+
AsyncRequestBody requestBody =
69+
FileAsyncRequestBody.builder()
70+
.path(uploadFileRequest.source())
71+
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
72+
.build();
73+
74+
Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
75+
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable);
76+
77+
PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);
78+
79+
CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();
80+
81+
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
82+
progressUpdater.transferInitiated();
83+
requestBody = progressUpdater.wrapRequestBody(requestBody);
84+
progressUpdater.registerCompletion(returnFuture);
85+
86+
try {
87+
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");
88+
89+
CompletableFuture<PutObjectResponse> crtFuture =
90+
s3AsyncClient.putObject(putObjectRequest, requestBody);
91+
92+
// Forward upload cancellation to CRT future
93+
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
94+
95+
CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
96+
r -> CompletedFileUpload.builder()
97+
.response(r)
98+
.build());
99+
} catch (Throwable throwable) {
100+
returnFuture.completeExceptionally(throwable);
101+
}
102+
103+
104+
return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
105+
}
106+
107+
private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
108+
boolean noResumeToken) {
109+
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
110+
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
111+
if (fileModified) {
112+
log.debug(() -> String.format("The file (%s) has been modified since "
113+
+ "the last pause. " +
114+
"The SDK will upload the requested object in bucket"
115+
+ " (%s) with key (%s) from "
116+
+ "the "
117+
+ "beginning.",
118+
uploadFileRequest.source(),
119+
putObjectRequest.bucket(),
120+
putObjectRequest.key()));
121+
resumableFileUpload.multipartUploadId()
122+
.ifPresent(id -> {
123+
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
124+
s3AsyncClient.abortMultipartUpload(
125+
AbortMultipartUploadRequest.builder()
126+
.bucket(putObjectRequest.bucket())
127+
.key(putObjectRequest.key())
128+
.uploadId(id)
129+
.build())
130+
.exceptionally(t -> {
131+
log.warn(() -> String.format("Failed to abort previous multipart upload "
132+
+ "(id: %s)"
133+
+ ". You may need to call "
134+
+ "S3AsyncClient#abortMultiPartUpload to "
135+
+ "free all storage consumed by"
136+
+ " all parts. ",
137+
id), t);
138+
return null;
139+
});
140+
});
141+
}
142+
143+
if (noResumeToken) {
144+
log.debug(() -> String.format("No resume token is found. " +
145+
"The SDK will upload the requested object in bucket"
146+
+ " (%s) with key (%s) from "
147+
+ "the beginning.",
148+
putObjectRequest.bucket(),
149+
putObjectRequest.key()));
150+
}
151+
152+
153+
return uploadFile(uploadFileRequest);
154+
}
155+
156+
@Override
157+
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
158+
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");
159+
160+
boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
161+
resumableFileUpload.fileLastModified(),
162+
resumableFileUpload.uploadFileRequest().source());
163+
164+
boolean noResumeToken = !hasResumeToken(resumableFileUpload);
165+
166+
if (fileModified || noResumeToken) {
167+
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
168+
}
169+
170+
return doResumeUpload(resumableFileUpload);
171+
}
172+
173+
private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
174+
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
175+
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
176+
ResumeToken resumeToken = crtResumeToken(resumableFileUpload);
177+
178+
Consumer<SdkHttpExecutionAttributes.Builder> attachResumeToken =
179+
b -> b.put(CRT_PAUSE_RESUME_TOKEN, resumeToken);
180+
181+
PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);
182+
183+
return uploadFile(uploadFileRequest.toBuilder()
184+
.putObjectRequest(modifiedPutObjectRequest)
185+
.build());
186+
}
187+
188+
private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUpload) {
189+
return new ResumeToken(new ResumeToken.PutResumeTokenBuilder()
190+
.withNumPartsCompleted(resumableFileUpload.transferredParts().orElse(0L))
191+
.withTotalNumParts(resumableFileUpload.totalParts().orElse(0L))
192+
.withPartSize(resumableFileUpload.partSizeInBytes().getAsLong())
193+
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
194+
}
195+
196+
private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
197+
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
198+
}
199+
200+
private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
201+
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
202+
SdkHttpExecutionAttributes modifiedAttributes =
203+
putObjectRequest.overrideConfiguration().map(o -> o.executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES))
204+
.map(b -> b.toBuilder().applyMutation(builderMutation).build())
205+
.orElseGet(() -> SdkHttpExecutionAttributes.builder().applyMutation(builderMutation).build());
206+
207+
Consumer<AwsRequestOverrideConfiguration.Builder> attachSdkHttpAttributes =
208+
b -> b.putExecutionAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES, modifiedAttributes);
209+
210+
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
211+
putObjectRequest.overrideConfiguration()
212+
.map(o -> o.toBuilder().applyMutation(attachSdkHttpAttributes).build())
213+
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
214+
.applyMutation(attachSdkHttpAttributes)
215+
.build());
216+
217+
return putObjectRequest.toBuilder()
218+
.overrideConfiguration(modifiedRequestOverrideConfig)
219+
.build();
220+
}
221+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal;
17+
18+
import software.amazon.awssdk.annotations.SdkInternalApi;
19+
import software.amazon.awssdk.transfer.s3.S3TransferManager;
20+
import software.amazon.awssdk.transfer.s3.model.Copy;
21+
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
22+
import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
23+
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
24+
import software.amazon.awssdk.transfer.s3.model.Download;
25+
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
26+
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
27+
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
28+
import software.amazon.awssdk.transfer.s3.model.FileDownload;
29+
import software.amazon.awssdk.transfer.s3.model.FileUpload;
30+
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
31+
import software.amazon.awssdk.transfer.s3.model.Upload;
32+
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
33+
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
34+
import software.amazon.awssdk.transfer.s3.model.UploadRequest;
35+
36+
37+
/**
38+
* An {@link S3TransferManager} that just delegates to another {@link S3TransferManager}.
39+
*/
40+
@SdkInternalApi
41+
abstract class DelegatingS3TransferManager implements S3TransferManager {
42+
private final S3TransferManager delegate;
43+
44+
protected DelegatingS3TransferManager(S3TransferManager delegate) {
45+
this.delegate = delegate;
46+
}
47+
48+
@Override
49+
public Upload upload(UploadRequest uploadRequest) {
50+
return delegate.upload(uploadRequest);
51+
}
52+
53+
@Override
54+
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
55+
return delegate.uploadFile(uploadFileRequest);
56+
}
57+
58+
@Override
59+
public DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
60+
return delegate.uploadDirectory(uploadDirectoryRequest);
61+
}
62+
63+
@Override
64+
public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadRequest) {
65+
return delegate.download(downloadRequest);
66+
}
67+
68+
@Override
69+
public FileDownload downloadFile(DownloadFileRequest downloadRequest) {
70+
return delegate.downloadFile(downloadRequest);
71+
}
72+
73+
@Override
74+
public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownload) {
75+
return delegate.resumeDownloadFile(resumableFileDownload);
76+
}
77+
78+
@Override
79+
public DirectoryDownload downloadDirectory(DownloadDirectoryRequest downloadDirectoryRequest) {
80+
return delegate.downloadDirectory(downloadDirectoryRequest);
81+
}
82+
83+
@Override
84+
public Copy copy(CopyRequest copyRequest) {
85+
return delegate.copy(copyRequest);
86+
}
87+
88+
@Override
89+
public void close() {
90+
delegate.close();
91+
}
92+
}

0 commit comments

Comments
 (0)