17
17
18
18
import java .nio .ByteBuffer ;
19
19
import java .util .Optional ;
20
- import java .util .concurrent .CompletableFuture ;
21
20
import java .util .concurrent .atomic .AtomicBoolean ;
22
21
import java .util .concurrent .atomic .AtomicInteger ;
23
22
import java .util .concurrent .atomic .AtomicLong ;
@@ -45,24 +44,12 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
45
44
private final SimplePublisher <AsyncRequestBody > downstreamPublisher = new SimplePublisher <>();
46
45
private final long chunkSizeInBytes ;
47
46
private final long maxMemoryUsageInBytes ;
48
- private final CompletableFuture <Void > future ;
49
47
50
48
private SplittingPublisher (Builder builder ) {
51
49
this .upstreamPublisher = Validate .paramNotNull (builder .asyncRequestBody , "asyncRequestBody" );
52
50
this .chunkSizeInBytes = Validate .isPositive (builder .chunkSizeInBytes , "chunkSizeInBytes" );
53
51
this .splittingSubscriber = new SplittingSubscriber (upstreamPublisher .contentLength ().orElse (null ));
54
52
this .maxMemoryUsageInBytes = Validate .isPositive (builder .maxMemoryUsageInBytes , "maxMemoryUsageInBytes" );
55
- this .future = builder .future ;
56
-
57
- // We need to cancel upstream subscription if the future gets cancelled.
58
- future .whenComplete ((r , t ) -> {
59
- if (t != null ) {
60
- if (splittingSubscriber .upstreamSubscription != null ) {
61
- log .trace (() -> "Cancelling subscription because return future completed exceptionally " , t );
62
- splittingSubscriber .upstreamSubscription .cancel ();
63
- }
64
- }
65
- });
66
53
}
67
54
68
55
public static Builder builder () {
@@ -117,26 +104,35 @@ public void onNext(ByteBuffer byteBuffer) {
117
104
byteBufferSizeHint = byteBuffer .remaining ();
118
105
119
106
while (true ) {
107
+
108
+ if (!byteBuffer .hasRemaining ()) {
109
+ break ;
110
+ }
111
+
120
112
int amountRemainingInChunk = amountRemainingInChunk ();
121
113
122
114
// If we have fulfilled this chunk,
123
- // we should create a new DownstreamBody if needed
115
+ // complete the current body
124
116
if (amountRemainingInChunk == 0 ) {
125
- completeCurrentBody ();
117
+ completeCurrentBodyAndCreateNewIfNeeded (byteBuffer );
118
+ amountRemainingInChunk = amountRemainingInChunk ();
119
+ }
126
120
127
- if (shouldCreateNewDownstreamRequestBody (byteBuffer )) {
128
- int currentChunk = chunkNumber .incrementAndGet ();
129
- long chunkSize = calculateChunkSize (totalDataRemaining ());
130
- currentBody = initializeNextDownstreamBody (upstreamSize != null , chunkSize , currentChunk );
131
- }
121
+ // If the current ByteBuffer < this chunk, send it as-is
122
+ if (amountRemainingInChunk > byteBuffer .remaining ()) {
123
+ currentBody .send (byteBuffer .duplicate ());
124
+ break ;
132
125
}
133
126
134
- amountRemainingInChunk = amountRemainingInChunk ();
135
- if (amountRemainingInChunk >= byteBuffer .remaining ()) {
127
+ // If the current ByteBuffer == this chunk, send it as-is and
128
+ // complete the current body
129
+ if (amountRemainingInChunk == byteBuffer .remaining ()) {
136
130
currentBody .send (byteBuffer .duplicate ());
131
+ completeCurrentBodyAndCreateNewIfNeeded (byteBuffer );
137
132
break ;
138
133
}
139
134
135
+ // If the current ByteBuffer > this chunk, split this ByteBuffer
140
136
ByteBuffer firstHalf = byteBuffer .duplicate ();
141
137
int newLimit = firstHalf .position () + amountRemainingInChunk ;
142
138
firstHalf .limit (newLimit );
@@ -147,20 +143,30 @@ public void onNext(ByteBuffer byteBuffer) {
147
143
maybeRequestMoreUpstreamData ();
148
144
}
149
145
146
+ private void completeCurrentBodyAndCreateNewIfNeeded (ByteBuffer byteBuffer ) {
147
+ completeCurrentBody ();
148
+ int currentChunk = chunkNumber .incrementAndGet ();
149
+ boolean shouldCreateNewDownstreamRequestBody ;
150
+ Long dataRemaining = totalDataRemaining ();
150
151
151
- /**
152
- * If content length is known, we should create new DownstreamRequestBody if there's remaining data.
153
- * If content length is unknown, we should create new DownstreamRequestBody if upstream is not completed yet.
154
- */
155
- private boolean shouldCreateNewDownstreamRequestBody (ByteBuffer byteBuffer ) {
156
- return !upstreamComplete || byteBuffer .remaining () > 0 ;
152
+ if (upstreamSize == null ) {
153
+ shouldCreateNewDownstreamRequestBody = !upstreamComplete || byteBuffer .hasRemaining ();
154
+ } else {
155
+ shouldCreateNewDownstreamRequestBody = dataRemaining != null && dataRemaining > 0 ;
156
+ }
157
+
158
+ if (shouldCreateNewDownstreamRequestBody ) {
159
+ long chunkSize = calculateChunkSize (dataRemaining );
160
+ currentBody = initializeNextDownstreamBody (upstreamSize != null , chunkSize , currentChunk );
161
+ }
157
162
}
158
163
159
164
private int amountRemainingInChunk () {
160
165
return Math .toIntExact (currentBody .maxLength - currentBody .transferredLength );
161
166
}
162
167
163
168
private void completeCurrentBody () {
169
+ log .debug (() -> "completeCurrentBody for chunk " + chunkNumber .get ());
164
170
currentBody .complete ();
165
171
if (upstreamSize == null ) {
166
172
sendCurrentBody (currentBody );
@@ -172,12 +178,13 @@ public void onComplete() {
172
178
upstreamComplete = true ;
173
179
log .trace (() -> "Received onComplete()" );
174
180
completeCurrentBody ();
175
- downstreamPublisher .complete (). thenRun (() -> future . complete ( null )) ;
181
+ downstreamPublisher .complete ();
176
182
}
177
183
178
184
@ Override
179
185
public void onError (Throwable t ) {
180
- currentBody .error (t );
186
+ log .trace (() -> "Received onError()" , t );
187
+ downstreamPublisher .error (t );
181
188
}
182
189
183
190
private void sendCurrentBody (AsyncRequestBody body ) {
@@ -206,7 +213,7 @@ private void maybeRequestMoreUpstreamData() {
206
213
}
207
214
208
215
private boolean shouldRequestMoreData (long buffered ) {
209
- return buffered == 0 || buffered + byteBufferSizeHint < maxMemoryUsageInBytes ;
216
+ return buffered == 0 || buffered + byteBufferSizeHint <= maxMemoryUsageInBytes ;
210
217
}
211
218
212
219
private Long totalDataRemaining () {
@@ -240,7 +247,7 @@ public Optional<Long> contentLength() {
240
247
}
241
248
242
249
public void send (ByteBuffer data ) {
243
- log .trace (() -> "Sending bytebuffer " + data );
250
+ log .trace (() -> String . format ( "Sending bytebuffer %s to chunk %d" , data , chunkNumber ) );
244
251
int length = data .remaining ();
245
252
transferredLength += length ;
246
253
addDataBuffered (length );
@@ -283,7 +290,6 @@ public static final class Builder {
283
290
private AsyncRequestBody asyncRequestBody ;
284
291
private Long chunkSizeInBytes ;
285
292
private Long maxMemoryUsageInBytes ;
286
- private CompletableFuture <Void > future ;
287
293
288
294
/**
289
295
* Configures the asyncRequestBody to split
@@ -322,18 +328,6 @@ public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
322
328
return this ;
323
329
}
324
330
325
- /**
326
- * Sets the result future. The future will be completed when all request bodies
327
- * have been sent.
328
- *
329
- * @param future The new future value.
330
- * @return This object for method chaining.
331
- */
332
- public Builder resultFuture (CompletableFuture <Void > future ) {
333
- this .future = future ;
334
- return this ;
335
- }
336
-
337
331
public SplittingPublisher build () {
338
332
return new SplittingPublisher (this );
339
333
}
0 commit comments