Skip to content

Commit 53a85a2

Browse files
committed
Revert "Add "unsafe" AsyncRequestBody constructors for byte[] and ByteBuffers (#3925)"
This reverts commit be45eb0.
1 parent 62a012b commit 53a85a2

File tree

9 files changed

+265
-1025
lines changed

9 files changed

+265
-1025
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 24 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,37 @@
2222
import java.nio.charset.Charset;
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
25-
import java.util.Arrays;
2625
import java.util.Optional;
2726
import java.util.concurrent.ExecutorService;
2827
import org.reactivestreams.Publisher;
2928
import org.reactivestreams.Subscriber;
3029
import software.amazon.awssdk.annotations.SdkPublicApi;
31-
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
30+
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
3231
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3332
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3433
import software.amazon.awssdk.core.internal.util.Mimetype;
3534
import software.amazon.awssdk.utils.BinaryUtils;
3635

3736
/**
38-
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
39-
* the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
40-
* to write that data on the wire).
37+
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
38+
* this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
39+
* of the data (i.e. to write that data on the wire).
4140
*
4241
* <p>
4342
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
44-
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
45-
* {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
46-
* SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
43+
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
44+
* org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
45+
* the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
4746
* </p>
4847
*
4948
* <p>
50-
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
51-
* subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
52-
* notified via the {@link org.reactivestreams.Subscription#request(long)} method.
49+
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
50+
* The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
51+
* for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
5352
* </p>
5453
*
5554
* @see FileAsyncRequestBody
56-
* @see ByteBuffersAsyncRequestBody
55+
* @see ByteArrayAsyncRequestBody
5756
*/
5857
@SdkPublicApi
5958
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
@@ -71,8 +70,8 @@ default String contentType() {
7170
}
7271

7372
/**
74-
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
75-
* publisher publishes the data.
73+
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
74+
* The data is delivered when the publisher publishes the data.
7675
*
7776
* @param publisher Publisher of source data
7877
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -125,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
125124
* @param string The string to provide.
126125
* @param cs The {@link Charset} to use.
127126
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
128-
* @see ByteBuffersAsyncRequestBody
127+
* @see ByteArrayAsyncRequestBody
129128
*/
130129
static AsyncRequestBody fromString(String string, Charset cs) {
131-
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
132-
string.getBytes(cs));
130+
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
131+
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
133132
}
134133

135134
/**
@@ -144,181 +143,29 @@ static AsyncRequestBody fromString(String string) {
144143
}
145144

146145
/**
147-
* Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
148-
* modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
146+
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147+
* original byte array are not reflected in the {@link AsyncRequestBody}.
149148
*
150149
* @param bytes The bytes to send to the service.
151150
* @return AsyncRequestBody instance.
152151
*/
153152
static AsyncRequestBody fromBytes(byte[] bytes) {
154-
byte[] clonedBytes = bytes.clone();
155-
return ByteBuffersAsyncRequestBody.from(clonedBytes);
153+
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
156154
}
157155

