Skip to content

Add "unsafe" AsyncRequestBody constructors for better performance when concurrent modification isn't a worry #3925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "StephenFlavin",
"description": "Add \"unsafe\" and \"fromRemaining\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,38 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
* this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
* of the data (i.e. to write that data on the wire).
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
* the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
* to write that data on the wire).
*
* <p>
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
* org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
* the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
* {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
* SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* </p>
*
* <p>
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
* The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
* for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
* subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
* notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* </p>
*
* @see FileAsyncRequestBody
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
Expand All @@ -70,8 +71,8 @@ default String contentType() {
}

/**
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
* The data is delivered when the publisher publishes the data.
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
* publisher publishes the data.
*
* @param publisher Publisher of source data
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
Expand Down Expand Up @@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
string.getBytes(cs));
}

/**
Expand All @@ -143,29 +144,181 @@ static AsyncRequestBody fromString(String string) {
}

/**
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
* original byte array are not reflected in the {@link AsyncRequestBody}.
* Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
* modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
byte[] clonedBytes = bytes.clone();
return ByteBuffersAsyncRequestBody.from(clonedBytes);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
* Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
* introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
* implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
* {@code AsyncRequestBody} implementation.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
return ByteBuffersAsyncRequestBody.from(bytes);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
* it to copy only the remaining readable bytes.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
immutableCopy.rewind();
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
* reflected in the {@link AsyncRequestBody}.
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
* only the remaining bytes.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
* need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
readOnlyBuffer.rewind();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
* the buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOf)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
* {@link AsyncRequestBody}.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOfRemaining)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
*
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
Expand Down Expand Up @@ -239,7 +392,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
}

/**
* Creates a {@link AsyncRequestBody} with no content.
* Creates an {@link AsyncRequestBody} with no content.
*
* @return AsyncRequestBody instance.
*/
Expand Down

This file was deleted.

Loading