Skip to content

Commit 59288f9

Browse files
committed
Override split in FileAsyncRequestBody
1 parent 89fdb89 commit 59288f9

File tree

14 files changed

+492
-305
lines changed

14 files changed

+492
-305
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -441,22 +441,18 @@ static AsyncRequestBody empty() {
441441
* is 2MB and the default buffer size is 8MB.
442442
*
443443
* <p>
444-
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
445-
* subscriber right after it's initialized.
446-
* <p>
447-
* If content length is null, it is sent after the entire content for that chunk is buffered.
448-
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
444+
* By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
445+
* delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
446+
* the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
447+
* than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
448+
* interface overrides this method.
449449
*
450450
* @see AsyncRequestBodySplitConfiguration
451451
*/
452452
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
453453
Validate.notNull(splitConfiguration, "splitConfiguration");
454454

455-
return SplittingPublisher.builder()
456-
.asyncRequestBody(this)
457-
.chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
458-
.bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
459-
.build();
455+
return new SplittingPublisher(this, splitConfiguration);
460456
}
461457

462458
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBodySplitConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@
2828
@SdkPublicApi
2929
public final class AsyncRequestBodySplitConfiguration implements ToCopyableBuilder<AsyncRequestBodySplitConfiguration.Builder,
3030
AsyncRequestBodySplitConfiguration> {
31+
private static final long DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024L;
32+
private static final long DEFAULT_BUFFER_SIZE = DEFAULT_CHUNK_SIZE * 4;
33+
private static final AsyncRequestBodySplitConfiguration DEFAULT_CONFIG = builder()
34+
.bufferSizeInBytes(DEFAULT_BUFFER_SIZE)
35+
.chunkSizeInBytes(DEFAULT_CHUNK_SIZE)
36+
.build();
3137
private final Long chunkSizeInBytes;
3238
private final Long bufferSizeInBytes;
3339

@@ -36,6 +42,10 @@ private AsyncRequestBodySplitConfiguration(DefaultBuilder builder) {
3642
this.bufferSizeInBytes = Validate.isPositiveOrNull(builder.bufferSizeInBytes, "bufferSizeInBytes");
3743
}
3844

45+
/**
 * Returns the shared default split configuration.
 *
 * <p>The returned instance is the {@code DEFAULT_CONFIG} constant, which is built with a chunk size of
 * {@code DEFAULT_CHUNK_SIZE} (2 * 1024 * 1024 bytes) and a buffer size of {@code DEFAULT_BUFFER_SIZE}
 * (four times the default chunk size).
 *
 * @return the default {@link AsyncRequestBodySplitConfiguration}; the same instance on every call
 */
public static AsyncRequestBodySplitConfiguration defaultConfiguration() {
46+
return DEFAULT_CONFIG;
47+
}
48+
3949
/**
4050
* The configured chunk size for each divided {@link AsyncRequestBody}.
4151
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.reactivestreams.Subscription;
3434
import software.amazon.awssdk.annotations.SdkInternalApi;
3535
import software.amazon.awssdk.core.async.AsyncRequestBody;
36+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
37+
import software.amazon.awssdk.core.async.SdkPublisher;
3638
import software.amazon.awssdk.core.internal.util.Mimetype;
3739
import software.amazon.awssdk.core.internal.util.NoopSubscription;
3840
import software.amazon.awssdk.utils.Logger;
@@ -78,6 +80,32 @@ private FileAsyncRequestBody(DefaultBuilder builder) {
7880
Validate.isNotNegative(builder.numBytesToRead, "numBytesToRead");
7981
}
8082

83+
/**
 * Splits this file-backed request body into multiple {@link AsyncRequestBody}s.
 *
 * <p>Overrides the interface default by delegating to {@link FileAsyncRequestBodySplitHelper} instead of the
 * generic splitting publisher.
 *
 * @param splitConfiguration the split configuration; must not be null
 * @return a publisher of the divided request bodies
 */
@Override
84+
public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
85+
Validate.notNull(splitConfiguration, "splitConfiguration");
86+
return new FileAsyncRequestBodySplitHelper(this, splitConfiguration).split();
87+
}
88+
89+
/**
 * @return the path of the file backing this request body
 */
public Path path() {
90+
return path;
91+
}
92+
93+
/**
 * @return the {@code fileLength} value held by this request body (presumably the size of the file in bytes as
 *         captured at construction time; confirm against the constructor)
 */
public long fileLength() {
94+
return fileLength;
95+
}
96+
97+
/**
 * @return the chunk size in bytes; each read allocates a buffer of at most this many bytes
 */
public int chunkSizeInBytes() {
98+
return chunkSizeInBytes;
99+
}
100+
101+
/**
 * @return the file offset at which a new subscription starts reading
 */
public long position() {
102+
return position;
103+
}
104+
105+
/**
 * @return the number of bytes a subscription will read; this is also the value reported by
 *         {@code contentLength()}
 */
