22
22
import java .nio .charset .Charset ;
23
23
import java .nio .charset .StandardCharsets ;
24
24
import java .nio .file .Path ;
25
+ import java .util .Arrays ;
25
26
import java .util .Optional ;
26
27
import java .util .concurrent .CompletableFuture ;
27
28
import java .util .concurrent .ExecutorService ;
28
29
import org .reactivestreams .Publisher ;
29
30
import org .reactivestreams .Subscriber ;
30
31
import software .amazon .awssdk .annotations .SdkPublicApi ;
31
- import software .amazon .awssdk .core .internal .async .ByteArrayAsyncRequestBody ;
32
+ import software .amazon .awssdk .core .internal .async .ByteBuffersAsyncRequestBody ;
32
33
import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
33
34
import software .amazon .awssdk .core .internal .async .InputStreamWithExecutorAsyncRequestBody ;
34
35
import software .amazon .awssdk .core .internal .async .SplittingPublisher ;
37
38
import software .amazon .awssdk .utils .Validate ;
38
39
39
40
/**
40
- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
41
- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
42
- * of the data (i.e. to write that data on the wire).
41
+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
42
+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
43
+ * to write that data on the wire).
43
44
*
44
45
* <p>
45
46
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
46
- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
47
- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
48
- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
47
+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
48
+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
49
+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
49
50
* </p>
50
51
*
51
52
* <p>
52
- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
53
- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
54
- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
53
+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
54
+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
55
+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
55
56
* </p>
56
57
*
57
58
* @see FileAsyncRequestBody
58
- * @see ByteArrayAsyncRequestBody
59
+ * @see ByteBuffersAsyncRequestBody
59
60
*/
60
61
@ SdkPublicApi
61
62
public interface AsyncRequestBody extends SdkPublisher <ByteBuffer > {
@@ -73,8 +74,8 @@ default String contentType() {
73
74
}
74
75
75
76
/**
76
- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
77
- * The data is delivered when the publisher publishes the data.
77
+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
78
+ * publisher publishes the data.
78
79
*
79
80
* @param publisher Publisher of source data
80
81
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -127,11 +128,11 @@ static AsyncRequestBody fromFile(File file) {
127
128
* @param string The string to provide.
128
129
* @param cs The {@link Charset} to use.
129
130
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
130
- * @see ByteArrayAsyncRequestBody
131
+ * @see ByteBuffersAsyncRequestBody
131
132
*/
132
133
static AsyncRequestBody fromString (String string , Charset cs ) {
133
- return new ByteArrayAsyncRequestBody ( string . getBytes ( cs ),
134
- Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ));
134
+ return ByteBuffersAsyncRequestBody . from ( Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ),
135
+ string . getBytes ( cs ));
135
136
}
136
137
137
138
/**
@@ -146,29 +147,181 @@ static AsyncRequestBody fromString(String string) {
146
147
}
147
148
148
149
/**
149
- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
150
- * original byte array are not reflected in the {@link AsyncRequestBody}.
150
+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
151
+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
151
152
*
152
153
* @param bytes The bytes to send to the service.
153
154
* @return AsyncRequestBody instance.
154
155
*/
155
156
static AsyncRequestBody fromBytes (byte [] bytes ) {
156
- return new ByteArrayAsyncRequestBody (bytes , Mimetype .MIMETYPE_OCTET_STREAM );
157
+ byte [] clonedBytes = bytes .clone ();
158
+ return ByteBuffersAsyncRequestBody .from (clonedBytes );
157
159
}
158
160
159
161
/**
160
- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
161
- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
162
+ * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
163
+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
164
+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
165
+ * {@code AsyncRequestBody} implementation.
166
+ *
167
+ * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
168
+ *
169
+ * @param bytes The bytes to send to the service.
170
+ * @return AsyncRequestBody instance.
171
+ */
172
+ static AsyncRequestBody fromBytesUnsafe (byte [] bytes ) {
173
+ return ByteBuffersAsyncRequestBody .from (bytes );
174
+ }
175
+
176
+ /**
177
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
178
+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
179
+ * <p>
180
+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
181
+ * it to copy only the remaining readable bytes.
162
182
*
163
183
* @param byteBuffer ByteBuffer to send to the service.
164
184
* @return AsyncRequestBody instance.
165
185
*/
166
186
static AsyncRequestBody fromByteBuffer (ByteBuffer byteBuffer ) {
167
- return fromBytes (BinaryUtils .copyAllBytesFrom (byteBuffer ));
187
+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOf (byteBuffer );
188
+ immutableCopy .rewind ();
189
+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
190
+ }
191
+
192
+ /**
193
+ * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
194
+ * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
195
+ * reflected in the {@link AsyncRequestBody}.
196
+ * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
197
+ * only the remaining bytes.
198
+ *
199
+ * @param byteBuffer ByteBuffer to send to the service.
200
+ * @return AsyncRequestBody instance.
201
+ */
202
+ static AsyncRequestBody fromRemainingByteBuffer (ByteBuffer byteBuffer ) {
203
+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOfRemaining (byteBuffer );
204
+ return ByteBuffersAsyncRequestBody .of ((long ) immutableCopy .remaining (), immutableCopy );
205
+ }
206
+
207
+ /**
208
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
209
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
210
+ * {@code AsyncRequestBody} implementation.
211
+ * <p>
212
+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
213
+ * need it to copy only the remaining readable bytes.
214
+ *
215
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
216
+ * risks.
217
+ *
218
+ * @param byteBuffer ByteBuffer to send to the service.
219
+ * @return AsyncRequestBody instance.
220
+ */
221
+ static AsyncRequestBody fromByteBufferUnsafe (ByteBuffer byteBuffer ) {
222
+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
223
+ readOnlyBuffer .rewind ();
224
+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
225
+ }
226
+
227
+ /**
228
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
229
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
230
+ * {@code AsyncRequestBody} implementation.
231
+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
232
+ * the buffer and reads only the remaining bytes.
233
+ *
234
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
235
+ * risks.
236
+ *
237
+ * @param byteBuffer ByteBuffer to send to the service.
238
+ * @return AsyncRequestBody instance.
239
+ */
240
+ static AsyncRequestBody fromRemainingByteBufferUnsafe (ByteBuffer byteBuffer ) {
241
+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
242
+ return ByteBuffersAsyncRequestBody .of ((long ) readOnlyBuffer .remaining (), readOnlyBuffer );
243
+ }
244
+
245
+ /**
246
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
247
+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
248
+ * <p>
249
+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
250
+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
251
+ *
252
+ * @param byteBuffers ByteBuffer array to send to the service.
253
+ * @return AsyncRequestBody instance.
254
+ */
255
+ static AsyncRequestBody fromByteBuffers (ByteBuffer ... byteBuffers ) {
256
+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
257
+ .map (BinaryUtils ::immutableCopyOf )
258
+ .peek (ByteBuffer ::rewind )
259
+ .toArray (ByteBuffer []::new );
260
+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
261
+ }
262
+
263
+ /**
264
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
265
+ * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
266
+ * {@link AsyncRequestBody}.
267
+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
268
+ * this method respects the current read position of each buffer and reads only the remaining bytes.
269
+ *
270
+ * @param byteBuffers ByteBuffer array to send to the service.
271
+ * @return AsyncRequestBody instance.
272
+ */
273
+ static AsyncRequestBody fromRemainingByteBuffers (ByteBuffer ... byteBuffers ) {
274
+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
275
+ .map (BinaryUtils ::immutableCopyOfRemaining )
276
+ .peek (ByteBuffer ::rewind )
277
+ .toArray (ByteBuffer []::new );
278
+ return ByteBuffersAsyncRequestBody .of (immutableCopy );
279
+ }
280
+
281
+ /**
282
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
283
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
284
+ * {@code AsyncRequestBody} implementation.
285
+ * <p>
286
+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
287
+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
288
+ *
289
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
290
+ * risks.
291
+ *
292
+ * @param byteBuffers ByteBuffer array to send to the service.
293
+ * @return AsyncRequestBody instance.
294
+ */
295
+ static AsyncRequestBody fromByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
296
+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
297
+ .map (ByteBuffer ::asReadOnlyBuffer )
298
+ .peek (ByteBuffer ::rewind )
299
+ .toArray (ByteBuffer []::new );
300
+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
301
+ }
302
+
303
+ /**
304
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
305
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
306
+ * {@code AsyncRequestBody} implementation.
307
+ * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
308
+ * this method respects the current read position of each buffer and reads only the remaining bytes.
309
+ *
310
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
311
+ * risks.
312
+ *
313
+ * @param byteBuffers ByteBuffer array to send to the service.
314
+ * @return AsyncRequestBody instance.
315
+ */
316
+ static AsyncRequestBody fromRemainingByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
317
+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
318
+ .map (ByteBuffer ::asReadOnlyBuffer )
319
+ .toArray (ByteBuffer []::new );
320
+ return ByteBuffersAsyncRequestBody .of (readOnlyBuffers );
168
321
}
169
322
170
323
/**
171
- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
324
+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
172
325
*
173
326
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
174
327
* non-blocking event loop threads owned by the SDK.
@@ -242,7 +395,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
242
395
}
243
396
244
397
/**
245
- * Creates a {@link AsyncRequestBody} with no content.
398
+ * Creates an {@link AsyncRequestBody} with no content.
246
399
*
247
400
* @return AsyncRequestBody instance.
248
401
*/
0 commit comments