Skip to content

Commit cfb4737

Browse files
committed
Skip defensive copies and transforms in AsyncRequestBody
1 parent 6530cd7 commit cfb4737

File tree

6 files changed

+334
-34
lines changed

6 files changed

+334
-34
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "StephenFlavin",
5+
"description": "Skip defensive copies and transforms in AsyncRequestBody"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
3131
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3232
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
33+
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.util.Mimetype;
34-
import software.amazon.awssdk.utils.BinaryUtils;
3535

3636
/**
3737
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
@@ -124,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
124124
* @param string The string to provide.
125125
* @param cs The {@link Charset} to use.
126126
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127-
* @see ByteArrayAsyncRequestBody
127+
* @see ByteBuffersAsyncRequestBody
128128
*/
129129
static AsyncRequestBody fromString(String string, Charset cs) {
130-
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
131-
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
130+
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
131+
string.getBytes(cs));
132132
}
133133

134134
/**
@@ -143,25 +143,33 @@ static AsyncRequestBody fromString(String string) {
143143
}
144144

145145
/**
146-
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147-
* original byte array are not reflected in the {@link AsyncRequestBody}.
146+
* Creates a {@link AsyncRequestBody} from a byte array.
148147
*
149148
* @param bytes The bytes to send to the service.
150149
* @return AsyncRequestBody instance.
151150
*/
152151
static AsyncRequestBody fromBytes(byte[] bytes) {
153-
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
152+
return ByteBuffersAsyncRequestBody.from(bytes);
154153
}
155154

156155
/**
157-
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158-
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
156+
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}.
159157
*
160158
* @param byteBuffer ByteBuffer to send to the service.
161159
* @return AsyncRequestBody instance.
162160
*/
163161
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
164-
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
162+
return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
163+
}
164+
165+
/**
166+
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}.
167+
*
168+
* @param byteBuffers ByteBuffer[] to send to the service.
169+
* @return AsyncRequestBody instance.
170+
*/
171+
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
172+
return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
165173
}
166174

167175
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
* @see AsyncRequestBody#fromBytes(byte[])
3131
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
3232
* @see AsyncRequestBody#fromString(String)
33+
*
34+
* @deprecated by {@link ByteBuffersAsyncRequestBody#from}.
3335
*/
3436
@SdkInternalApi
37+
@Deprecated
3538
public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
3639
private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);
3740

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.async.AsyncRequestBody;
26+
import software.amazon.awssdk.core.internal.util.Mimetype;
27+
import software.amazon.awssdk.utils.BinaryUtils;
28+
import software.amazon.awssdk.utils.Logger;
29+
30+
/**
31+
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
32+
* using static methods on {@link AsyncRequestBody}
33+
*
34+
* @see AsyncRequestBody#fromBytes(byte[])
35+
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
36+
* @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
37+
* @see AsyncRequestBody#fromString(String)
38+
*/
39+
@SdkInternalApi
40+
public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
41+
private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);
42+
43+
private final String mimetype;
44+
private final Long length;
45+
private final ByteBuffer[] buffers;
46+
47+
private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
48+
this.mimetype = mimetype;
49+
this.length = length;
50+
this.buffers = buffers;
51+
}
52+
53+
@Override
54+
public Optional<Long> contentLength() {
55+
return Optional.ofNullable(length);
56+
}
57+
58+
@Override
59+
public String contentType() {
60+
return mimetype;
61+
}
62+
63+
@Override
64+
public void subscribe(Subscriber<? super ByteBuffer> s) {
65+
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
66+
if (s == null) {
67+
throw new NullPointerException("Subscription MUST NOT be null.");
68+
}
69+
70+
// As per 2.13, this method must return normally (i.e. not throw).
71+
try {
72+
s.onSubscribe(
73+
new Subscription() {
74+
private final AtomicInteger index = new AtomicInteger(0);
75+
private final AtomicBoolean completed = new AtomicBoolean(false);
76+
77+
@Override
78+
public void request(long n) {
79+
if (completed.get()) {
80+
return;
81+
}
82+
83+
if (n > 0) {
84+
int i = index.getAndIncrement();
85+
86+
if (i >= buffers.length) {
87+
return;
88+
}
89+
90+
long remaining = n;
91+
92+
do {
93+
ByteBuffer buffer = buffers[i];
94+
if (!buffer.hasArray()) {
95+
buffer = ByteBuffer.wrap(BinaryUtils.copyBytesFrom(buffer));
96+
}
97+
s.onNext(buffer);
98+
remaining--;
99+
} while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
100+
101+
if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
102+
s.onComplete();
103+
}
104+
} else {
105+
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
106+
}
107+
}
108+
109+
@Override
110+
public void cancel() {
111+
completed.set(true);
112+
}
113+
}
114+
);
115+
} catch (Throwable ex) {
116+
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
117+
}
118+
}
119+
120+
public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
121+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
122+
}
123+
124+
public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
125+
return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
126+
}
127+
128+
public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
129+
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
130+
ByteBuffer.wrap(bytes));
131+
}
132+
133+
public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
134+
return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
135+
}
136+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,28 @@
1515

1616
package software.amazon.awssdk.core.async;
1717

18-
import static java.nio.charset.StandardCharsets.UTF_8;
1918
import static org.assertj.core.api.Assertions.assertThat;
2019

2120
import com.google.common.jimfs.Configuration;
2221
import com.google.common.jimfs.Jimfs;
2322
import io.reactivex.Flowable;
24-
import java.io.File;
25-
import java.io.FileWriter;
2623
import java.io.IOException;
27-
import java.io.InputStream;
2824
import java.nio.ByteBuffer;
2925
import java.nio.charset.StandardCharsets;
3026
import java.nio.file.FileSystem;
3127
import java.nio.file.Files;
3228
import java.nio.file.Path;
33-
import java.time.Instant;
34-
import java.util.Collections;
3529
import java.util.List;
36-
import java.util.concurrent.Callable;
3730
import java.util.concurrent.CountDownLatch;
3831
import java.util.stream.Collectors;
3932
import org.assertj.core.util.Lists;
40-
import org.junit.Rule;
4133
import org.junit.Test;
42-
import org.junit.rules.TemporaryFolder;
4334
import org.junit.runner.RunWith;
4435
import org.junit.runners.Parameterized;
4536
import org.reactivestreams.Publisher;
4637
import org.reactivestreams.Subscriber;
4738
import software.amazon.awssdk.core.internal.util.Mimetype;
4839
import software.amazon.awssdk.http.async.SimpleSubscriber;
49-
import software.amazon.awssdk.utils.BinaryUtils;
50-
import software.amazon.awssdk.utils.StringInputStream;
5140

5241
@RunWith(Parameterized.class)
5342
public class AsyncRequestBodyTest {
@@ -164,17 +153,4 @@ public void publisherConstructorHasCorrectContentType() {
164153
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
165154
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
166155
}
167-
168-
@Test
169-
public void fromBytes_byteArrayNotNull_createsCopy() {
170-
byte[] original = {0x1, 0x2, 0x3, 0x4};
171-
byte[] toModify = new byte[original.length];
172-
System.arraycopy(original, 0, toModify, 0, original.length);
173-
AsyncRequestBody body = AsyncRequestBody.fromBytes(toModify);
174-
for (int i = 0; i < toModify.length; ++i) {
175-
toModify[i]++;
176-
}
177-
ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
178-
assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
179-
}
180156
}

0 commit comments

Comments
 (0)