Skip to content

Commit 0b89885

Browse files
committed
Refactoring
1 parent 9b661d8 commit 0b89885

File tree

11 files changed

+238
-373
lines changed

11 files changed

+238
-373
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -420,24 +420,20 @@ static AsyncRequestBody empty() {
420420
* @param maxMemoryUsageInBytes the max memory the SDK will use to buffer the content
421421
* @return SplitAsyncRequestBodyResult
422422
*/
423-
default SplitAsyncRequestBodyResponse split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
423+
default SdkPublisher<AsyncRequestBody> split(long chunkSizeInBytes, long maxMemoryUsageInBytes) {
424424
Validate.isPositive(chunkSizeInBytes, "chunkSizeInBytes");
425425
Validate.isPositive(maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
426426

427-
if (!this.contentLength().isPresent()) {
427+
if (!contentLength().isPresent()) {
428428
Validate.isTrue(maxMemoryUsageInBytes >= chunkSizeInBytes,
429429
"maxMemoryUsageInBytes must be larger than or equal to " +
430430
"chunkSizeInBytes if the content length is unknown");
431431
}
432432

433-
CompletableFuture<Void> future = new CompletableFuture<>();
434-
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
435-
.asyncRequestBody(this)
436-
.chunkSizeInBytes(chunkSizeInBytes)
437-
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
438-
.resultFuture(future)
439-
.build();
440-
441-
return SplitAsyncRequestBodyResponse.create(splittingPublisher, future);
433+
return SplittingPublisher.builder()
434+
.asyncRequestBody(this)
435+
.chunkSizeInBytes(chunkSizeInBytes)
436+
.maxMemoryUsageInBytes(maxMemoryUsageInBytes)
437+
.build();
442438
}
443439
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponse.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,12 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
4545
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
4646
private final long chunkSizeInBytes;
4747
private final long maxMemoryUsageInBytes;
48-
private final CompletableFuture<Void> future;
4948

5049
private SplittingPublisher(Builder builder) {
5150
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
5251
this.chunkSizeInBytes = Validate.isPositive(builder.chunkSizeInBytes, "chunkSizeInBytes");
5352
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
5453
this.maxMemoryUsageInBytes = Validate.isPositive(builder.maxMemoryUsageInBytes, "maxMemoryUsageInBytes");
55-
this.future = builder.future;
56-
57-
// We need to cancel upstream subscription if the future gets cancelled.
58-
future.whenComplete((r, t) -> {
59-
if (t != null) {
60-
if (splittingSubscriber.upstreamSubscription != null) {
61-
log.trace(() -> "Cancelling subscription because return future completed exceptionally ", t);
62-
splittingSubscriber.upstreamSubscription.cancel();
63-
}
64-
}
65-
});
6654
}
6755

6856
public static Builder builder() {
@@ -117,16 +105,20 @@ public void onNext(ByteBuffer byteBuffer) {
117105
byteBufferSizeHint = byteBuffer.remaining();
118106

119107
while (true) {
108+
109+
if (!byteBuffer.hasRemaining()) {
110+
break;
111+
}
112+
120113
int amountRemainingInChunk = amountRemainingInChunk();
121114

122115
// If we have fulfilled this chunk,
123116
// complete the current body
124117
if (amountRemainingInChunk == 0) {
125118
completeCurrentBodyAndCreateNewIfNeeded(byteBuffer);
119+
amountRemainingInChunk = amountRemainingInChunk();
126120
}
127121

128-
amountRemainingInChunk = amountRemainingInChunk();
129-
130122
// If the current ByteBuffer < this chunk, send it as-is
131123
if (amountRemainingInChunk > byteBuffer.remaining()) {
132124
currentBody.send(byteBuffer.duplicate());
@@ -154,29 +146,28 @@ public void onNext(ByteBuffer byteBuffer) {
154146

155147
private void completeCurrentBodyAndCreateNewIfNeeded(ByteBuffer byteBuffer) {
156148
completeCurrentBody();
149+
int currentChunk = chunkNumber.incrementAndGet();
150+
boolean shouldCreateNewDownstreamRequestBody;
151+
Long dataRemaining = totalDataRemaining();
157152

158-
if (shouldCreateNewDownstreamRequestBody(byteBuffer)) {
159-
int currentChunk = chunkNumber.incrementAndGet();
160-
long chunkSize = calculateChunkSize(totalDataRemaining());
161-
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
153+
if (upstreamSize == null) {
154+
shouldCreateNewDownstreamRequestBody = !upstreamComplete || byteBuffer.hasRemaining();
155+
} else {
156+
shouldCreateNewDownstreamRequestBody = dataRemaining != null && dataRemaining > 0;
162157
}
163-
}
164-
165158

166-
/**
167-
* If content length is known, we should create new DownstreamRequestBody if there's remaining data.
168-
* If content length is unknown, we should create new DownstreamRequestBody if upstream is not completed yet.
169-
*/
170-
private boolean shouldCreateNewDownstreamRequestBody(ByteBuffer byteBuffer) {
171-
return !upstreamComplete || byteBuffer.remaining() > 0;
159+
if (shouldCreateNewDownstreamRequestBody) {
160+
long chunkSize = calculateChunkSize(dataRemaining);
161+
currentBody = initializeNextDownstreamBody(upstreamSize != null, chunkSize, currentChunk);
162+
}
172163
}
173164

174165
private int amountRemainingInChunk() {
175166
return Math.toIntExact(currentBody.maxLength - currentBody.transferredLength);
176167
}
177168

178169
private void completeCurrentBody() {
179-
log.debug(() -> "completeCurrentBody");
170+
log.debug(() -> "completeCurrentBody for chunk " + chunkNumber.get());
180171
currentBody.complete();
181172
if (upstreamSize == null) {
182173
sendCurrentBody(currentBody);
@@ -188,7 +179,7 @@ public void onComplete() {
188179
upstreamComplete = true;
189180
log.trace(() -> "Received onComplete()");
190181
completeCurrentBody();
191-
downstreamPublisher.complete().thenRun(() -> future.complete(null));
182+
downstreamPublisher.complete();
192183
}
193184

194185
@Override
@@ -197,7 +188,6 @@ public void onError(Throwable t) {
197188
}
198189

199190
private void sendCurrentBody(AsyncRequestBody body) {
200-
log.debug(() -> "sendCurrentBody");
201191
downstreamPublisher.send(body).exceptionally(t -> {
202192
downstreamPublisher.error(t);
203193
return null;
@@ -339,18 +329,6 @@ public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
339329
return this;
340330
}
341331

342-
/**
343-
* Sets the result future. The future will be completed when all request bodies
344-
* have been sent.
345-
*
346-
* @param future The new future value.
347-
* @return This object for method chaining.
348-
*/
349-
public Builder resultFuture(CompletableFuture<Void> future) {
350-
this.future = future;
351-
return this;
352-
}
353-
354332
public SplittingPublisher build() {
355333
return new SplittingPublisher(this);
356334
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SplitAsyncRequestBodyResponseTest.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@
3131
import java.util.Optional;
3232
import java.util.UUID;
3333
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.ExecutionException;
35-
import java.util.concurrent.ThreadLocalRandom;
3634
import java.util.concurrent.TimeUnit;
37-
import java.util.concurrent.TimeoutException;
3835
import java.util.concurrent.atomic.AtomicInteger;
3936
import org.apache.commons.lang3.RandomStringUtils;
4037
import org.junit.jupiter.api.AfterAll;
@@ -45,7 +42,6 @@
4542
import org.reactivestreams.Subscriber;
4643
import org.reactivestreams.Subscription;
4744
import software.amazon.awssdk.core.async.AsyncRequestBody;
48-
import software.amazon.awssdk.testutils.RandomTempFile;
4945
import software.amazon.awssdk.utils.BinaryUtils;
5046

5147
public class SplittingPublisherTest {
@@ -87,26 +83,6 @@ void differentChunkSize_byteArrayShouldSplitAsyncRequestBodyCorrectly(int chunkS
8783
verifySplitContent(AsyncRequestBody.fromBytes(CONTENT), chunkSize);
8884
}
8985

90-
91-
@Test
92-
void cancelFuture_shouldCancelUpstream() throws IOException {
93-
CompletableFuture<Void> future = new CompletableFuture<>();
94-
TestAsyncRequestBody asyncRequestBody = new TestAsyncRequestBody();
95-
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
96-
.resultFuture(future)
97-
.asyncRequestBody(asyncRequestBody)
98-
.chunkSizeInBytes(CHUNK_SIZE)
99-
.maxMemoryUsageInBytes(10L)
100-
.build();
101-
102-
OnlyRequestOnceSubscriber downstreamSubscriber = new OnlyRequestOnceSubscriber();
103-
splittingPublisher.subscribe(downstreamSubscriber);
104-
105-
future.completeExceptionally(new RuntimeException("test"));
106-
assertThat(asyncRequestBody.cancelled).isTrue();
107-
assertThat(downstreamSubscriber.asyncRequestBodies.size()).isEqualTo(1);
108-
}
109-
11086
@Test
11187
void contentLengthNotPresent_shouldHandle() throws Exception {
11288
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -117,7 +93,6 @@ public Optional<Long> contentLength() {
11793
}
11894
};
11995
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
120-
.resultFuture(future)
12196
.asyncRequestBody(asyncRequestBody)
12297
.chunkSizeInBytes(CHUNK_SIZE)
12398
.maxMemoryUsageInBytes(10L)
@@ -159,11 +134,8 @@ public Optional<Long> contentLength() {
159134

160135

161136
private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception {
162-
CompletableFuture<Void> future = new CompletableFuture<>();
163137
SplittingPublisher splittingPublisher = SplittingPublisher.builder()
164-
.resultFuture(future)
165138
.asyncRequestBody(asyncRequestBody)
166-
.resultFuture(future)
167139
.chunkSizeInBytes(chunkSize)
168140
.maxMemoryUsageInBytes((long) chunkSize * 4)
169141
.build();
@@ -194,7 +166,6 @@ private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int ch
194166
assertThat(actualBytes).isEqualTo(expected);
195167
};
196168
}
197-
assertThat(future).isCompleted();
198169
}
199170

200171
private static class TestAsyncRequestBody implements AsyncRequestBody {

services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3MultipartClientPutObjectIntegrationTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest
5252

5353
private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientPutObjectIntegrationTest.class);
5454
private static final String TEST_KEY = "testfile.dat";
55-
private static final int OBJ_SIZE = 31 * 1024 * 1024;
55+
private static final int OBJ_SIZE = 19 * 1024 * 1024;
5656

5757
private static File testFile;
5858
private static S3AsyncClient mpuS3Client;
@@ -90,7 +90,6 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
9090
}
9191

9292
@Test
93-
@Timeout(value = 30, unit = SECONDS)
9493
void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception {
9594
byte[] bytes = RandomStringUtils.randomAscii(OBJ_SIZE).getBytes(Charset.defaultCharset());
9695
AsyncRequestBody body = AsyncRequestBody.fromBytes(bytes);
@@ -105,11 +104,9 @@ void putObject_byteAsyncRequestBody_objectSentCorrectly() throws Exception {
105104
}
106105

107106
@Test
108-
@Timeout(value = 30, unit = SECONDS)
109107
void putObject_unknownContentLength_objectSentCorrectly() throws Exception {
110108
AsyncRequestBody body = FileAsyncRequestBody.builder()
111109
.path(testFile.toPath())
112-
//.chunkSizeInBytes(2 * 1024 * 1024)
113110
.build();
114111
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), new AsyncRequestBody() {
115112
@Override

services/s3/src/it/resources/log4j2.properties

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ rootLogger.appenderRef.stdout.ref = ConsoleAppender
2525

2626
# Uncomment below to enable more specific logging
2727
#
28-
logger.sdk.name = software.amazon.awssdk
29-
logger.sdk.level = debug
28+
#logger.sdk.name = software.amazon.awssdk
29+
#logger.sdk.level = debug
3030
#
31-
logger.request.name = software.amazon.awssdk.core.internal.async.SplittingPublisher
32-
logger.request.level = trace
31+
#logger.request.name = software.amazon.awssdk.request
32+
#logger.request.level = debug
3333
#
3434
#logger.apache.name = org.apache.http.wire
3535
#logger.apache.level = debug

0 commit comments

Comments
 (0)