Skip to content

Commit 9b661d8

Browse files
committed
Support uploading with unknown content length
1 parent e0b4bfc commit 9b661d8

File tree

10 files changed

+535
-78
lines changed

10 files changed

+535
-78
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,23 +120,28 @@ public void onNext(ByteBuffer byteBuffer) {
120120
int amountRemainingInChunk = amountRemainingInChunk();
121121

122122
// If we have fulfilled this chunk,
123-
// we should create a new DownstreamBody if needed
123+
// complete the current body
124124
if (amountRemainingInChunk == 0) {
125-
completeCurrentBody();
126-
127-
if (shouldCreateNewDownstreamRequestBody(byteBuffer)) {
128-
int currentChunk = chunkNumber.incrementAndGet();
129-
long chunkSize = calculateChunkSize(totalDataRemaining());
130-
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
131-
}
125+
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
132126
}
133127

134128
amountRemainingInChunk = amountRemainingInChunk();
135-
if (amountRemainingInChunk >= byteBuffer.remaining()) {
129+
130+
// If the current ByteBuffer < this chunk, send it as-is
131+
if (amountRemainingInChunk > byteBuffer.remaining()) {
132+
currentBody.send(byteBuffer.duplicate());
133+
break;
134+
}
135+
136+
// If the current ByteBuffer == this chunk, send it as-is and
137+
// complete the current body
138+
if (amountRemainingInChunk == byteBuffer.remaining()) {
136139
currentBody.send(byteBuffer.duplicate());
140+
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
137141
break;
138142
}
139143

144+
// If the current ByteBuffer > this chunk, split this ByteBuffer
140145
ByteBuffer firstHalf = byteBuffer.duplicate();
141146
int newLimit = firstHalf.position() + amountRemainingInChunk;
142147
firstHalf.limit(newLimit);
@@ -147,6 +152,16 @@ public void onNext(ByteBuffer byteBuffer) {
147152
maybeRequestMoreUpstreamData();
148153
}
149154

155+
private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
156+
completeCurrentBody();
157+
158+
if (shouldCreateNewDownstreamRequestBody(byteBuffer)) {
159+
int currentChunk = chunkNumber.incrementAndGet();
160+
long chunkSize = calculateChunkSize(totalDataRemaining());
161+
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
162+
}
163+
}
164+
150165

151166
/**
152167
* If content length is known, we should create new DownstreamRequestBody if there's remaining data.
@@ -161,6 +176,7 @@ private int amountRemainingInChunk() {
161176
}
162177

163178
private void completeCurrentBody() {
179+
log.debug(() -> "completeCurrentBody");
164180
currentBody.complete();
165181
if (upstreamSize == null) {
166182
sendCurrentBody(currentBody);
@@ -181,6 +197,7 @@ public void onError(Throwable t) {
181197
}
182198

183199
private void sendCurrentBody(AsyncRequestBody body) {
200+
log.debug(() -> "sendCurrentBody");
184201
downstreamPublisher.send(body).exceptionally(t -> {
185202
downstreamPublisher.error(t);
186203
return null;
@@ -206,7 +223,7 @@ private void maybeRequestMoreUpstreamData() {
206223
}
207224

208225
private boolean shouldRequestMoreData(long buffered) {
209-
return buffered == 0 || buffered + byteBufferSizeHint < maxMemoryUsageInBytes;
226+
return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes;
210227
}
211228

212229
private Long totalDataRemaining() {
@@ -240,7 +257,7 @@ public Optional<Long> contentLength() {
240257
}
241258

242259
public void send(ByteBuffer data) {
243-
log.trace(() -> "Sending bytebuffer " + data);
260+
log.trace(() -> String.format("Sending bytebuffer %s to chunk %d", data, chunkNumber));
244261
int length = data.remaining();
245262
transferredLength += length;
246263
addDataBuffered(length);

services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,23 @@
2222

2323
import java.io.ByteArrayInputStream;
2424
import java.io.File;
25+
import java.nio.ByteBuffer;
2526
import java.nio.charset.Charset;
2627
import java.nio.file.Files;
28+
import java.util.Optional;
2729
import java.util.UUID;
2830
import java.util.concurrent.ThreadLocalRandom;
31+
import java.util.concurrent.TimeUnit;
2932
import org.apache.commons.lang3.RandomStringUtils;
3033
import org.assertj.core.api.Assertions;
3134
import org.junit.jupiter.api.AfterAll;
3235
import org.junit.jupiter.api.BeforeAll;
3336
import org.junit.jupiter.api.Test;
3437
import org.junit.jupiter.api.Timeout;
38+
import org.reactivestreams.Subscriber;
3539
import software.amazon.awssdk.core.ResponseInputStream;
3640
import software.amazon.awssdk.core.async.AsyncRequestBody;
41+
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3742
import software.amazon.awssdk.core.sync.ResponseTransformer;
3843
import software.amazon.awssdk.services.s3.S3AsyncClient;
3944
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
@@ -42,11 +47,12 @@
4247
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
4348
import software.amazon.awssdk.testutils.RandomTempFile;
4449

50+
@Timeout(value = 30, unit = SECONDS)
4551
public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase {
4652

4753
private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientPutObjectIntegrationTest.class);
4854
private static final String TEST_KEY = "testfile.dat";
49-
private static final int OBJ_SIZE = 19 * 1024 * 1024;
55+
private static final int OBJ_SIZE = 31 * 1024 * 1024;
5056

5157
private static File testFile;
5258
private static S3AsyncClient mpuS3Client;
@@ -71,7 +77,6 @@ public static void teardown() throws Exception {
7177
}
7278

7379
@Test
74-
@Timeout(value = 20, unit = SECONDS)
7580
void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
7681
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
7782
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();
@@ -99,4 +104,31 @@ void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception {
99104
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
100105
}
101106

107+
@Test
108+
@Timeout(value = 30, unit = SECONDS)
109+
void putObject_unknownContentLength_objectSentCorrectly() throws Exception {
110+
AsyncRequestBody body = FileAsyncRequestBody.builder()
111+
.path(testFile.toPath())
112+
//.chunkSizeInBytes(2 * 1024 * 1024)
113+
.build();
114+
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), new AsyncRequestBody() {
115+
@Override
116+
public Optional<Long> contentLength() {
117+
return Optional.empty();
118+
}
119+
120+
@Override
121+
public void subscribe(Subscriber<? super ByteBuffer> s) {
122+
body.subscribe(s);
123+
}
124+
}).get(30, SECONDS);
125+
126+
ResponseInputStream<GetObjectResponse> objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
127+
ResponseTransformer.toInputStream());
128+
129+
assertThat(objContent.response().contentLength()).isEqualTo(testFile.length());
130+
byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
131+
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
132+
}
133+
102134
}

services/s3/src/it/resources/log4j2.properties

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender
2525

2626
# Uncomment below to enable more specific logging
2727
#
28-
#logger.sdk.name = software.amazon.awssdk
29-
#logger.sdk.level = debug
28+
logger.sdk.name = software.amazon.awssdk
29+
logger.sdk.level = debug
3030
#
31-
#logger.request.name = software.amazon.awssdk.request
32-
#logger.request.level = debug
31+
logger.request.name = software.amazon.awssdk.core.internal.async.SplittingPublisher
32+
logger.request.level = trace
3333
#
3434
#logger.apache.name = org.apache.http.wire
3535
#logger.apache.level = debug
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.multipart;
17+
18+
19+
import static software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils.toAbortMultipartUploadRequest;
20+
21+
import java.util.Collection;
22+
import java.util.Queue;
23+
import java.util.concurrent.CompletableFuture;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.async.AsyncRequestBody;
26+
import software.amazon.awssdk.services.s3.S3AsyncClient;
27+
import software.amazon.awssdk.services.s3.model.CompletedPart;
28+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
29+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
30+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
31+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
32+
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
33+
import software.amazon.awssdk.utils.CompletableFutureUtils;
34+
import software.amazon.awssdk.utils.Logger;
35+
36+
/**
37+
* A base class contains common logic used by {@link UploadWithUnknownContentLengthHelper} and {@link UploadWithKnownContentLengthHelper}.
38+
*/
39+
@SdkInternalApi
40+
public abstract class BaseMultipartUploadHelper {
41+
private static final Logger log = Logger.loggerFor(BaseMultipartUploadHelper.class);
42+
43+
private final S3AsyncClient s3AsyncClient;
44+
private final long partSizeInBytes;
45+
private final GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper;
46+
47+
private final long maxMemoryUsageInBytes;
48+
private final long multipartUploadThresholdInBytes;
49+
50+
public BaseMultipartUploadHelper(S3AsyncClient s3AsyncClient,
51+
long partSizeInBytes,
52+
long multipartUploadThresholdInBytes,
53+
long maxMemoryUsageInBytes) {
54+
this.s3AsyncClient = s3AsyncClient;
55+
this.partSizeInBytes = partSizeInBytes;
56+
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
57+
SdkPojoConversionUtils::toAbortMultipartUploadRequest,
58+
SdkPojoConversionUtils::toPutObjectResponse);
59+
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
60+
this.multipartUploadThresholdInBytes = multipartUploadThresholdInBytes;
61+
}
62+
63+
CompletableFuture<CreateMultipartUploadResponse> createMultipartUpload(PutObjectRequest putObjectRequest, CompletableFuture<PutObjectResponse> returnFuture) {
64+
CreateMultipartUploadRequest request = SdkPojoConversionUtils.toCreateMultipartUploadRequest(putObjectRequest);
65+
CompletableFuture<CreateMultipartUploadResponse> createMultipartUploadFuture =
66+
s3AsyncClient.createMultipartUpload(request);
67+
68+
// Ensure cancellations are forwarded to the createMultipartUploadFuture future
69+
CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture);
70+
return createMultipartUploadFuture;
71+
}
72+
73+
void completeMultipartUpload(CompletableFuture<PutObjectResponse> returnFuture,
74+
String uploadId,
75+
CompletedPart[] completedParts,
76+
PutObjectRequest putObjectRequest,
77+
Collection<CompletableFuture<CompletedPart>> futures) {
78+
CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(new CompletableFuture[0]))
79+
.thenCompose(ignore -> genericMultipartHelper.completeMultipartUpload(putObjectRequest,
80+
uploadId,
81+
completedParts))
82+
.handle(genericMultipartHelper.handleExceptionOrResponse(putObjectRequest, returnFuture,
83+
uploadId))
84+
.exceptionally(throwable -> {
85+
genericMultipartHelper.handleException(returnFuture, () -> "Unexpected exception occurred",
86+
throwable);
87+
return null;
88+
});
89+
}
90+
91+
static void cancelingOtherOngoingRequests(Collection<CompletableFuture<CompletedPart>> futures, Throwable t) {
92+
log.trace(() -> "cancelling other ongoing requests " + futures.size());
93+
futures.forEach(f -> f.completeExceptionally(t));
94+
}
95+
96+
static CompletedPart convertUploadPartResponse(Queue<CompletedPart> completedParts,
97+
Integer partNumber,
98+
UploadPartResponse uploadPartResponse) {
99+
CompletedPart completedPart = SdkPojoConversionUtils.toCompletedPart(uploadPartResponse, partNumber);
100+
101+
completedParts.add(completedPart);
102+
return completedPart;
103+
}
104+
105+
void uploadInOneChunk(PutObjectRequest putObjectRequest,
106+
AsyncRequestBody asyncRequestBody,
107+
CompletableFuture<PutObjectResponse> returnFuture) {
108+
CompletableFuture<PutObjectResponse> putObjectResponseCompletableFuture = s3AsyncClient.putObject(putObjectRequest,
109+
asyncRequestBody);
110+
CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectResponseCompletableFuture);
111+
CompletableFutureUtils.forwardResultTo(putObjectResponseCompletableFuture, returnFuture);
112+
}
113+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

