Skip to content

Commit f6bc363

Browse files
committed
Add "unsafe" AsyncRequestBody constructors for byte[] and ByteBuffers
1 parent 15abae1 commit f6bc363

File tree

10 files changed

+1033
-265
lines changed

10 files changed

+1033
-265
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "StephenFlavin",
5+
"description": "Add \"unsafe\" and \"fromRemaining\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 177 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,38 @@
2222
import java.nio.charset.Charset;
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
25+
import java.util.Arrays;
2526
import java.util.Optional;
2627
import java.util.concurrent.ExecutorService;
2728
import org.reactivestreams.Publisher;
2829
import org.reactivestreams.Subscriber;
2930
import software.amazon.awssdk.annotations.SdkPublicApi;
30-
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
31+
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
3132
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3233
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.util.Mimetype;
3435
import software.amazon.awssdk.utils.BinaryUtils;
3536

3637
/**
37-
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
38-
* this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
39-
* of the data (i.e. to write that data on the wire).
38+
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
39+
* the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
40+
* to write that data on the wire).
4041
*
4142
* <p>
4243
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
43-
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
44-
* org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
45-
* the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
44+
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
45+
* {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
46+
* SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
4647
* </p>
4748
*
4849
* <p>
49-
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
50-
* The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
51-
* for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
50+
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
51+
* subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
52+
* notified via the {@link org.reactivestreams.Subscription#request(long)} method.
5253
* </p>
5354
*
5455
* @see FileAsyncRequestBody
55-
* @see ByteArrayAsyncRequestBody
56+
* @see ByteBuffersAsyncRequestBody
5657
*/
5758
@SdkPublicApi
5859
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
@@ -70,8 +71,8 @@ default String contentType() {
7071
}
7172

7273
/**
73-
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
74-
* The data is delivered when the publisher publishes the data.
74+
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
75+
* publisher publishes the data.
7576
*
7677
* @param publisher Publisher of source data
7778
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
124125
* @param string The string to provide.
125126
* @param cs The {@link Charset} to use.
126127
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127-
* @see ByteArrayAsyncRequestBody
128+
* @see ByteBuffersAsyncRequestBody
128129
*/
129130
static AsyncRequestBody fromString(String string, Charset cs) {
130-
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
131-
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
131+
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
132+
string.getBytes(cs));
132133
}
133134

134135
/**
@@ -143,29 +144,181 @@ static AsyncRequestBody fromString(String string) {
143144
}
144145

145146
/**
146-
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147-
* original byte array are not reflected in the {@link AsyncRequestBody}.
147+
* Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
148+
* modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
148149
*
149150
* @param bytes The bytes to send to the service.
150151
* @return AsyncRequestBody instance.
151152
*/
152153
static AsyncRequestBody fromBytes(byte[] bytes) {
153-
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
154+
byte[] clonedBytes = bytes.clone();
155+
return ByteBuffersAsyncRequestBody.from(clonedBytes);
154156
}
155157

156158
/**
157-
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158-
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
159+
* Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
160+
* introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
161+
* implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
162+
* {@code AsyncRequestBody} implementation.
163+
*
164+
* <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
165+
*
166+
* @param bytes The bytes to send to the service.
167+
* @return AsyncRequestBody instance.
168+
*/
169+
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
170+
return ByteBuffersAsyncRequestBody.from(bytes);
171+
}
172+
173+
/**
174+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175+
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176+
* <p>
177+
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
178+
* it to copy only the remaining readable bytes.
159179
*
160180
* @param byteBuffer ByteBuffer to send to the service.
161181
* @return AsyncRequestBody instance.
162182
*/
163183
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
164-
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
184+
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
185+
immutableCopy.rewind();
186+
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
187+
}
188+
189+
/**
190+
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
191+
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
192+
* reflected in the {@link AsyncRequestBody}.
193+
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
194+
* only the remaining bytes.
195+
*
196+
* @param byteBuffer ByteBuffer to send to the service.
197+
* @return AsyncRequestBody instance.
198+
*/
199+
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
200+
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
201+
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
202+
}
203+
204+
/**
205+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
206+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
207+
* {@code AsyncRequestBody} implementation.
208+
* <p>
209+
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
210+
* need it to copy only the remaining readable bytes.
211+
*
212+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
213+
* risks.
214+
*
215+
* @param byteBuffer ByteBuffer to send to the service.
216+
* @return AsyncRequestBody instance.
217+
*/
218+
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
219+
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
220+
readOnlyBuffer.rewind();
221+
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
222+
}
223+
224+
/**
225+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
226+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
227+
* {@code AsyncRequestBody} implementation.
228+
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
229+
* the buffer and reads only the remaining bytes.
230+
*
231+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
232+
* risks.
233+
*
234+
* @param byteBuffer ByteBuffer to send to the service.
235+
* @return AsyncRequestBody instance.
236+
*/
237+
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
238+
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
239+
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
240+
}
241+
242+
/**
243+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
244+
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
245+
* <p>
246+
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
247+
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
248+
*
249+
* @param byteBuffers ByteBuffer array to send to the service.
250+
* @return AsyncRequestBody instance.
251+
*/
252+
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
253+
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
254+
.map(BinaryUtils::immutableCopyOf)
255+
.peek(ByteBuffer::rewind)
256+
.toArray(ByteBuffer[]::new);
257+
return ByteBuffersAsyncRequestBody.of(immutableCopy);
258+
}
259+
260+
/**
261+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
262+
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
263+
* {@link AsyncRequestBody}.
264+
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
265+
* this method respects the current read position of each buffer and reads only the remaining bytes.
266+
*
267+
* @param byteBuffers ByteBuffer array to send to the service.
268+
* @return AsyncRequestBody instance.
269+
*/
270+
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
271+
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
272+
.map(BinaryUtils::immutableCopyOfRemaining)
273+
.peek(ByteBuffer::rewind)
274+
.toArray(ByteBuffer[]::new);
275+
return ByteBuffersAsyncRequestBody.of(immutableCopy);
276+
}
277+
278+
/**
279+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
280+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
281+
* {@code AsyncRequestBody} implementation.
282+
* <p>
283+
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
284+
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
285+
*
286+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
287+
* risks.
288+
*
289+
* @param byteBuffers ByteBuffer array to send to the service.
290+
* @return AsyncRequestBody instance.
291+
*/
292+
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
293+
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
294+
.map(ByteBuffer::asReadOnlyBuffer)
295+
.peek(ByteBuffer::rewind)
296+
.toArray(ByteBuffer[]::new);
297+
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
298+
}
299+
300+
/**
301+
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
302+
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
303+
* {@code AsyncRequestBody} implementation.
304+
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
305+
* this method respects the current read position of each buffer and reads only the remaining bytes.
306+
*
307+
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
308+
* risks.
309+
*
310+
* @param byteBuffers ByteBuffer array to send to the service.
311+
* @return AsyncRequestBody instance.
312+
*/
313+
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
314+
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
315+
.map(ByteBuffer::asReadOnlyBuffer)
316+
.toArray(ByteBuffer[]::new);
317+
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
165318
}
166319

167320
/**
168-
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
321+
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
169322
*
170323
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
171324
* non-blocking event loop threads owned by the SDK.
@@ -239,7 +392,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
239392
}
240393

241394
/**
242-
* Creates a {@link AsyncRequestBody} with no content.
395+
* Creates an {@link AsyncRequestBody} with no content.
243396
*
244397
* @return AsyncRequestBody instance.
245398
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java

Lines changed: 0 additions & 98 deletions
This file was deleted.

0 commit comments

Comments
 (0)