158156
/**
159-
* Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
160-
* introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
161-
* implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
162-
* {@code AsyncRequestBody} implementation.
163-
*
164-
* <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
165-
*
166-
* @param bytes The bytes to send to the service.
167-
* @return AsyncRequestBody instance.
168-
*/
169-
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
170-
return ByteBuffersAsyncRequestBody.from(bytes);
171-
}
172-
173-
/**
174-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
175-
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
176-
* <p>
177-
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
178-
* it to copy only the remaining readable bytes.
157+
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158+
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
179159
*
180160
* @param byteBuffer ByteBuffer to send to the service.
181161
* @return AsyncRequestBody instance.
182162
*/
183163
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
184-
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
185-
immutableCopy.rewind();
186-
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
187-
}
188-
189-
/**
190-
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
191-
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
192-
* reflected in the {@link AsyncRequestBody}.
193-
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
194-
* only the remaining bytes.
195-
*
196-
* @param byteBuffer ByteBuffer to send to the service.
197-
* @return AsyncRequestBody instance.
198-
*/
199-
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
200-
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
201-
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
202-
}
203-
204-
/**
205-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
206-
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
207-
* {@code AsyncRequestBody} implementation.
208-
* <p>
209-
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
210-
* need it to copy only the remaining readable bytes.
211-
*
212-
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
213-
* risks.
214-
*
215-
* @param byteBuffer ByteBuffer to send to the service.
216-
* @return AsyncRequestBody instance.
217-
*/
218-
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
219-
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
220-
readOnlyBuffer.rewind();
221-
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
222-
}
223-
224-
/**
225-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
226-
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
227-
* {@code AsyncRequestBody} implementation.
228-
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
229-
* the buffer and reads only the remaining bytes.
230-
*
231-
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
232-
* risks.
233-
*
234-
* @param byteBuffer ByteBuffer to send to the service.
235-
* @return AsyncRequestBody instance.
236-
*/
237-
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
238-
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
239-
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
240-
}
241-
242-
/**
243-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
244-
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
245-
* <p>
246-
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
247-
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
248-
*
249-
* @param byteBuffers ByteBuffer array to send to the service.
250-
* @return AsyncRequestBody instance.
251-
*/
252-
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
253-
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
254-
.map(BinaryUtils::immutableCopyOf)
255-
.peek(ByteBuffer::rewind)
256-
.toArray(ByteBuffer[]::new);
257-
return ByteBuffersAsyncRequestBody.of(immutableCopy);
258-
}
259-
260-
/**
261-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
262-
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
263-
* {@link AsyncRequestBody}.
264-
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
265-
* this method respects the current read position of each buffer and reads only the remaining bytes.
266-
*
267-
* @param byteBuffers ByteBuffer array to send to the service.
268-
* @return AsyncRequestBody instance.
269-
*/
270-
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
271-
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
272-
.map(BinaryUtils::immutableCopyOfRemaining)
273-
.peek(ByteBuffer::rewind)
274-
.toArray(ByteBuffer[]::new);
275-
return ByteBuffersAsyncRequestBody.of(immutableCopy);
276-
}
277-
278-
/**
279-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
280-
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
281-
* {@code AsyncRequestBody} implementation.
282-
* <p>
283-
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
284-
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
285-
*
286-
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
287-
* risks.
288-
*
289-
* @param byteBuffers ByteBuffer array to send to the service.
290-
* @return AsyncRequestBody instance.
291-
*/
292-
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
293-
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
294-
.map(ByteBuffer::asReadOnlyBuffer)
295-
.peek(ByteBuffer::rewind)
296-
.toArray(ByteBuffer[]::new);
297-
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
298-
}
299-
300-
/**
301-
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
302-
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
303-
* {@code AsyncRequestBody} implementation.
304-
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
305-
* this method respects the current read position of each buffer and reads only the remaining bytes.
306-
*
307-
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
308-
* risks.
309-
*
310-
* @param byteBuffers ByteBuffer array to send to the service.
311-
* @return AsyncRequestBody instance.
312-
*/
313-
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
314-
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
315-
.map(ByteBuffer::asReadOnlyBuffer)
316-
.toArray(ByteBuffer[]::new);
317-
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
164+
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
318165
}
319166

320167
/**
321-
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
168+
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
322169
*
323170
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
324171
* non-blocking event loop threads owned by the SDK.
@@ -392,7 +239,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
392239
}
393240

394241
/**
395-
* Creates an {@link AsyncRequestBody} with no content.
242+
* Creates a {@link AsyncRequestBody} with no content.
396243
*
397244
* @return AsyncRequestBody instance.
398245
*/
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
22+
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.core.async.AsyncRequestBody;
24+
import software.amazon.awssdk.utils.Logger;
25+
26+
/**
27+
* An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static
28+
* methods on {@link AsyncRequestBody}
29+
*
30+
* @see AsyncRequestBody#fromBytes(byte[])
31+
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
32+
* @see AsyncRequestBody#fromString(String)
33+
*/
34+
@SdkInternalApi
35+
public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
36+
private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);
37+
38+
private final byte[] bytes;
39+
40+
private final String mimetype;
41+
42+
public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) {
43+
this.bytes = bytes.clone();
44+
this.mimetype = mimetype;
45+
}
46+
47+
@Override
48+
public Optional<Long> contentLength() {
49+
return Optional.of((long) bytes.length);
50+
}
51+
52+
@Override
53+
public String contentType() {
54+
return mimetype;
55+
}
56+
57+
@Override
58+
public void subscribe(Subscriber<? super ByteBuffer> s) {
59+
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
60+
if (s == null) {
61+
throw new NullPointerException("Subscription MUST NOT be null.");
62+
}
63+
64+
// As per 2.13, this method must return normally (i.e. not throw).
65+
try {
66+
s.onSubscribe(
67+
new Subscription() {
68+
private boolean done = false;
69+
70+
@Override
71+
public void request(long n) {
72+
if (done) {
73+
return;
74+
}
75+
if (n > 0) {
76+
done = true;
77+
s.onNext(ByteBuffer.wrap(bytes));
78+
s.onComplete();
79+
} else {
80+
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
81+
}
82+
}
83+
84+
@Override
85+
public void cancel() {
86+
synchronized (this) {
87+
if (!done) {
88+
done = true;
89+
}
90+
}
91+
}
92+
}
93+
);
94+
} catch (Throwable ex) {
95+
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)