Skip to content

Commit f5b7611

Browse files
millems authored and zoewangg committed
Prototyping:
1. AsyncRequestBody.split() to break up a request body into multiple request bodies. This can be overridden in the file-based publisher to reduce memory usage by reading from disk in parallel. 2. Simplified UploadPartRequestPublisher using AsyncRequestBody.split(). No longer requires a separate executor?
1 parent c5b79f0 commit f5b7611

File tree

4 files changed

+193
-283
lines changed

4 files changed

+193
-283
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
3131
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3232
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
33+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
3334
import software.amazon.awssdk.core.internal.util.Mimetype;
3435
import software.amazon.awssdk.utils.BinaryUtils;
3536

@@ -69,6 +70,11 @@ default String contentType() {
6970
return Mimetype.MIMETYPE_OCTET_STREAM;
7071
}
7172

73+
default SdkPublisher<AsyncRequestBody> split(long partSizeInBytes,
74+
long maxMemoryUsageInBytes) {
75+
return new SplittingPublisher(this, partSizeInBytes, maxMemoryUsageInBytes);
76+
}
77+
7278
/**
7379
* Creates an {@link AsyncRequestBody} that produces data from the input ByteBuffer publisher.
7480
* The data is delivered when the publisher publishes the data.
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import software.amazon.awssdk.core.async.AsyncRequestBody;
25+
import software.amazon.awssdk.core.async.SdkPublisher;
26+
import software.amazon.awssdk.utils.async.SimplePublisher;
27+
28+
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
29+
private final AsyncRequestBody upstreamPublisher;
30+
private final SplittingSubscriber splittingSubscriber = new SplittingSubscriber();
31+
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
32+
private final long partSizeInBytes;
33+
private final long maxMemoryUsageInBytes;
34+
35+
public SplittingPublisher(AsyncRequestBody asyncRequestBody,
36+
long partSizeInBytes,
37+
long maxMemoryUsageInBytes) {
38+
this.upstreamPublisher = asyncRequestBody;
39+
this.partSizeInBytes = partSizeInBytes;
40+
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
41+
}
42+
43+
@Override
44+
public void subscribe(Subscriber<? super AsyncRequestBody> downstreamSubscriber) {
45+
downstreamPublisher.subscribe(downstreamSubscriber);
46+
upstreamPublisher.subscribe(splittingSubscriber);
47+
}
48+
49+
private class SplittingSubscriber implements Subscriber<ByteBuffer> {
50+
private Subscription upstreamSubscription;
51+
private final Long upstreamSize = upstreamPublisher.contentLength().orElse(null);
52+
53+
private int partNumber = 0;
54+
private DownstreamBody currentBody;
55+
56+
private final AtomicBoolean hasOpenUpstreamDemand = new AtomicBoolean(false);
57+
private final AtomicLong dataBuffered = new AtomicLong(0);
58+
59+
@Override
60+
public void onSubscribe(Subscription s) {
61+
this.upstreamSubscription = s;
62+
this.currentBody = new DownstreamBody(calculatePartSize());
63+
}
64+
65+
@Override
66+
public void onNext(ByteBuffer byteBuffer) {
67+
hasOpenUpstreamDemand.set(false);
68+
69+
while (true) {
70+
int amountRemainingInPart = Math.toIntExact(partSizeInBytes - currentBody.partLength);
71+
72+
if (amountRemainingInPart < byteBuffer.remaining()) {
73+
// TODO: should we avoid sending empty byte buffers, which can happen here?
74+
currentBody.send(byteBuffer);
75+
break;
76+
}
77+
78+
ByteBuffer firstHalf = byteBuffer.duplicate();
79+
int newLimit = firstHalf.position() + amountRemainingInPart;
80+
firstHalf.limit(newLimit); // TODO: Am I off by one here?
81+
currentBody.send(firstHalf);
82+
currentBody.complete();
83+
84+
++partNumber;
85+
currentBody = new DownstreamBody(calculatePartSize());
86+
downstreamPublisher.send(currentBody);
87+
byteBuffer.position(newLimit); // TODO: Am I off by one here?
88+
}
89+
90+
maybeRequestMoreUpstreamData();
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
currentBody.complete();
96+
}
97+
98+
@Override
99+
public void onError(Throwable t) {
100+
currentBody.error(t);
101+
}
102+
103+
private Long calculatePartSize() {
104+
Long dataRemaining = dataRemaining();
105+
if (dataRemaining == null) {
106+
return null;
107+
}
108+
109+
return Math.min(partSizeInBytes, dataRemaining);
110+
}
111+
112+
private void maybeRequestMoreUpstreamData() {
113+
if (dataBuffered.get() < maxMemoryUsageInBytes && hasOpenUpstreamDemand.compareAndSet(false, true)) {
114+
// TODO: max memory usage might not be the best name, since we can technically go a little above
115+
// this limit when we add on a new byte buffer. But we don't know what the size of a buffer we request
116+
// will be, so I don't think we can have a truly accurate max. Maybe we call it minimum buffer size instead?
117+
upstreamSubscription.request(1);
118+
}
119+
}
120+
121+
private Long dataRemaining() {
122+
if (upstreamSize == null) {
123+
return null;
124+
}
125+
return upstreamSize - (partNumber * partSizeInBytes);
126+
}
127+
128+
private class DownstreamBody implements AsyncRequestBody {
129+
private final Long size;
130+
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
131+
private long partLength = 0;
132+
133+
private DownstreamBody(Long size) {
134+
this.size = size;
135+
}
136+
137+
@Override
138+
public Optional<Long> contentLength() {
139+
return Optional.ofNullable(size);
140+
}
141+
142+
public void send(ByteBuffer data) {
143+
int length = data.remaining();
144+
partLength += length;
145+
addDataBuffered(length);
146+
delegate.send(data).thenRun(() -> addDataBuffered(-length));
147+
}
148+
149+
public void complete() {
150+
delegate.complete();
151+
}
152+
153+
public void error(Throwable error) {
154+
delegate.error(error);
155+
}
156+
157+
@Override
158+
public void subscribe(Subscriber<? super ByteBuffer> s) {
159+
delegate.subscribe(s);
160+
}
161+
162+
private void addDataBuffered(int length) {
163+
dataBuffered.addAndGet(length);
164+
if (length < 0) {
165+
maybeRequestMoreUpstreamData();
166+
}
167+
}
168+
}
169+
}
170+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartUploadHelper.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,9 @@ private void doUploadInParts(PutObjectRequest putObjectRequest,
140140
UploadPartRequestPublisher uploadPartRequestPublisher =
141141
UploadPartRequestPublisher.builder()
142142
.partSize(partSizeInBytes)
143-
.numOfParts(partCount)
144143
.asyncRequestBody(asyncRequestBody)
145-
.contentLength(contentLength)
146144
.putObjectRequest(putObjectRequest)
147145
.numOfPartsBuffered(numOfPartsBuffered)
148-
.executor(executor)
149146
.uploadId(uploadId)
150147
.build();
151148

0 commit comments

Comments
 (0)