Skip to content

Commit e1164ae

Browse files
committed
Skip defensive copies and transforms in AsyncRequestBody
1 parent 335cbb7 commit e1164ae

File tree

5 files changed

+334
-10
lines changed

5 files changed

+334
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "StephenFlavin",
5+
"description": "Skip defensive copies and transforms in AsyncRequestBody"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
3131
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
3232
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
33+
import software.amazon.awssdk.core.internal.async.SimpleAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.util.Mimetype;
34-
import software.amazon.awssdk.utils.BinaryUtils;
3535

3636
/**
3737
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
@@ -124,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
124124
* @param string The string to provide.
125125
* @param cs The {@link Charset} to use.
126126
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
127-
* @see ByteArrayAsyncRequestBody
127+
* @see SimpleAsyncRequestBody
128128
*/
129129
static AsyncRequestBody fromString(String string, Charset cs) {
130-
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
131-
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
130+
return SimpleAsyncRequestBody.of(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
131+
string.getBytes(cs));
132132
}
133133

134134
/**
@@ -143,25 +143,33 @@ static AsyncRequestBody fromString(String string) {
143143
}
144144

145145
/**
146-
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
147-
* original byte array are not reflected in the {@link AsyncRequestBody}.
146+
* Creates a {@link AsyncRequestBody} from a byte array.
148147
*
149148
* @param bytes The bytes to send to the service.
150149
* @return AsyncRequestBody instance.
151150
*/
152151
static AsyncRequestBody fromBytes(byte[] bytes) {
153-
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
152+
return SimpleAsyncRequestBody.of(bytes);
154153
}
155154

156155
/**
157-
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
158-
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
156+
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}.
159157
*
160158
* @param byteBuffer ByteBuffer to send to the service.
161159
* @return AsyncRequestBody instance.
162160
*/
163161
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
164-
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
162+
return SimpleAsyncRequestBody.of(null, byteBuffer);
163+
}
164+
165+
/**
166+
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}.
167+
*
168+
* @param byteBuffers ByteBuffer[] to send to the service.
169+
* @return AsyncRequestBody instance.
170+
*/
171+
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
172+
return SimpleAsyncRequestBody.of(null, byteBuffers);
165173
}
166174

