17
17
18
18
import java .nio .ByteBuffer ;
19
19
import java .util .Optional ;
20
+ import java .util .concurrent .CompletableFuture ;
20
21
import java .util .concurrent .atomic .AtomicBoolean ;
22
+ import java .util .concurrent .atomic .AtomicInteger ;
21
23
import java .util .concurrent .atomic .AtomicLong ;
22
24
import org .reactivestreams .Subscriber ;
23
25
import org .reactivestreams .Subscription ;
26
+ import software .amazon .awssdk .annotations .SdkInternalApi ;
24
27
import software .amazon .awssdk .core .async .AsyncRequestBody ;
25
28
import software .amazon .awssdk .core .async .SdkPublisher ;
29
+ import software .amazon .awssdk .utils .Logger ;
30
+ import software .amazon .awssdk .utils .Validate ;
26
31
import software .amazon .awssdk .utils .async .SimplePublisher ;
27
32
33
+ /**
34
+ * Splits an {@link SdkPublisher} to multiple smaller {@link AsyncRequestBody}s, each of which publishes a specific portion of the
35
+ * original data.
36
+ * // TODO: create a static factory method in AsyncRequestBody for this
37
+ * // TODO: fix the case where content length is null
38
+ */
39
+ @ SdkInternalApi
28
40
public class SplittingPublisher implements SdkPublisher <AsyncRequestBody > {
41
+ private static final Logger log = Logger .loggerFor (SplittingPublisher .class );
29
42
private final AsyncRequestBody upstreamPublisher ;
30
- private final SplittingSubscriber splittingSubscriber = new SplittingSubscriber () ;
43
+ private final SplittingSubscriber splittingSubscriber ;
31
44
private final SimplePublisher <AsyncRequestBody > downstreamPublisher = new SimplePublisher <>();
32
- private final long partSizeInBytes ;
45
+ private final long chunkSizeInBytes ;
33
46
private final long maxMemoryUsageInBytes ;
47
+ private final CompletableFuture <Void > future ;
48
+
49
+ private SplittingPublisher (Builder builder ) {
50
+ this .upstreamPublisher = Validate .paramNotNull (builder .asyncRequestBody , "asyncRequestBody" );
51
+ this .chunkSizeInBytes = Validate .paramNotNull (builder .chunkSizeInBytes , "chunkSizeInBytes" );
52
+ this .splittingSubscriber = new SplittingSubscriber (upstreamPublisher .contentLength ().orElse (null ));
53
+ this .maxMemoryUsageInBytes = builder .maxMemoryUsageInBytes == null ? Long .MAX_VALUE : builder .maxMemoryUsageInBytes ;
54
+ this .future = builder .future ;
55
+
56
+ // We need to cancel upstream subscription if the future gets cancelled.
57
+ future .whenComplete ((r , t ) -> {
58
+ if (t != null ) {
59
+ if (splittingSubscriber .upstreamSubscription != null ) {
60
+ log .trace (() -> "Cancelling subscription because return future completed exceptionally " , t );
61
+ splittingSubscriber .upstreamSubscription .cancel ();
62
+ }
63
+ }
64
+ });
65
+ }
34
66
35
- public SplittingPublisher (AsyncRequestBody asyncRequestBody ,
36
- long partSizeInBytes ,
37
- long maxMemoryUsageInBytes ) {
38
- this .upstreamPublisher = asyncRequestBody ;
39
- this .partSizeInBytes = partSizeInBytes ;
40
- this .maxMemoryUsageInBytes = maxMemoryUsageInBytes ;
67
+ public static Builder builder () {
68
+ return new Builder ();
41
69
}
42
70
43
71
@ Override
@@ -48,50 +76,70 @@ public void subscribe(Subscriber<? super AsyncRequestBody> downstreamSubscriber)
48
76
49
77
private class SplittingSubscriber implements Subscriber <ByteBuffer > {
50
78
private Subscription upstreamSubscription ;
51
- private final Long upstreamSize = upstreamPublisher .contentLength ().orElse (null );
52
-
53
- private int partNumber = 0 ;
54
- private DownstreamBody currentBody ;
55
-
79
+ private final Long upstreamSize ;
80
+ private final AtomicInteger chunkNumber = new AtomicInteger (0 );
81
+ private volatile DownstreamBody currentBody ;
56
82
private final AtomicBoolean hasOpenUpstreamDemand = new AtomicBoolean (false );
57
83
private final AtomicLong dataBuffered = new AtomicLong (0 );
58
84
85
+ /**
86
+ * A hint to determine whether we will exceed maxMemoryUsage by the next OnNext call.
87
+ */
88
+ private int byteBufferSizeHint ;
89
+
90
+ SplittingSubscriber (Long upstreamSize ) {
91
+ this .upstreamSize = upstreamSize ;
92
+ }
93
+
59
94
@ Override
60
95
public void onSubscribe (Subscription s ) {
61
96
this .upstreamSubscription = s ;
62
- this .currentBody = new DownstreamBody (calculatePartSize ());
97
+ this .currentBody = new DownstreamBody (calculateChunkSize (), chunkNumber .get ());
98
+ sendCurrentBody ();
99
+ // We need to request subscription *after* we set currentBody because onNext could be invoked right away.
100
+ upstreamSubscription .request (1 );
63
101
}
64
102
65
103
@ Override
66
104
public void onNext (ByteBuffer byteBuffer ) {
67
105
hasOpenUpstreamDemand .set (false );
106
+ byteBufferSizeHint = byteBuffer .remaining ();
68
107
69
108
while (true ) {
70
- int amountRemainingInPart = Math .toIntExact (partSizeInBytes - currentBody .partLength );
109
+ int amountRemainingInPart = amountRemainingInPart ();
110
+ int finalAmountRemainingInPart = amountRemainingInPart ;
111
+ if (amountRemainingInPart == 0 ) {
112
+ currentBody .complete ();
113
+ int currentChunk = chunkNumber .incrementAndGet ();
114
+ Long partSize = calculateChunkSize ();
115
+ currentBody = new DownstreamBody (partSize , currentChunk );
116
+ sendCurrentBody ();
117
+ }
71
118
72
- if ( amountRemainingInPart < byteBuffer . remaining ()) {
73
- // TODO: should we avoid sending empty byte buffers, which can happen here?
74
- currentBody .send (byteBuffer );
119
+ amountRemainingInPart = amountRemainingInPart ();
120
+ if ( amountRemainingInPart >= byteBuffer . remaining ()) {
121
+ currentBody .send (byteBuffer . duplicate () );
75
122
break ;
76
123
}
77
124
78
125
ByteBuffer firstHalf = byteBuffer .duplicate ();
79
126
int newLimit = firstHalf .position () + amountRemainingInPart ;
80
- firstHalf .limit (newLimit ); // TODO: Am I off by one here?
127
+ firstHalf .limit (newLimit );
128
+ byteBuffer .position (newLimit );
81
129
currentBody .send (firstHalf );
82
- currentBody .complete ();
83
-
84
- ++partNumber ;
85
- currentBody = new DownstreamBody (calculatePartSize ());
86
- downstreamPublisher .send (currentBody );
87
- byteBuffer .position (newLimit ); // TODO: Am I off by one here?
88
130
}
89
131
90
132
maybeRequestMoreUpstreamData ();
91
133
}
92
134
135
+ private int amountRemainingInPart () {
136
+ return Math .toIntExact (currentBody .totalLength - currentBody .transferredLength );
137
+ }
138
+
93
139
@ Override
94
140
public void onComplete () {
141
+ log .trace (() -> "Received onComplete()" );
142
+ downstreamPublisher .complete ().thenRun (() -> future .complete (null ));
95
143
currentBody .complete ();
96
144
}
97
145
@@ -100,53 +148,73 @@ public void onError(Throwable t) {
100
148
currentBody .error (t );
101
149
}
102
150
103
- private Long calculatePartSize () {
151
+ private void sendCurrentBody () {
152
+ downstreamPublisher .send (currentBody ).exceptionally (t -> {
153
+ downstreamPublisher .error (t );
154
+ return null ;
155
+ });
156
+ }
157
+
158
+ private Long calculateChunkSize () {
104
159
Long dataRemaining = dataRemaining ();
105
160
if (dataRemaining == null ) {
106
161
return null ;
107
162
}
108
163
109
- return Math .min (partSizeInBytes , dataRemaining );
164
+ return Math .min (chunkSizeInBytes , dataRemaining );
110
165
}
111
166
112
167
private void maybeRequestMoreUpstreamData () {
113
- if ( dataBuffered .get () < maxMemoryUsageInBytes && hasOpenUpstreamDemand . compareAndSet ( false , true )) {
114
- // TODO: max memory usage might not be the best name, since we can technically go a little above
115
- // this limit when we add on a new byte buffer. But we don't know what the size of a buffer we request
116
- // will be, so I don't think we can have a truly accurate max. Maybe we call it minimum buffer size instead?
168
+ long buffered = dataBuffered .get ();
169
+ if ( shouldRequestMoreData ( buffered ) &&
170
+ hasOpenUpstreamDemand . compareAndSet ( false , true )) {
171
+ log . trace (() -> "Requesting more data, current data buffered: " + buffered );
117
172
upstreamSubscription .request (1 );
118
173
}
119
174
}
120
175
176
+ private boolean shouldRequestMoreData (long buffered ) {
177
+ return buffered == 0 || buffered + byteBufferSizeHint < maxMemoryUsageInBytes ;
178
+ }
179
+
121
180
private Long dataRemaining () {
122
181
if (upstreamSize == null ) {
123
182
return null ;
124
183
}
125
- return upstreamSize - (partNumber * partSizeInBytes );
184
+ return upstreamSize - (chunkNumber . get () * chunkSizeInBytes );
126
185
}
127
186
128
187
private class DownstreamBody implements AsyncRequestBody {
129
- private final Long size ;
188
+ private final Long totalLength ;
130
189
private final SimplePublisher <ByteBuffer > delegate = new SimplePublisher <>();
131
- private long partLength = 0 ;
190
+ private final int chunkNumber ;
191
+ private volatile long transferredLength = 0 ;
132
192
133
- private DownstreamBody (Long size ) {
134
- this .size = size ;
193
+ private DownstreamBody (Long totalLength , int chunkNumber ) {
194
+ this .totalLength = totalLength ;
195
+ this .chunkNumber = chunkNumber ;
135
196
}
136
197
137
198
@ Override
138
199
public Optional <Long > contentLength () {
139
- return Optional .ofNullable (size );
200
+ return Optional .ofNullable (totalLength );
140
201
}
141
202
142
203
public void send (ByteBuffer data ) {
204
+ log .trace (() -> "Sending bytebuffer " + data );
143
205
int length = data .remaining ();
144
- partLength += length ;
206
+ transferredLength += length ;
145
207
addDataBuffered (length );
146
- delegate .send (data ).thenRun (() -> addDataBuffered (-length ));
208
+ delegate .send (data ).whenComplete ((r , t ) -> {
209
+ addDataBuffered (-length );
210
+ if (t != null ) {
211
+ error (t );
212
+ }
213
+ });
147
214
}
148
215
149
216
public void complete () {
217
+ log .debug (() -> "Received complete() for chunk number: " + chunkNumber );
150
218
delegate .complete ();
151
219
}
152
220
@@ -167,4 +235,64 @@ private void addDataBuffered(int length) {
167
235
}
168
236
}
169
237
}
238
+
239
+ public static final class Builder {
240
+ private AsyncRequestBody asyncRequestBody ;
241
+ private Long chunkSizeInBytes ;
242
+ private Long maxMemoryUsageInBytes ;
243
+ private CompletableFuture <Void > future ;
244
+
245
+ /**
246
+ * Configures the asyncRequestBody to split
247
+ *
248
+ * @param asyncRequestBody The new asyncRequestBody value.
249
+ * @return This object for method chaining.
250
+ */
251
+ public Builder asyncRequestBody (AsyncRequestBody asyncRequestBody ) {
252
+ this .asyncRequestBody = asyncRequestBody ;
253
+ return this ;
254
+ }
255
+
256
+ /**
257
+ * Configures the size of the chunk for each {@link AsyncRequestBody} to publish
258
+ *
259
+ * @param chunkSizeInBytes The new chunkSizeInBytes value.
260
+ * @return This object for method chaining.
261
+ */
262
+ public Builder chunkSizeInBytes (Long chunkSizeInBytes ) {
263
+ this .chunkSizeInBytes = chunkSizeInBytes ;
264
+ return this ;
265
+ }
266
+
267
+ /**
268
+ * Sets the maximum memory usage in bytes. By default, it uses unlimited memory.
269
+ *
270
+ * @param maxMemoryUsageInBytes The new maxMemoryUsageInBytes value.
271
+ * @return This object for method chaining.
272
+ */
273
+ // TODO: max memory usage might not be the best name, since we may technically go a little above this limit when we add
274
+ // on a new byte buffer. But we don't know for sure what the size of a buffer we request will be (we do use the size
275
+ // for the last byte buffer as a hint), so I don't think we can have a truly accurate max. Maybe we call it minimum
276
+ // buffer size instead?
277
+ public Builder maxMemoryUsageInBytes (Long maxMemoryUsageInBytes ) {
278
+ this .maxMemoryUsageInBytes = maxMemoryUsageInBytes ;
279
+ return this ;
280
+ }
281
+
282
+ /**
283
+ * Sets the result future. The future will be completed when all request bodies
284
+ * have been sent.
285
+ *
286
+ * @param future The new future value.
287
+ * @return This object for method chaining.
288
+ */
289
+ public Builder resultFuture (CompletableFuture <Void > future ) {
290
+ this .future = future ;
291
+ return this ;
292
+ }
293
+
294
+ public SplittingPublisher build () {
295
+ return new SplittingPublisher (this );
296
+ }
297
+ }
170
298
}
0 commit comments