Skip to content

Commit 4f64fd9

Browse files
committed
Add "unsafe" AsyncRequestBody constructors for byte[] and ByteBuffers
1 parent 754d525 commit 4f64fd9

File tree

9 files changed

+721
-248
lines changed

9 files changed

+721
-248
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "StephenFlavin",
5+
"description": "Add \"unsafe\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
import java.nio.charset.Charset;
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
25+
import java.util.Arrays;
2526
import java.util.Optional;
2627
import java.util.concurrent.ExecutorService;
2728
import org.reactivestreams.Publisher;
2829
import org.reactivestreams.Subscriber;
2930
import software.amazon.awssdk.annotations.SdkPublicApi;
30-
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
31+
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
3132
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3233
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.util.Mimetype;
@@ -52,7 +53,7 @@
5253
* </p>
5354
*
5455
* @see FileAsyncRequestBody
55-
* @see ByteArrayAsyncRequestBody
56+
* @see ByteBuffersAsyncRequestBody
5657
*/
5758
@SdkPublicApi
5859
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
124125
* @param string The string to provide.
125126
* @param cs The {@link Charset} to use.
126127
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127-
* @see ByteArrayAsyncRequestBody
128+
* @see ByteBuffersAsyncRequestBody
128129
*/
129130
static AsyncRequestBody fromString(String string, Charset cs) {
130-
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
131-
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
131+
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
132+
string.getBytes(cs));
132133
}
133134

134135
/**
@@ -150,18 +151,69 @@ static AsyncRequestBody fromString(String string) {
150151
* @return AsyncRequestBody instance.
151152
*/
152153
static AsyncRequestBody fromBytes(byte[] bytes) {
153-
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
154+
byte[] clonedBytes = bytes.clone();
155+
return ByteBuffersAsyncRequestBody.from(clonedBytes);
154156
}
155157

156158
/**
157-
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158-
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
159+
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
160+
* original byte array are not reflected in the {@link AsyncRequestBody}.
161+
*
162+
* @param bytes The bytes to send to the service.
163+
* @return AsyncRequestBody instance.
164+
*/
165+
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
166+
return ByteBuffersAsyncRequestBody.from(bytes);
167+
}
168+
169+
/**
170+
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied with the position set to zero, the
171+
* mark if defined is discarded, this is to ensure modifications made to the original {@link ByteBuffer} are not reflected in
172+
* the {@link AsyncRequestBody}.
159173
*
160174
* @param byteBuffer ByteBuffer to send to the service.
161175
* @return AsyncRequestBody instance.
162176
*/
163177
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
164-
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
178+
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
179+
immutableCopy.rewind();
180+
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
181+
}
182+
183+
/**
184+
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}.
185+
*
186+
* @param byteBuffer ByteBuffer to send to the service.
187+
* @return AsyncRequestBody instance.
188+
*/
189+
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
190+
return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
191+
}
192+
193+
/**
194+
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}. Each Buffers contents are copied with their
195+
* positions set to zero and marks if defined are discarded, this is to ensure modifications made to the original array of
196+
* {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
197+
*
198+
* @param byteBuffers ByteBuffer[] to send to the service.
199+
* @return AsyncRequestBody instance.
200+
*/
201+
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
202+
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
203+
.map(BinaryUtils::immutableCopyOf)
204+
.peek(ByteBuffer::rewind)
205+
.toArray(ByteBuffer[]::new);
206+
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
207+
}
208+
209+
/**
210+
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}.
211+
*
212+
* @param byteBuffers ByteBuffer[] to send to the service.
213+
* @return AsyncRequestBody instance.
214+
*/
215+
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
216+
return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
165217
}
166218

167219
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java

Lines changed: 0 additions & 98 deletions
This file was deleted.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.async.AsyncRequestBody;
26+
import software.amazon.awssdk.core.internal.util.Mimetype;
27+
import software.amazon.awssdk.utils.BinaryUtils;
28+
import software.amazon.awssdk.utils.Logger;
29+
30+
/**
31+
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
32+
* using static methods on {@link AsyncRequestBody}
33+
*
34+
* @see AsyncRequestBody#fromBytes(byte[])
35+
* @see AsyncRequestBody#fromBytesUnsafe(byte[])
36+
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
37+
* @see AsyncRequestBody#fromByteBufferUnsafe(ByteBuffer)
38+
* @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
39+
* @see AsyncRequestBody#fromByteBuffersUnsafe(ByteBuffer...)
40+
* @see AsyncRequestBody#fromString(String)
41+
*/
42+
@SdkInternalApi
43+
public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
44+
private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);
45+
46+
private final String mimetype;
47+
private final Long length;
48+
private final ByteBuffer[] buffers;
49+
50+
private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
51+
this.mimetype = mimetype;
52+
this.length = length;
53+
this.buffers = buffers;
54+
}
55+
56+
@Override
57+
public Optional<Long> contentLength() {
58+
return Optional.ofNullable(length);
59+
}
60+
61+
@Override
62+
public String contentType() {
63+
return mimetype;
64+
}
65+
66+
@Override
67+
public void subscribe(Subscriber<? super ByteBuffer> s) {
68+
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
69+
if (s == null) {
70+
throw new NullPointerException("Subscription MUST NOT be null.");
71+
}
72+
73+
// As per 2.13, this method must return normally (i.e. not throw).
74+
try {
75+
s.onSubscribe(
76+
new Subscription() {
77+
private final AtomicInteger index = new AtomicInteger(0);
78+
private final AtomicBoolean completed = new AtomicBoolean(false);
79+
80+
@Override
81+
public void request(long n) {
82+
if (completed.get()) {
83+
return;
84+
}
85+
86+
if (n > 0) {
87+
int i = index.getAndIncrement();
88+
89+
if (i >= buffers.length) {
90+
return;
91+
}
92+
93+
long remaining = n;
94+
95+
do {
96+
ByteBuffer buffer = buffers[i];
97+
98+
// Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
99+
if (buffer.isDirect()) {
100+
buffer = BinaryUtils.toNonDirectBuffer(buffer);
101+
}
102+
103+
s.onNext(buffer);
104+
remaining--;
105+
} while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
106+
107+
if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
108+
s.onComplete();
109+
}
110+
} else {
111+
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
112+
}
113+
}
114+
115+
@Override
116+
public void cancel() {
117+
completed.set(true);
118+
}
119+
}
120+
);
121+
} catch (Throwable ex) {
122+
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
123+
}
124+
}
125+
126+
public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
127+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
128+
}
129+
130+
public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
131+
return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
132+
}
133+
134+
public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
135+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
136+
ByteBuffer.wrap(bytes));
137+
}
138+
139+
public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
140+
return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
141+
}
142+
}

0 commit comments

Comments
 (0)