public long numBytesToRead() {
106+
return numBytesToRead;
107+
}
108+
81109
@Override
82110
public Optional<Long> contentLength() {
83111
return Optional.of(numBytesToRead);
@@ -97,7 +125,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
97125
// We need to synchronize here because the subscriber could call
98126
// request() from within onSubscribe which would potentially
99127
// trigger onNext before onSubscribe is finished.
100-
Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes, position, numBytesToRead);
128+
Subscription subscription = new FileSubscription(channel, s);
101129

102130
synchronized (subscription) {
103131
s.onSubscribe(subscription);
@@ -214,13 +242,11 @@ public FileAsyncRequestBody build() {
214242
/**
215243
* Reads the file for one subscriber.
216244
*/
217-
private static final class FileSubscription implements Subscription {
218-
private final Path path;
245+
private final class FileSubscription implements Subscription {
219246
private final AsynchronousFileChannel inputChannel;
220247
private final Subscriber<? super ByteBuffer> subscriber;
221-
private final int chunkSize;
222248

223-
private final AtomicLong position;
249+
private final AtomicLong currentPosition;
224250
private final AtomicLong remainingBytes;
225251
private final long sizeAtStart;
226252
private final FileTime modifiedTimeAtStart;
@@ -229,20 +255,14 @@ private static final class FileSubscription implements Subscription {
229255
private volatile boolean done = false;
230256
private final Object lock = new Object();
231257

232-
private FileSubscription(Path path,
233-
AsynchronousFileChannel inputChannel,
234-
Subscriber<? super ByteBuffer> subscriber,
235-
int chunkSize,
236-
long position,
237-
long numBytesToRead) throws IOException {
238-
this.path = path;
258+
private FileSubscription(AsynchronousFileChannel inputChannel,
259+
Subscriber<? super ByteBuffer> subscriber) throws IOException {
239260
this.inputChannel = inputChannel;
240261
this.subscriber = subscriber;
241-
this.chunkSize = chunkSize;
242262
this.sizeAtStart = inputChannel.size();
243263
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
244264
this.remainingBytes = new AtomicLong(numBytesToRead);
245-
this.position = new AtomicLong(position);
265+
this.currentPosition = new AtomicLong(position);
246266
}
247267

248268
@Override
@@ -297,16 +317,16 @@ private void readData() {
297317
return;
298318
}
299319

300-
ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSize, NumericUtils.saturatedCast(remainingBytes.get())));
301-
inputChannel.read(buffer, position.get(), buffer, new CompletionHandler<Integer, ByteBuffer>() {
320+
ByteBuffer buffer = ByteBuffer.allocate(Math.min(chunkSizeInBytes, NumericUtils.saturatedCast(remainingBytes.get())));
321+
inputChannel.read(buffer, currentPosition.get(), buffer, new CompletionHandler<Integer, ByteBuffer>() {
302322
@Override
303323
public void completed(Integer result, ByteBuffer attachment) {
304324
try {
305325
if (result > 0) {
306326
attachment.flip();
307327

308328
int readBytes = attachment.remaining();
309-
position.addAndGet(readBytes);
329+
currentPosition.addAndGet(readBytes);
310330
remainingBytes.addAndGet(-readBytes);
311331

312332
signalOnNext(attachment);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.nio.file.Path;
20+
import java.util.Optional;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import org.reactivestreams.Subscriber;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
27+
import software.amazon.awssdk.core.async.AsyncRequestBody;
28+
import software.amazon.awssdk.core.async.AsyncRequestBodySplitConfiguration;
29+
import software.amazon.awssdk.core.async.SdkPublisher;
30+
import software.amazon.awssdk.utils.Logger;
31+
import software.amazon.awssdk.utils.Validate;
32+
import software.amazon.awssdk.utils.async.SimplePublisher;
33+
34+
/**
35+
* A helper class to split a {@link FileAsyncRequestBody} to multiple smaller async request bodies. It ensures the buffer used to
36+
* be under the configured size via {@link AsyncRequestBodySplitConfiguration#bufferSizeInBytes()} by tracking the number of
37+
* concurrent ongoing {@link AsyncRequestBody}s.
38+
*/
39+
@SdkInternalApi
40+
public final class FileAsyncRequestBodySplitHelper {
41+
private static final Logger log = Logger.loggerFor(FileAsyncRequestBodySplitHelper.class);
42+
43+
private final AtomicBoolean isSendingRequestBody = new AtomicBoolean(false);
44+
private final AtomicLong remainingBytes;
45+
46+
private final long totalContentLength;
47+
private final Path path;
48+
private final int bufferPerAsyncRequestBody;
49+
private final long totalBufferSize;
50+
private final long chunkSize;
51+
52+
private volatile boolean isDone = false;
53+
54+
private AtomicInteger numAsyncRequestBodiesInFlight = new AtomicInteger(0);
55+
private AtomicInteger chunkIndex = new AtomicInteger(0);
56+
57+
public FileAsyncRequestBodySplitHelper(FileAsyncRequestBody asyncRequestBody,
58+
AsyncRequestBodySplitConfiguration splitConfiguration) {
59+
Validate.notNull(asyncRequestBody, "asyncRequestBody");
60+
Validate.notNull(splitConfiguration, "splitConfiguration");
61+
Validate.isTrue(asyncRequestBody.contentLength().isPresent(), "Content length must be present", asyncRequestBody);
62+
this.totalContentLength = asyncRequestBody.contentLength().get();
63+
this.remainingBytes = new AtomicLong(totalContentLength);
64+
this.path = asyncRequestBody.path();
65+
this.chunkSize = splitConfiguration.chunkSizeInBytes() == null ?
66+
AsyncRequestBodySplitConfiguration.defaultConfiguration().chunkSizeInBytes() :
67+
splitConfiguration.chunkSizeInBytes();
68+
this.totalBufferSize = splitConfiguration.bufferSizeInBytes() == null ?
69+
AsyncRequestBodySplitConfiguration.defaultConfiguration().bufferSizeInBytes() :
70+
splitConfiguration.bufferSizeInBytes();
71+
this.bufferPerAsyncRequestBody = asyncRequestBody.chunkSizeInBytes();
72+
}
73+
74+
public SdkPublisher<AsyncRequestBody> split() {
75+
76+
SimplePublisher<AsyncRequestBody> simplePublisher = new SimplePublisher<>();
77+
78+
try {
79+
sendAsyncRequestBody(simplePublisher);
80+
} catch (Throwable throwable) {
81+
simplePublisher.error(throwable);
82+
}
83+
84+
return SdkPublisher.adapt(simplePublisher);
85+
}
86+
87+
private void sendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
88+
if (!isSendingRequestBody.compareAndSet(false, true)) {
89+
return;
90+
}
91+
92+
try {
93+
doSendAsyncRequestBody(simplePublisher);
94+
} finally {
95+
isSendingRequestBody.set(false);
96+
}
97+
}
98+
99+
private void doSendAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
100+
while (true) {
101+
if (!shouldSendMore()) {
102+
break;
103+
}
104+
105+
AsyncRequestBody currentAsyncRequestBody = newFileAsyncRequestBody(simplePublisher);
106+
simplePublisher.send(currentAsyncRequestBody);
107+
numAsyncRequestBodiesInFlight.incrementAndGet();
108+
checkCompletion(simplePublisher, currentAsyncRequestBody);
109+
}
110+
}
111+
112+
private void checkCompletion(SimplePublisher<AsyncRequestBody> simplePublisher, AsyncRequestBody currentAsyncRequestBody) {
113+
long remaining = remainingBytes.addAndGet(-currentAsyncRequestBody.contentLength().get());
114+
115+
if (remaining == 0) {
116+
isDone = true;
117+
simplePublisher.complete();
118+
}
119+
}
120+
121+
private AsyncRequestBody newFileAsyncRequestBody(SimplePublisher<AsyncRequestBody> simplePublisher) {
122+
long position = chunkSize * chunkIndex.getAndIncrement();
123+
long numBytesToReadForThisChunk = Math.min(totalContentLength - position, chunkSize);
124+
FileAsyncRequestBody fileAsyncRequestBody = FileAsyncRequestBody.builder()
125+
.path(path)
126+
.position(position)
127+
.numBytesToRead(numBytesToReadForThisChunk)
128+
.build();
129+
return new AsyncRequestBody() {
130+
131+
@Override
132+
public void subscribe(Subscriber<? super ByteBuffer> s) {
133+
fileAsyncRequestBody.doAfterOnComplete(() -> {
134+
numAsyncRequestBodiesInFlight.decrementAndGet();
135+
sendAsyncRequestBody(simplePublisher);
136+
}).subscribe(s);
137+
}
138+
139+
@Override
140+
public Optional<Long> contentLength() {
141+
return fileAsyncRequestBody.contentLength();
142+
}
143+
};
144+
}
145+
146+
/**
147+
* Should not send more if it's done OR sending next request body would exceed the total buffer size
148+
*/
149+
private boolean shouldSendMore() {
150+
if (isDone) {
151+
return false;
152+
}
153+
154+
long currentUsedBuffer = numAsyncRequestBodiesInFlight.get() * bufferPerAsyncRequestBody;
155+
return currentUsedBuffer + bufferPerAsyncRequestBody <= totalBufferSize;
156+
}
157+
158+
@SdkTestInternalApi
159+
AtomicInteger numAsyncRequestBodiesInFlight() {
160+
return numAsyncRequestBodiesInFlight;
161+
}
162+
}

0 commit comments

Comments
 (0)