22
22
import java .nio .charset .Charset ;
23
23
import java .nio .charset .StandardCharsets ;
24
24
import java .nio .file .Path ;
25
+ import java .util .Arrays ;
25
26
import java .util .Optional ;
26
27
import java .util .concurrent .ExecutorService ;
27
28
import org .reactivestreams .Publisher ;
28
29
import org .reactivestreams .Subscriber ;
29
30
import software .amazon .awssdk .annotations .SdkPublicApi ;
30
- import software .amazon .awssdk .core .internal .async .ByteArrayAsyncRequestBody ;
31
+ import software .amazon .awssdk .core .internal .async .ByteBuffersAsyncRequestBody ;
31
32
import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
32
33
import software .amazon .awssdk .core .internal .async .InputStreamWithExecutorAsyncRequestBody ;
33
34
import software .amazon .awssdk .core .internal .util .Mimetype ;
34
35
import software .amazon .awssdk .utils .BinaryUtils ;
35
36
36
37
/**
37
- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
38
- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
39
- * of the data (i.e. to write that data on the wire).
38
+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
39
+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
40
+ * to write that data on the wire).
40
41
*
41
42
* <p>
42
43
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
43
- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
44
- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
45
- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
44
+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
45
+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
46
+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
46
47
* </p>
47
48
*
48
49
* <p>
49
- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
50
- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
51
- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
50
+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
51
+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
52
+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
52
53
* </p>
53
54
*
54
55
* @see FileAsyncRequestBody
55
- * @see ByteArrayAsyncRequestBody
56
+ * @see ByteBuffersAsyncRequestBody
56
57
*/
57
58
@ SdkPublicApi
58
59
public interface AsyncRequestBody extends SdkPublisher <ByteBuffer > {
@@ -70,8 +71,8 @@ default String contentType() {
70
71
}
71
72
72
73
/**
73
- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
74
- * The data is delivered when the publisher publishes the data.
74
+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
75
+ * publisher publishes the data.
75
76
*
76
77
* @param publisher Publisher of source data
77
78
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
124
125
* @param string The string to provide.
125
126
* @param cs The {@link Charset} to use.
126
127
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127
- * @see ByteArrayAsyncRequestBody
128
+ * @see ByteBuffersAsyncRequestBody
128
129
*/
129
130
static AsyncRequestBody fromString (String string , Charset cs ) {
130
- return new ByteArrayAsyncRequestBody ( string . getBytes ( cs ),
131
- Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ));
131
+ return ByteBuffersAsyncRequestBody . from ( Mimetype . MIMETYPE_TEXT_PLAIN + "; charset=" + cs . name ( ),
132
+ string . getBytes ( cs ));
132
133
}
133
134
134
135
/**
@@ -143,29 +144,96 @@ static AsyncRequestBody fromString(String string) {
143
144
}
144
145
145
146
/**
146
- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147
- * original byte array are not reflected in the {@link AsyncRequestBody}.
147
+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
148
+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
148
149
*
149
150
* @param bytes The bytes to send to the service.
150
151
* @return AsyncRequestBody instance.
151
152
*/
152
153
static AsyncRequestBody fromBytes (byte [] bytes ) {
153
- return new ByteArrayAsyncRequestBody (bytes , Mimetype .MIMETYPE_OCTET_STREAM );
154
+ byte [] clonedBytes = bytes .clone ();
155
+ return ByteBuffersAsyncRequestBody .from (clonedBytes );
154
156
}
155
157
156
158
/**
157
- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158
- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
159
+ * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
160
+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
161
+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
162
+ * {@code AsyncRequestBody} implementation.
163
+ *
164
+ * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
165
+ *
166
+ * @param bytes The bytes to send to the service.
167
+ * @return AsyncRequestBody instance.
168
+ */
169
+ static AsyncRequestBody fromBytesUnsafe (byte [] bytes ) {
170
+ return ByteBuffersAsyncRequestBody .from (bytes );
171
+ }
172
+
173
+ /**
174
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175
+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176
+ * <p> The position is set to 0 in the copied {@link ByteBuffer} and the mark if defined is discarded.
159
177
*
160
178
* @param byteBuffer ByteBuffer to send to the service.
161
179
* @return AsyncRequestBody instance.
162
180
*/
163
181
static AsyncRequestBody fromByteBuffer (ByteBuffer byteBuffer ) {
164
- return fromBytes (BinaryUtils .copyAllBytesFrom (byteBuffer ));
182
+ ByteBuffer immutableCopy = BinaryUtils .immutableCopyOf (byteBuffer );
183
+ immutableCopy .rewind ();
184
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
185
+ }
186
+
187
+ /**
188
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
189
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify the {@link ByteBuffer} stored in
190
+ * this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify the
191
+ * {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
192
+ *
193
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
194
+ * risks.
195
+ *
196
+ * @param byteBuffer ByteBuffer to send to the service.
197
+ * @return AsyncRequestBody instance.
198
+ */
199
+ static AsyncRequestBody fromByteBufferUnsafe (ByteBuffer byteBuffer ) {
200
+ return ByteBuffersAsyncRequestBody .of (null , byteBuffer );
201
+ }
202
+
203
+ /**
204
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
205
+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
206
+ * <p> The position is set to 0 in each copied {@link ByteBuffer} and the mark if defined is discarded.
207
+ *
208
+ * @param byteBuffers ByteBuffer array to send to the service.
209
+ * @return AsyncRequestBody instance.
210
+ */
211
+ static AsyncRequestBody fromByteBuffers (ByteBuffer ... byteBuffers ) {
212
+ ByteBuffer [] immutableCopy = Arrays .stream (byteBuffers )
213
+ .map (BinaryUtils ::immutableCopyOf )
214
+ .peek (ByteBuffer ::rewind )
215
+ .toArray (ByteBuffer []::new );
216
+ return ByteBuffersAsyncRequestBody .of (null , immutableCopy );
217
+ }
218
+
219
+ /**
220
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
221
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify any {@link ByteBuffer} stored in
222
+ * this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify any
223
+ * {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
224
+ *
225
+ * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
226
+ * risks.
227
+ *
228
+ * @param byteBuffers ByteBuffer array to send to the service.
229
+ * @return AsyncRequestBody instance.
230
+ */
231
+ static AsyncRequestBody fromByteBuffersUnsafe (ByteBuffer ... byteBuffers ) {
232
+ return ByteBuffersAsyncRequestBody .of (null , byteBuffers );
165
233
}
166
234
167
235
/**
168
- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
236
+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
169
237
*
170
238
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
171
239
* non-blocking event loop threads owned by the SDK.
@@ -239,7 +307,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
239
307
}
240
308
241
309
/**
242
- * Creates a {@link AsyncRequestBody} with no content.
310
+ * Creates an {@link AsyncRequestBody} with no content.
243
311
*
244
312
* @return AsyncRequestBody instance.
245
313
*/
0 commit comments