17
17
18
18
import java .nio .ByteBuffer ;
19
19
import java .util .Optional ;
20
- import java .util .concurrent .CompletableFuture ;
21
20
import java .util .concurrent .atomic .AtomicBoolean ;
22
21
import java .util .concurrent .atomic .AtomicInteger ;
23
22
import java .util .concurrent .atomic .AtomicLong ;
@@ -45,24 +44,12 @@ public class SplittingPublisher implements SdkPublisher<AsyncRequestBody> {
45
44
private final SimplePublisher <AsyncRequestBody > downstreamPublisher = new SimplePublisher <>();
46
45
private final long chunkSizeInBytes ;
47
46
private final long maxMemoryUsageInBytes ;
48
- private final CompletableFuture <Void > future ;
49
47
50
48
private SplittingPublisher (Builder builder ) {
51
49
this .upstreamPublisher = Validate .paramNotNull (builder .asyncRequestBody , "asyncRequestBody" );
52
50
this .chunkSizeInBytes = Validate .isPositive (builder .chunkSizeInBytes , "chunkSizeInBytes" );
53
51
this .splittingSubscriber = new SplittingSubscriber (upstreamPublisher .contentLength ().orElse (null ));
54
52
this .maxMemoryUsageInBytes = Validate .isPositive (builder .maxMemoryUsageInBytes , "maxMemoryUsageInBytes" );
55
- this .future = builder .future ;
56
-
57
- // We need to cancel upstream subscription if the future gets cancelled.
58
- future .whenComplete ((r , t ) -> {
59
- if (t != null ) {
60
- if (splittingSubscriber .upstreamSubscription != null ) {
61
- log .trace (() -> "Cancelling subscription because return future completed exceptionally " , t );
62
- splittingSubscriber .upstreamSubscription .cancel ();
63
- }
64
- }
65
- });
66
53
}
67
54
68
55
public static Builder builder () {
@@ -117,16 +104,20 @@ public void onNext(ByteBuffer byteBuffer) {
117
104
byteBufferSizeHint = byteBuffer .remaining ();
118
105
119
106
while (true ) {
107
+
108
+ if (!byteBuffer .hasRemaining ()) {
109
+ break ;
110
+ }
111
+
120
112
int amountRemainingInChunk = amountRemainingInChunk ();
121
113
122
114
// If we have fulfilled this chunk,
123
115
// complete the current body
124
116
if (amountRemainingInChunk == 0 ) {
125
117
completeCurrentBodyAndCreateNewIfNeeded (byteBuffer );
118
+ amountRemainingInChunk = amountRemainingInChunk ();
126
119
}
127
120
128
- amountRemainingInChunk = amountRemainingInChunk ();
129
-
130
121
// If the current ByteBuffer < this chunk, send it as-is
131
122
if (amountRemainingInChunk > byteBuffer .remaining ()) {
132
123
currentBody .send (byteBuffer .duplicate ());
@@ -154,29 +145,28 @@ public void onNext(ByteBuffer byteBuffer) {
154
145
155
146
private void completeCurrentBodyAndCreateNewIfNeeded (ByteBuffer byteBuffer ) {
156
147
completeCurrentBody ();
148
+ int currentChunk = chunkNumber .incrementAndGet ();
149
+ boolean shouldCreateNewDownstreamRequestBody ;
150
+ Long dataRemaining = totalDataRemaining ();
157
151
158
- if (shouldCreateNewDownstreamRequestBody ( byteBuffer ) ) {
159
- int currentChunk = chunkNumber . incrementAndGet ();
160
- long chunkSize = calculateChunkSize ( totalDataRemaining ());
161
- currentBody = initializeNextDownstreamBody ( upstreamSize != null , chunkSize , currentChunk ) ;
152
+ if (upstreamSize == null ) {
153
+ shouldCreateNewDownstreamRequestBody = ! upstreamComplete || byteBuffer . hasRemaining ();
154
+ } else {
155
+ shouldCreateNewDownstreamRequestBody = dataRemaining != null && dataRemaining > 0 ;
162
156
}
163
- }
164
-
165
157
166
- /**
167
- * If content length is known, we should create new DownstreamRequestBody if there's remaining data.
168
- * If content length is unknown, we should create new DownstreamRequestBody if upstream is not completed yet.
169
- */
170
- private boolean shouldCreateNewDownstreamRequestBody (ByteBuffer byteBuffer ) {
171
- return !upstreamComplete || byteBuffer .remaining () > 0 ;
158
+ if (shouldCreateNewDownstreamRequestBody ) {
159
+ long chunkSize = calculateChunkSize (dataRemaining );
160
+ currentBody = initializeNextDownstreamBody (upstreamSize != null , chunkSize , currentChunk );
161
+ }
172
162
}
173
163
174
164
private int amountRemainingInChunk () {
175
165
return Math .toIntExact (currentBody .maxLength - currentBody .transferredLength );
176
166
}
177
167
178
168
private void completeCurrentBody () {
179
- log .debug (() -> "completeCurrentBody" );
169
+ log .debug (() -> "completeCurrentBody for chunk " + chunkNumber . get () );
180
170
currentBody .complete ();
181
171
if (upstreamSize == null ) {
182
172
sendCurrentBody (currentBody );
@@ -188,16 +178,16 @@ public void onComplete() {
188
178
upstreamComplete = true ;
189
179
log .trace (() -> "Received onComplete()" );
190
180
completeCurrentBody ();
191
- downstreamPublisher .complete (). thenRun (() -> future . complete ( null )) ;
181
+ downstreamPublisher .complete ();
192
182
}
193
183
194
184
@ Override
195
185
public void onError (Throwable t ) {
196
- currentBody .error (t );
186
+ log .trace (() -> "Received onError()" , t );
187
+ downstreamPublisher .error (t );
197
188
}
198
189
199
190
private void sendCurrentBody (AsyncRequestBody body ) {
200
- log .debug (() -> "sendCurrentBody" );
201
191
downstreamPublisher .send (body ).exceptionally (t -> {
202
192
downstreamPublisher .error (t );
203
193
return null ;
@@ -300,7 +290,6 @@ public static final class Builder {
300
290
private AsyncRequestBody asyncRequestBody ;
301
291
private Long chunkSizeInBytes ;
302
292
private Long maxMemoryUsageInBytes ;
303
- private CompletableFuture <Void > future ;
304
293
305
294
/**
306
295
* Configures the asyncRequestBody to split
@@ -339,18 +328,6 @@ public Builder maxMemoryUsageInBytes(long maxMemoryUsageInBytes) {
339
328
return this ;
340
329
}
341
330
342
- /**
343
- * Sets the result future. The future will be completed when all request bodies
344
- * have been sent.
345
- *
346
- * @param future The new future value.
347
- * @return This object for method chaining.
348
- */
349
- public Builder resultFuture (CompletableFuture <Void > future ) {
350
- this .future = future ;
351
- return this ;
352
- }
353
-
354
331
public SplittingPublisher build () {
355
332
return new SplittingPublisher (this );
356
333
}
0 commit comments