167175
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
* @see AsyncRequestBody#fromBytes(byte[])
3131
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
3232
* @see AsyncRequestBody#fromString(String)
33+
*
34+
* @deprecated by {@link SimpleAsyncRequestBody#of(byte[])}.
3335
*/
3436
@SdkInternalApi
37+
@Deprecated
3538
public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
3639
private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);
3740

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.core.async.AsyncRequestBody;
26+
import software.amazon.awssdk.core.internal.util.Mimetype;
27+
import software.amazon.awssdk.utils.BinaryUtils;
28+
import software.amazon.awssdk.utils.Logger;
29+
30+
/**
31+
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} arrau. This is created
32+
* using static
33+
* methods on {@link AsyncRequestBody}
34+
*
35+
* @see AsyncRequestBody#fromBytes(byte[])
36+
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
37+
* @see AsyncRequestBody#fromString(String)
38+
*/
39+
@SdkInternalApi
40+
public final class SimpleAsyncRequestBody implements AsyncRequestBody {
41+
private static final Logger log = Logger.loggerFor(SimpleAsyncRequestBody.class);
42+
43+
private final String mimetype;
44+
private final Long length;
45+
private final ByteBuffer[] buffers;
46+
47+
private SimpleAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
48+
this.mimetype = mimetype;
49+
this.length = length;
50+
this.buffers = buffers;
51+
}
52+
53+
@Override
54+
public Optional<Long> contentLength() {
55+
return Optional.ofNullable(length);
56+
}
57+
58+
@Override
59+
public String contentType() {
60+
return mimetype;
61+
}
62+
63+
@Override
64+
public void subscribe(Subscriber<? super ByteBuffer> s) {
65+
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
66+
if (s == null) {
67+
throw new NullPointerException("Subscription MUST NOT be null.");
68+
}
69+
70+
// As per 2.13, this method must return normally (i.e. not throw).
71+
try {
72+
s.onSubscribe(
73+
new Subscription() {
74+
private final AtomicInteger index = new AtomicInteger(0);
75+
private final AtomicBoolean completed = new AtomicBoolean(false);
76+
77+
@Override
78+
public void request(long n) {
79+
if (completed.get()) {
80+
return;
81+
}
82+
83+
if (n > 0) {
84+
int i = index.getAndIncrement();
85+
86+
if (i >= buffers.length) {
87+
return;
88+
}
89+
90+
long remaining = n;
91+
92+
do {
93+
ByteBuffer buffer = buffers[i];
94+
if (!buffer.hasArray()) {
95+
buffer = ByteBuffer.wrap(BinaryUtils.copyBytesFrom(buffer));
96+
}
97+
s.onNext(buffer);
98+
remaining--;
99+
} while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
100+
101+
if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
102+
s.onComplete();
103+
}
104+
} else {
105+
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
106+
}
107+
}
108+
109+
@Override
110+
public void cancel() {
111+
completed.set(true);
112+
}
113+
}
114+
);
115+
} catch (Throwable ex) {
116+
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
117+
}
118+
}
119+
120+
public static SimpleAsyncRequestBody of(Long length, ByteBuffer... buffers) {
121+
return new SimpleAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
122+
}
123+
124+
public static SimpleAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
125+
return new SimpleAsyncRequestBody(mimetype, length, buffers);
126+
}
127+
128+
public static SimpleAsyncRequestBody of(byte[] bytes) {
129+
return new SimpleAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
130+
ByteBuffer.wrap(bytes));
131+
}
132+
133+
public static SimpleAsyncRequestBody of(String mimetype, byte[] bytes) {
134+
return new SimpleAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
135+
}
136+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
23+
import java.nio.ByteBuffer;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.stream.IntStream;
32+
import org.junit.jupiter.api.Test;
33+
import org.reactivestreams.Subscriber;
34+
import org.reactivestreams.Subscription;
35+
import software.amazon.awssdk.core.async.AsyncRequestBody;
36+
import software.amazon.awssdk.utils.BinaryUtils;
37+
38+
class SimpleAsyncRequestBodyTest {
39+
40+
private static class TestSubscriber implements Subscriber<ByteBuffer> {
41+
private Subscription subscription;
42+
private boolean onCompleteCalled = false;
43+
private int callsToComplete = 0;
44+
private final List<ByteBuffer> publishedResults = Collections.synchronizedList(new ArrayList<>());
45+
46+
public void request(long n) {
47+
subscription.request(n);
48+
}
49+
50+
@Override
51+
public void onSubscribe(Subscription s) {
52+
this.subscription = s;
53+
}
54+
55+
@Override
56+
public void onNext(ByteBuffer byteBuffer) {
57+
publishedResults.add(byteBuffer);
58+
}
59+
60+
@Override
61+
public void onError(Throwable throwable) {
62+
throw new IllegalStateException(throwable);
63+
}
64+
65+
@Override
66+
public void onComplete() {
67+
onCompleteCalled = true;
68+
callsToComplete++;
69+
}
70+
}
71+
72+
@Test
73+
public void subscriberIsMarkedAsCompleted() {
74+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of("Hello World!".getBytes(StandardCharsets.UTF_8));
75+
76+
TestSubscriber subscriber = new TestSubscriber();
77+
requestBody.subscribe(subscriber);
78+
subscriber.request(1);
79+
80+
assertTrue(subscriber.onCompleteCalled);
81+
assertEquals(1, subscriber.publishedResults.size());
82+
}
83+
84+
@Test
85+
public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable() {
86+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of("Hello World!".getBytes(StandardCharsets.UTF_8));
87+
88+
TestSubscriber subscriber = new TestSubscriber();
89+
requestBody.subscribe(subscriber);
90+
subscriber.request(2);
91+
92+
assertTrue(subscriber.onCompleteCalled);
93+
assertEquals(1, subscriber.publishedResults.size());
94+
}
95+
96+
@Test
97+
public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce() throws InterruptedException {
98+
int numBuffers = 100;
99+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of(null, IntStream.range(0, numBuffers)
100+
.mapToObj(i -> ByteBuffer.wrap(new byte[1]))
101+
.toArray(ByteBuffer[]::new));
102+
103+
TestSubscriber subscriber = new TestSubscriber();
104+
requestBody.subscribe(subscriber);
105+
106+
int parallelism = 8;
107+
ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
108+
for (int i = 0; i < parallelism; i++) {
109+
executorService.submit(() -> {
110+
for (int j = 0; j < numBuffers; j++) {
111+
subscriber.request(2);
112+
}
113+
});
114+
}
115+
executorService.shutdown();
116+
executorService.awaitTermination(1, TimeUnit.MINUTES);
117+
118+
assertTrue(subscriber.onCompleteCalled);
119+
assertEquals(1, subscriber.callsToComplete);
120+
assertEquals(numBuffers, subscriber.publishedResults.size());
121+
}
122+
123+
@Test
124+
public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish() {
125+
byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
126+
byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
127+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
128+
ByteBuffer.wrap(helloWorld),
129+
ByteBuffer.wrap(goodbyeWorld));
130+
131+
TestSubscriber subscriber = new TestSubscriber();
132+
requestBody.subscribe(subscriber);
133+
subscriber.request(1);
134+
135+
assertFalse(subscriber.onCompleteCalled);
136+
assertEquals(1, subscriber.publishedResults.size());
137+
}
138+
139+
@Test
140+
public void subscriberReceivesAllBuffers() {
141+
byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
142+
byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
143+
144+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
145+
ByteBuffer.wrap(helloWorld),
146+
ByteBuffer.wrap(goodbyeWorld));
147+
148+
TestSubscriber subscriber = new TestSubscriber();
149+
requestBody.subscribe(subscriber);
150+
subscriber.request(2);
151+
152+
assertEquals(2, subscriber.publishedResults.size());
153+
assertTrue(subscriber.onCompleteCalled);
154+
assertArrayEquals(helloWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(0)));
155+
assertArrayEquals(goodbyeWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(1)));
156+
}
157+
158+
@Test
159+
public void canceledSubscriberDoesNotReturnNewResults() {
160+
AsyncRequestBody requestBody = SimpleAsyncRequestBody.of(null, ByteBuffer.wrap(new byte[0]));
161+
162+
TestSubscriber subscriber = new TestSubscriber();
163+
requestBody.subscribe(subscriber);
164+
165+
subscriber.subscription.cancel();
166+
subscriber.request(1);
167+
168+
assertTrue(subscriber.publishedResults.isEmpty());
169+
}
170+
171+
}

0 commit comments

Comments
 (0)