Skip to content

Commit bc80de0

Browse files
zoewanggmillems
andauthored
Implement multipart upload in Java-based S3 async client (#4052)
* Implement multipart upload in Java-based S3 async client Co-authored-by: Matthew Miller <[email protected]>
1 parent c83caea commit bc80de0

File tree

14 files changed

+1496
-107
lines changed

14 files changed

+1496
-107
lines changed
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
import software.amazon.awssdk.annotations.SdkInternalApi;
27+
import software.amazon.awssdk.core.async.AsyncRequestBody;
28+
import software.amazon.awssdk.core.async.SdkPublisher;
29+
import software.amazon.awssdk.utils.Logger;
30+
import software.amazon.awssdk.utils.Validate;
31+
import software.amazon.awssdk.utils.async.SimplePublisher;
32+
33+
/**
34+
* Splits an {@link SdkPublisher} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of the
35+
* original data.
36+
* // TODO: create a default method in AsyncRequestBody for this
37+
* // TODO: fix the case where content length is null
38+
*/
39+
@SdkInternalApi
40+
public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
41+
private static final Logger log = Logger.loggerFor(SplittingPublisher.class);
42+
private final AsyncRequestBody upstreamPublisher;
43+
private final SplittingSubscriber splittingSubscriber;
44+
private final SimplePublisher<AsyncRequestBody> downstreamPublisher = new SimplePublisher<>();
45+
private final long chunkSizeInBytes;
46+
private final long maxMemoryUsageInBytes;
47+
private final CompletableFuture<Void> future;
48+
49+
private SplittingPublisher(Builder builder) {
50+
this.upstreamPublisher = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
51+
this.chunkSizeInBytes = Validate.paramNotNull(builder.chunkSizeInBytes, "chunkSizeInBytes");
52+
this.splittingSubscriber = new SplittingSubscriber(upstreamPublisher.contentLength().orElse(null));
53+
this.maxMemoryUsageInBytes = builder.maxMemoryUsageInBytes == null ? Long.MAX_VALUE : builder.maxMemoryUsageInBytes;
54+
this.future = builder.future;
55+
56+
// We need to cancel upstream subscription if the future gets cancelled.
57+
future.whenComplete((r, t) -> {
58+
if (t != null) {
59+
if (splittingSubscriber.upstreamSubscription != null) {
60+
log.trace(() -> "Cancelling subscription because return future completed exceptionally ", t);
61+
splittingSubscriber.upstreamSubscription.cancel();
62+
}
63+
}
64+
});
65+
}
66+
67+
public static Builder builder() {
68+
return new Builder();
69+
}
70+
71+
@Override
72+
public void subscribe(Subscriber<? super AsyncRequestBody> downstreamSubscriber) {
73+
downstreamPublisher.subscribe(downstreamSubscriber);
74+
upstreamPublisher.subscribe(splittingSubscriber);
75+
}
76+
77+
private class SplittingSubscriber implements Subscriber<ByteBuffer> {
78+
private Subscription upstreamSubscription;
79+
private final Long upstreamSize;
80+
private final AtomicInteger chunkNumber = new AtomicInteger(0);
81+
private volatile DownstreamBody currentBody;
82+
private final AtomicBoolean hasOpenUpstreamDemand = new AtomicBoolean(false);
83+
private final AtomicLong dataBuffered = new AtomicLong(0);
84+
85+
/**
86+
* A hint to determine whether we will exceed maxMemoryUsage by the next OnNext call.
87+
*/
88+
private int byteBufferSizeHint;
89+
90+
SplittingSubscriber(Long upstreamSize) {
91+
this.upstreamSize = upstreamSize;
92+
}
93+
94+
@Override
95+
public void onSubscribe(Subscription s) {
96+
this.upstreamSubscription = s;
97+
this.currentBody = new DownstreamBody(calculateChunkSize(), chunkNumber.get());
98+
sendCurrentBody();
99+
// We need to request subscription *after* we set currentBody because onNext could be invoked right away.
100+
upstreamSubscription.request(1);
101+
}
102+
103+
@Override
104+
public void onNext(ByteBuffer byteBuffer) {
105+
hasOpenUpstreamDemand.set(false);
106+
byteBufferSizeHint = byteBuffer.remaining();
107+
108+
while (true) {
109+
int amountRemainingInPart = amountRemainingInPart();
110+
int finalAmountRemainingInPart = amountRemainingInPart;
111+
if (amountRemainingInPart == 0) {
112+
currentBody.complete();
113+
int currentChunk = chunkNumber.incrementAndGet();
114+
Long partSize = calculateChunkSize();
115+
currentBody = new DownstreamBody(partSize, currentChunk);
116+
sendCurrentBody();
117+
}
118+
119+
amountRemainingInPart = amountRemainingInPart();
120+
if (amountRemainingInPart >= byteBuffer.remaining()) {
121+
currentBody.send(byteBuffer.duplicate());
122+
break;
123+
}
124+
125+
ByteBuffer firstHalf = byteBuffer.duplicate();
126+
int newLimit = firstHalf.position() + amountRemainingInPart;
127+
firstHalf.limit(newLimit);
128+
byteBuffer.position(newLimit);
129+
currentBody.send(firstHalf);
130+
}
131+
132+
maybeRequestMoreUpstreamData();
133+
}
134+
135+
private int amountRemainingInPart() {
136+
return Math.toIntExact(currentBody.totalLength - currentBody.transferredLength);
137+
}
138+
139+
@Override
140+
public void onComplete() {
141+
log.trace(() -> "Received onComplete()");
142+
downstreamPublisher.complete().thenRun(() -> future.complete(null));
143+
currentBody.complete();
144+
}
145+
146+
@Override
147+
public void onError(Throwable t) {
148+
currentBody.error(t);
149+
}
150+
151+
private void sendCurrentBody() {
152+
downstreamPublisher.send(currentBody).exceptionally(t -> {
153+
downstreamPublisher.error(t);
154+
return null;
155+
});
156+
}
157+
158+
private Long calculateChunkSize() {
159+
Long dataRemaining = dataRemaining();
160+
if (dataRemaining == null) {
161+
return null;
162+
}
163+
164+
return Math.min(chunkSizeInBytes, dataRemaining);
165+
}
166+
167+
private void maybeRequestMoreUpstreamData() {
168+
long buffered = dataBuffered.get();
169+
if (shouldRequestMoreData(buffered) &&
170+
hasOpenUpstreamDemand.compareAndSet(false, true)) {
171+
log.trace(() -> "Requesting more data, current data buffered: " + buffered);
172+
upstreamSubscription.request(1);
173+
}
174+
}
175+
176+
private boolean shouldRequestMoreData(long buffered) {
177+
return buffered == 0 || buffered + byteBufferSizeHint < maxMemoryUsageInBytes;
178+
}
179+
180+
private Long dataRemaining() {
181+
if (upstreamSize == null) {
182+
return null;
183+
}
184+
return upstreamSize - (chunkNumber.get() * chunkSizeInBytes);
185+
}
186+
187+
private class DownstreamBody implements AsyncRequestBody {
188+
private final Long totalLength;
189+
private final SimplePublisher<ByteBuffer> delegate = new SimplePublisher<>();
190+
private final int chunkNumber;
191+
private volatile long transferredLength = 0;
192+
193+
private DownstreamBody(Long totalLength, int chunkNumber) {
194+
this.totalLength = totalLength;
195+
this.chunkNumber = chunkNumber;
196+
}
197+
198+
@Override
199+
public Optional<Long> contentLength() {
200+
return Optional.ofNullable(totalLength);
201+
}
202+
203+
public void send(ByteBuffer data) {
204+
log.trace(() -> "Sending bytebuffer " + data);
205+
int length = data.remaining();
206+
transferredLength += length;
207+
addDataBuffered(length);
208+
delegate.send(data).whenComplete((r, t) -> {
209+
addDataBuffered(-length);
210+
if (t != null) {
211+
error(t);
212+
}
213+
});
214+
}
215+
216+
public void complete() {
217+
log.debug(() -> "Received complete() for chunk number: " + chunkNumber);
218+
delegate.complete();
219+
}
220+
221+
public void error(Throwable error) {
222+
delegate.error(error);
223+
}
224+
225+
@Override
226+
public void subscribe(Subscriber<? super ByteBuffer> s) {
227+
delegate.subscribe(s);
228+
}
229+
230+
private void addDataBuffered(int length) {
231+
dataBuffered.addAndGet(length);
232+
if (length < 0) {
233+
maybeRequestMoreUpstreamData();
234+
}
235+
}
236+
}
237+
}
238+
239+
public static final class Builder {
240+
private AsyncRequestBody asyncRequestBody;
241+
private Long chunkSizeInBytes;
242+
private Long maxMemoryUsageInBytes;
243+
private CompletableFuture<Void> future;
244+
245+
/**
246+
* Configures the asyncRequestBody to split
247+
*
248+
* @param asyncRequestBody The new asyncRequestBody value.
249+
* @return This object for method chaining.
250+
*/
251+
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
252+
this.asyncRequestBody = asyncRequestBody;
253+
return this;
254+
}
255+
256+
/**
257+
* Configures the size of the chunk for each {@link AsyncRequestBody} to publish
258+
*
259+
* @param chunkSizeInBytes The new chunkSizeInBytes value.
260+
* @return This object for method chaining.
261+
*/
262+
public Builder chunkSizeInBytes(Long chunkSizeInBytes) {
263+
this.chunkSizeInBytes = chunkSizeInBytes;
264+
return this;
265+
}
266+
267+
/**
268+
* Sets the maximum memory usage in bytes. By default, it uses unlimited memory.
269+
*
270+
* @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
271+
* @return This object for method chaining.
272+
*/
273+
// TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add
274+
// on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
275+
// for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
276+
// buffer size instead?
277+
public Builder maxMemoryUsageInBytes(Long maxMemoryUsageInBytes) {
278+
this.maxMemoryUsageInBytes = maxMemoryUsageInBytes;
279+
return this;
280+
}
281+
282+
/**
283+
* Sets the result future. The future will be completed when all request bodies
284+
* have been sent.
285+
*
286+
* @param future The new future value.
287+
* @return This object for method chaining.
288+
*/
289+
public Builder resultFuture(CompletableFuture<Void> future) {
290+
this.future = future;
291+
return this;
292+
}
293+
294+
public SplittingPublisher build() {
295+
return new SplittingPublisher(this);
296+
}
297+
}
298+
}

0 commit comments

Comments
 (0)