Skip to content

Revert "Add "unsafe" AsyncRequestBody constructors for byte[] and Byt… #4150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,37 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
* the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
* to write that data on the wire).
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
* this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
* of the data (i.e. to write that data on the wire).
*
* <p>
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
* {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
* SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
* org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
* the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* </p>
*
* <p>
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
* subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
* notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
* The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
* for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* </p>
*
* @see FileAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
* @see ByteArrayAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
Expand All @@ -71,8 +70,8 @@ default String contentType() {
}

/**
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
* publisher publishes the data.
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
* The data is delivered when the publisher publishes the data.
*
* @param publisher Publisher of source data
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
Expand Down Expand Up @@ -125,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
* @see ByteBuffersAsyncRequestBody
* @see ByteArrayAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
string.getBytes(cs));
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
}

/**
Expand All @@ -144,181 +143,29 @@ static AsyncRequestBody fromString(String string) {
}

/**
* Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
* modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
* original byte array are not reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
byte[] clonedBytes = bytes.clone();
return ByteBuffersAsyncRequestBody.from(clonedBytes);
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
}

/**
* Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
* introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
* implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
* {@code AsyncRequestBody} implementation.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
return ByteBuffersAsyncRequestBody.from(bytes);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
* it to copy only the remaining readable bytes.
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
immutableCopy.rewind();
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
* reflected in the {@link AsyncRequestBody}.
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
* only the remaining bytes.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
* need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
readOnlyBuffer.rewind();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
* the buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOf)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
* {@link AsyncRequestBody}.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOfRemaining)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
}

/**
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
*
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
Expand Down Expand Up @@ -392,7 +239,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
}

/**
* Creates an {@link AsyncRequestBody} with no content.
* Creates a {@link AsyncRequestBody} with no content.
*
* @return AsyncRequestBody instance.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.utils.Logger;

/**
* An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static
* methods on {@link AsyncRequestBody}
*
* @see AsyncRequestBody#fromBytes(byte[])
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
* @see AsyncRequestBody#fromString(String)
*/
@SdkInternalApi
public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);

private final byte[] bytes;

private final String mimetype;

public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) {
this.bytes = bytes.clone();
this.mimetype = mimetype;
}

@Override
public Optional<Long> contentLength() {
return Optional.of((long) bytes.length);
}

@Override
public String contentType() {
return mimetype;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
if (s == null) {
throw new NullPointerException("Subscription MUST NOT be null.");
}

// As per 2.13, this method must return normally (i.e. not throw).
try {
s.onSubscribe(
new Subscription() {
private boolean done = false;

@Override
public void request(long n) {
if (done) {
return;
}
if (n > 0) {
done = true;
s.onNext(ByteBuffer.wrap(bytes));
s.onComplete();
} else {
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
}
}

@Override
public void cancel() {
synchronized (this) {
if (!done) {
done = true;
}
}
}
}
);
} catch (Throwable ex) {
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
}
}
}
Loading