Skip to content

Commit 93879b5

Browse files
committed
Override split in FileAsyncRequestBody
1 parent 89fdb89 commit 93879b5

File tree

14 files changed

+450
-235
lines changed

14 files changed

+450
-235
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,11 +441,11 @@ static AsyncRequestBody empty() {
441441
* is 2MB and the default buffer size is 8MB.
442442
*
443443
* <p>
444-
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
445-
* subscriber right after it's initialized.
446-
* <p>
447-
* If content length is null, it is sent after the entire content for that chunk is buffered.
448-
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
444+
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
445+
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
446+
* the entire content for that chunk is buffered. In this case, the configured {@code bufferSizeInBytes} must be larger
447+
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
448+
* interface overrides this method.
449449
*
450450
* @see AsyncRequestBodySplitConfiguration
451451
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
@SdkPublicApi
2929
public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
3030
AsyncRequestBodySplitConfiguration> {
31+
32+
public static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L;
33+
public static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
3134
private final Long chunkSizeInBytes;
3235
private final Long bufferSizeInBytes;
3336

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.reactivestreams.Subscription;
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
36+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37+
import software.amazon.awssdk.core.async.SdkPublisher;
3638
import software.amazon.awssdk.core.internal.util.Mimetype;
3739
import software.amazon.awssdk.core.internal.util.NoopSubscription;
3840
import software.amazon.awssdk.utils.Logger;
@@ -78,6 +80,32 @@ private FileAsyncRequestBody(DefaultBuilder builder) {
7880
Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead");
7981
}
8082

83+
@Override
84+
public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
85+
Validate.notNull(splitConfiguration, "splitConfiguration");
86+
return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split();
87+
}
88+
89+
public Path path() {
90+
return path;
91+
}
92+
93+
public long fileLength() {
94+
return fileLength;
95+
}
96+
97+
public int chunkSizeInBytes() {
98+
return chunkSizeInBytes;
99+
}
100+
101+
public long position() {
102+
return position;
103+
}
104+
105+
public long numBytesToRead() {
106+
return numBytesToRead;
107+
}
108+
81109
@Override
82110
public Optional<Long> contentLength() {
83111
return Optional.of(numBytesToRead);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration.DEFAULT_BUFFER_SIZE;
19+
import static software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration.DEFAULT_CHUNK_SIZE;
20+
21+
import java.nio.ByteBuffer;
22+
import java.nio.file.Path;
23+
import java.util.Optional;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
import org.reactivestreams.Subscriber;
28+
import software.amazon.awssdk.annotations.SdkInternalApi;
29+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
30+
import software.amazon.awssdk.core.async.AsyncRequestBody;
31+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
32+
import software.amazon.awssdk.core.async.SdkPublisher;
33+
import software.amazon.awssdk.utils.Logger;
34+
import software.amazon.awssdk.utils.Validate;
35+
import software.amazon.awssdk.utils.async.SimplePublisher;
36+
37+
/**
38+
* A helper class to split a {@link FileAsyncRequestBody} to multiple smaller async request bodies. It ensures the buffer used to
39+
* be under the configured size via {@link AsyncRequestBodySplitConfiguration#bufferSizeInBytes()} by tracking the number of
40+
* concurrent ongoing {@link AsyncRequestBody}s.
41+
*/
42+
@SdkInternalApi
43+
public final class FileAsyncRequestBodySplitHelper {
44+
private static final Logger log = Logger.loggerFor(FileAsyncRequestBodySplitHelper.class);
45+
46+
private final AtomicBoolean isSendingRequestBody = new AtomicBoolean(false);
47+
private final AtomicLong remainingBytes;
48+
49+
private final long totalContentLength;
50+
private final Path path;
51+
private final int bufferPerAsyncRequestBody;
52+
private final long totalBufferSize;
53+
private final long chunkSize;
54+
55+
private volatile boolean isDone = false;
56+
57+
private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0);
58+
private AtomicInteger chunkIndex = new AtomicInteger(0);
59+
60+
public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
61+
AsyncRequestBodySplitConfiguration splitConfiguration) {
62+
Validate.notNull(asyncRequestBody, "asyncRequestBody");
63+
Validate.notNull(splitConfiguration, "splitConfiguration");
64+
Validate.isTrue(asyncRequestBody.contentLength().isPresent(), "Content length must be present", asyncRequestBody);
65+
this.totalContentLength = asyncRequestBody.contentLength().get();
66+
this.remainingBytes = new AtomicLong(totalContentLength);
67+
this.path = asyncRequestBody.path();
68+
this.chunkSize = splitConfiguration.chunkSizeInBytes() == null ?
69+
DEFAULT_CHUNK_SIZE : splitConfiguration.chunkSizeInBytes();
70+
this.totalBufferSize = splitConfiguration.bufferSizeInBytes() == null ? DEFAULT_BUFFER_SIZE :
71+
splitConfiguration.bufferSizeInBytes();
72+
this.bufferPerAsyncRequestBody = asyncRequestBody.chunkSizeInBytes();
73+
}
74+
75+
public SdkPublisher<AsyncRequestBody> split() {
76+
77+
SimplePublisher<AsyncRequestBody> simplePublisher = new SimplePublisher<>();
78+
79+
try {
80+
sendAsyncRequestBody(simplePublisher);
81+
} catch (Throwable throwable) {
82+
simplePublisher.error(throwable);
83+
}
84+
85+
return SdkPublisher.adapt(simplePublisher);
86+
}
87+
88+
private void sendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
89+
if (!isSendingRequestBody.compareAndSet(false, true)) {
90+
return;
91+
}
92+
93+
try {
94+
doSendAsyncRequestBody(simplePublisher);
95+
} finally {
96+
isSendingRequestBody.set(false);
97+
}
98+
}
99+
100+
private void doSendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
101+
while (true) {
102+
if (!shouldSendMore()) {
103+
break;
104+
}
105+
106+
AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher);
107+
simplePublisher.send(currentAsyncRequestBody);
108+
numAsyncRequestBodiesInFlight.incrementAndGet();
109+
checkCompletion(simplePublisher, currentAsyncRequestBody);
110+
}
111+
}
112+
113+
private void checkCompletion(SimplePublisher<AsyncRequestBody> simplePublisher, AsyncRequestBody currentAsyncRequestBody) {
114+
long remaining = remainingBytes.addAndGet(-currentAsyncRequestBody.contentLength().get());
115+
116+
if (remaining == 0) {
117+
isDone = true;
118+
simplePublisher.complete();
119+
}
120+
}
121+
122+
private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
123+
long position = chunkSize * chunkIndex.getAndIncrement();
124+
long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize);
125+
FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder()
126+
.path(path)
127+
.position(position)
128+
.numBytesToRead(numBytesToReadForThisChunk)
129+
.build();
130+
return new AsyncRequestBody() {
131+
132+
@Override
133+
public void subscribe(Subscriber<? super ByteBuffer> s) {
134+
fileAsyncRequestBody.doAfterOnComplete(() -> {
135+
numAsyncRequestBodiesInFlight.decrementAndGet();
136+
sendAsyncRequestBody(simplePublisher);
137+
}).subscribe(s);
138+
}
139+
140+
@Override
141+
public Optional<Long> contentLength() {
142+
return fileAsyncRequestBody.contentLength();
143+
}
144+
};
145+
}
146+
147+
/**
148+
* Should not send more if it's done OR sending next request body would exceed the total buffer size
149+
*/
150+
private boolean shouldSendMore() {
151+
if (isDone) {
152+
return false;
153+
}
154+
155+
long currentUsedBuffer = numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody;
156+
return currentUsedBuffer + bufferPerAsyncRequestBody < totalBufferSize;
157+
}
158+
159+
@SdkTestInternalApi
160+
AtomicInteger numAsyncRequestBodiesInFlight() {
161+
return numAsyncRequestBodiesInFlight;
162+
}
163+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration.DEFAULT_BUFFER_SIZE;
19+
import static software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration.DEFAULT_CHUNK_SIZE;
20+
1821
import java.nio.ByteBuffer;
1922
import java.util.Optional;
2023
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,8 +44,6 @@
4144
@SdkInternalApi
4245
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
4346
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
44-
private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L;
45-
private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
4647
private final AsyncRequestBody upstreamPublisher;
4748
private final SplittingSubscriber splittingSubscriber;
4849
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
20+
21+
import java.io.IOException;
22+
import java.nio.file.Files;
23+
import java.nio.file.NoSuchFileException;
24+
import java.nio.file.Path;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.ScheduledFuture;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import org.junit.jupiter.api.AfterAll;
31+
import org.junit.jupiter.api.BeforeAll;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
34+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
35+
import software.amazon.awssdk.testutils.RandomTempFile;
36+
37+
public class FileAsyncRequestBodySplitHelperTest {
38+
39+
private static final int CHUNK_SIZE = 5;
40+
private static Path testFile;
41+
private static ScheduledExecutorService executor;
42+
43+
44+
@BeforeAll
45+
public static void setup() throws IOException {
46+
testFile = new RandomTempFile(2000).toPath();
47+
executor = Executors.newScheduledThreadPool(1);
48+
}
49+
50+
@AfterAll
51+
public static void teardown() throws IOException {
52+
try {
53+
Files.delete(testFile);
54+
} catch (NoSuchFileException e) {
55+
// ignore
56+
}
57+
executor.shutdown();
58+
}
59+
60+
@ParameterizedTest
61+
@ValueSource(ints = {CHUNK_SIZE, CHUNK_SIZE * 2 - 1, CHUNK_SIZE * 2})
62+
public void split_differentChunkSize_shouldSplitCorrectly(int chunkSize) throws Exception {
63+
long bufferSize = 55l;
64+
int chunkSizeInBytes = 10;
65+
FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder()
66+
.path(testFile)
67+
.chunkSizeInBytes(10)
68+
.build();
69+
AsyncRequestBodySplitConfiguration config =
70+
AsyncRequestBodySplitConfiguration.builder()
71+
.chunkSizeInBytes((long) chunkSize)
72+
.bufferSizeInBytes(55L)
73+
.build();
74+
FileAsyncRequestBodySplitHelper helper = new FileAsyncRequestBodySplitHelper(fileAsyncRequestBody, config);
75+
76+
AtomicInteger maxConcurrency = new AtomicInteger(0);
77+
ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(verifyConcurrentRequests(helper, maxConcurrency),
78+
1, 50, TimeUnit.MICROSECONDS);
79+
80+
verifyIndividualAsyncRequestBody(helper.split(), testFile, chunkSize);
81+
scheduledFuture.cancel(true);
82+
int expectedMaxConcurrency = (int) (bufferSize / chunkSizeInBytes);
83+
assertThat(maxConcurrency.get()).isLessThanOrEqualTo(expectedMaxConcurrency);
84+
}
85+
86+
private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper helper, AtomicInteger maxConcurrency) {
87+
return () -> {
88+
int concurrency = helper.numAsyncRequestBodiesInFlight().get();
89+
90+
if (concurrency > maxConcurrency.get()) {
91+
maxConcurrency.set(concurrency);
92+
}
93+
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10);
94+
};
95+
}
96+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
2122
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2223

23-
import io.reactivex.Flowable;
2424
import java.io.ByteArrayOutputStream;
2525
import java.io.FileInputStream;
2626
import java.io.IOException;
2727
import java.nio.ByteBuffer;
28-
import java.nio.channels.AsynchronousFileChannel;
2928
import java.nio.charset.StandardCharsets;
3029
import java.nio.file.Files;
3130
import java.nio.file.NoSuchFileException;
@@ -39,10 +38,12 @@
3938
import org.junit.jupiter.api.AfterEach;
4039
import org.junit.jupiter.api.BeforeEach;
4140
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.params.ParameterizedTest;
42+
import org.junit.jupiter.params.provider.ValueSource;
4243
import org.reactivestreams.Subscriber;
4344
import org.reactivestreams.Subscription;
4445
import software.amazon.awssdk.core.async.AsyncRequestBody;
45-
import software.amazon.awssdk.http.async.SimpleSubscriber;
46+
import software.amazon.awssdk.core.async.SdkPublisher;
4647
import software.amazon.awssdk.testutils.RandomTempFile;
4748
import software.amazon.awssdk.utils.BinaryUtils;
4849

0 commit comments

Comments
 (0)