22
22
import java .nio .charset .Charset ;
23
23
import java .nio .charset .StandardCharsets ;
24
24
import java .nio .file .Path ;
25
+ import java .util .Arrays ;
25
26
import java .util .Optional ;
26
27
import java .util .concurrent .ExecutorService ;
27
28
import org .reactivestreams .Publisher ;
28
29
import org .reactivestreams .Subscriber ;
29
30
import software .amazon .awssdk .annotations .SdkPublicApi ;
30
- import software .amazon .awssdk .core .internal .async .ByteArrayAsyncRequestBody ;
31
+ import software .amazon .awssdk .core .internal .async .ByteBuffersAsyncRequestBody ;
31
32
import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
32
33
import software .amazon .awssdk .core .internal .async .InputStreamWithExecutorAsyncRequestBody ;
33
34
import software .amazon .awssdk .core .internal .util .Mimetype ;
34
35
import software .amazon .awssdk .utils .BinaryUtils ;
35
36
36
37
/**
37
- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
38
- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
39
- * of the data (i.e. to write that data on the wire).
38
+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
39
+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
40
+ * to write that data on the wire).
40
41
*
41
42
* <p>
42
43
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
43
- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
44
- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
45
- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
44
+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
45
+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
46
+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
46
47
* </p>
47
48
*
48
49
* <p>
49
- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
50
- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
51
- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
50
+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
51
+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
52
+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
52
53
* </p>
53
54
*
54
55
* @see FileAsyncRequestBody
55
- * @see ByteArrayAsyncRequestBody
56
+ * @see ByteBuffersAsyncRequestBody
56
57
*/
57
58
@ SdkPublicApi
58
59
public interface AsyncRequestBody extends SdkPublisher <ByteBuffer > {
@@ -70,8 +71,8 @@ default String contentType() {
70
71
}
71
72
72
73
/**
73
- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
74
- * The data is delivered when the publisher publishes the data.
74
+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
75
+ * publisher publishes the data.
75
76
*
76
77
* @param publisher Publisher of source data
77
78
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
124
125
* @param string The string to provide.
125
126
* @param cs The {@link Charset} to use.
126
127
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127
- * @see ByteArrayAsyncRequestBody
128
+ * @see ByteBuffersAsyncRequestBody
128
129
*/
129
130
static AsyncRequestBody fromString (String string , Charset cs ) {
130
- return new ByteArrayAsyncRequestBody ( string . getBytes ( cs ),
131
- Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ));
131
+ return ByteBuffersAsyncRequestBody . from ( Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ),
132
+ string . getBytes ( cs ));
132
133
}
133
134
134
135
/**
@@ -143,29 +144,180 @@ static AsyncRequestBody fromString(String string) {
143
144
}
144
145
145
146
/**
146
- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147
- * original byte array are not reflected in the {@link AsyncRequestBody}.
147
+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
148
+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
148
149
*
149
150
* @param bytes The bytes to send to the service.
150
151
* @return AsyncRequestBody instance.
151
152
*/
152
153
static AsyncRequestBody fromBytes (byte [] bytes ) {
153
- return new ByteArrayAsyncRequestBody (bytes , Mimetype .MIMETYPE_OCTET_STREAM );
154
+ byte [] clonedBytes = bytes .clone ();
155
+ return ByteBuffersAsyncRequestBody .from (clonedBytes );
154
156
}
155
157
156
158
/**
157
- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158
- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
159
+ * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
160
+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
161
+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
162
+ * {@code AsyncRequestBody} implementation.
163
+ *
164
+ * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
165
+ *
166
+ * @param bytes The bytes to send to the service.
167
+ * @return AsyncRequestBody instance.
168
+ */
169
+ static AsyncRequestBody fromBytesUnsafe (byte [] bytes ) {
170
+ return ByteBuffersAsyncRequestBody .from (bytes );
171
+ }
172
+
173
+ /**
174
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175
+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176
+ * <p>
177
+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
178
+ * it to copy only the remaining readable bytes.
159
179
*
160
180
* @param byteBuffer ByteBuffer to send to the service.
161
181
* @return AsyncRequestBody instance.
162
182
*/
163
183
static AsyncRequestBody fromByteBuffer (ByteBuffer byteBuffer ) {
164
- return fromBytes (BinaryUtils .copyAllBytesFrom (byteBuffer ));
184
+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOf (byteBuffer );
185
+ immutableCopy .rewind ();
186
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
187
+ }
188
+
189
+ /**
190
+ * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
191
+ * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
192
+ * reflected in the {@link AsyncRequestBody}.
193
+ * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
194
+ * only the remaining bytes.
195
+ *
196
+ * @param byteBuffer ByteBuffer to send to the service.
197
+ * @return AsyncRequestBody instance.
198
+ */
199
+ static AsyncRequestBody fromRemainingByteBuffer (ByteBuffer byteBuffer ) {
200
+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOfRemaining (byteBuffer );
201
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
202
+ }
203
+
204
+ /**
205
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
206
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
207
+ * {@code AsyncRequestBody} implementation.
208
+ * <p>
209
+ * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
210
+ * need it to copy only the remaining readable bytes.
211
+ *
212
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
213
+ * risks.
214
+ *
215
+ * @param byteBuffer ByteBuffer to send to the service.
216
+ * @return AsyncRequestBody instance.
217
+ */
218
+ static AsyncRequestBody fromByteBufferUnsafe (ByteBuffer byteBuffer ) {
219
+ ByteBuffer readOnlyBuffer = byteBuffer .asReadOnlyBuffer ();
220
+ readOnlyBuffer .rewind ();
221
+ return ByteBuffersAsyncRequestBody .of (null , readOnlyBuffer );
222
+ }
223
+
224
+ /**
225
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
226
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
227
+ * {@code AsyncRequestBody} implementation.
228
+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
229
+ * the buffer and reads only the remaining bytes.
230
+ *
231
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
232
+ * risks.
233
+ *
234
+ * @param byteBuffer ByteBuffer to send to the service.
235
+ * @return AsyncRequestBody instance.
236
+ */
237
+ static AsyncRequestBody fromRemainingByteBufferUnsafe (ByteBuffer byteBuffer ) {
238
+ return ByteBuffersAsyncRequestBody .of (null , byteBuffer .asReadOnlyBuffer ());
239
+ }
240
+
241
+ /**
242
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
243
+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
244
+ * <p>
245
+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
246
+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
247
+ *
248
+ * @param byteBuffers ByteBuffer array to send to the service.
249
+ * @return AsyncRequestBody instance.
250
+ */
251
+ static AsyncRequestBody fromByteBuffers (ByteBuffer ... byteBuffers ) {
252
+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
253
+ .map (BinaryUtils ::immutableCopyOf )
254
+ .peek (ByteBuffer ::rewind )
255
+ .toArray (ByteBuffer []::new );
256
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
257
+ }
258
+
259
+ /**
260
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
261
+ * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
262
+ * {@link AsyncRequestBody}.
263
+ * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
264
+ * this method respects the current read position of each buffer and reads only the remaining bytes.
265
+ *
266
+ * @param byteBuffers ByteBuffer array to send to the service.
267
+ * @return AsyncRequestBody instance.
268
+ */
269
+ static AsyncRequestBody fromRemainingByteBuffers (ByteBuffer ... byteBuffers ) {
270
+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
271
+ .map (BinaryUtils ::immutableCopyOfRemaining )
272
+ .peek (ByteBuffer ::rewind )
273
+ .toArray (ByteBuffer []::new );
274
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
275
+ }
276
+
277
+ /**
278
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
279
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
280
+ * {@code AsyncRequestBody} implementation.
281
+ * <p>
282
+ * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
283
+ * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
284
+ *
285
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
286
+ * risks.
287
+ *
288
+ * @param byteBuffers ByteBuffer array to send to the service.
289
+ * @return AsyncRequestBody instance.
290
+ */
291
+ static AsyncRequestBody fromByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
292
+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
293
+ .map (ByteBuffer ::asReadOnlyBuffer )
294
+ .peek (ByteBuffer ::rewind )
295
+ .toArray (ByteBuffer []::new );
296
+ return ByteBuffersAsyncRequestBody .of (null , readOnlyBuffers );
297
+ }
298
+
299
+ /**
300
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
301
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
302
+ * {@code AsyncRequestBody} implementation.
303
+ * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
304
+ * this method respects the current read position of each buffer and reads only the remaining bytes.
305
+ *
306
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
307
+ * risks.
308
+ *
309
+ * @param byteBuffers ByteBuffer array to send to the service.
310
+ * @return AsyncRequestBody instance.
311
+ */
312
+ static AsyncRequestBody fromRemainingByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
313
+ ByteBuffer [] readOnlyBuffers = Arrays .stream (byteBuffers )
314
+ .map (ByteBuffer ::asReadOnlyBuffer )
315
+ .toArray (ByteBuffer []::new );
316
+ return ByteBuffersAsyncRequestBody .of (null , readOnlyBuffers );
165
317
}
166
318
167
319
/**
168
- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
320
+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
169
321
*
170
322
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
171
323
* non-blocking event loop threads owned by the SDK.
@@ -239,7 +391,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
239
391
}
240
392
241
393
/**
242
- * Creates a {@link AsyncRequestBody} with no content.
394
+ * Creates an {@link AsyncRequestBody} with no content.
243
395
*
244
396
* @return AsyncRequestBody instance.
245
397
*/
0 commit comments