18+
import java.util.Collection;
19+
import java.util.Comparator;
1820
import java.util.concurrent.CompletableFuture;
1921
import java.util.concurrent.CompletionException;
2022
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -79,13 +81,9 @@ public int determinePartCount(long contentLength, long partSize) {
7981
}
8082

8183
public CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
82-
RequestT request, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
84+
RequestT request, String uploadId, CompletedPart[] parts) {
8385
log.debug(() -> String.format("Sending completeMultipartUploadRequest, uploadId: %s",
8486
uploadId));
85-
CompletedPart[] parts =
86-
IntStream.range(0, completedParts.length())
87-
.mapToObj(completedParts::get)
88-
.toArray(CompletedPart[]::new);
8987
CompleteMultipartUploadRequest completeMultipartUploadRequest =
9088
CompleteMultipartUploadRequest.builder()
9189
.bucket(request.getValueForField("Bucket", String.class).get())
@@ -99,6 +97,15 @@ public CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUploa
9997
return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest);
10098
}
10199

100+
public CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
101+
RequestT request, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
102+
CompletedPart[] parts =
103+
IntStream.range(0, completedParts.length())
104+
.mapToObj(completedParts::get)
105+
.toArray(CompletedPart[]::new);
106+
return completeMultipartUpload(request, uploadId, parts);
107+
}
108+
102109
public BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(
103110
RequestT request,
104111
CompletableFuture<ResponseT> returnFuture,

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ public class MultipartS3AsyncClient extends DelegatingS3AsyncClient {
3434
private static final long DEFAULT_THRESHOLD = 8L * 1024 * 1024;
3535

3636
private static final long DEFAULT_MAX_MEMORY = DEFAULT_PART_SIZE_IN_BYTES * 2;
37-
private final MultipartUploadHelper mpuHelper;
37+
private final UploadObjectHelper mpuHelper;
3838
private final CopyObjectHelper copyObjectHelper;
3939

4040
public MultipartS3AsyncClient(S3AsyncClient delegate) {
4141
super(delegate);
4242
// TODO: pass a config object to the upload helper instead
43-
mpuHelper = new MultipartUploadHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD, DEFAULT_MAX_MEMORY);
43+
mpuHelper = new UploadObjectHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD, DEFAULT_MAX_MEMORY);
4444
copyObjectHelper = new CopyObjectHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD);
4545
}
4646

0 commit comments

Comments
 (0)