Skip to content

Commit bd1a49c

Browse files
authored
Merge pull request aws#3747 from aws/zoewang/OutputStreamFix
Copy byte array in OutputStreamPublisher
2 parents 2ac868e + a1be7c7 commit bd1a49c

File tree

6 files changed

+242
-10
lines changed

6 files changed

+242
-10
lines changed

.changes/2.31.2.json

+6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@
3737
"category": "AWS SDK for Java v2",
3838
"contributor": "",
3939
"description": "Updated endpoint and partition metadata."
40+
},
41+
{
42+
"type": "bugfix",
43+
"category": "AWS SDK for Java v2",
44+
"contributor": "",
45+
"description": "Copy bytes written to OutputStream of BlockingOutputStreamAsyncRequestBody"
4046
}
4147
]
4248
}

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## __AWS SDK for Java v2__
44
- ### Features
55
- Updated endpoint and partition metadata.
6+
- ### Bugfixes
7+
- Copy bytes written to OutputStream of BlockingOutputStreamAsyncRequestBody.
68

79
## __AWS WAFV2__
810
- ### Features

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingOutputStreamAsyncRequestBodyTest.java

+56
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@
2424
import java.io.OutputStream;
2525
import java.nio.ByteBuffer;
2626
import java.time.Duration;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Random;
2730
import java.util.concurrent.Executors;
2831
import java.util.concurrent.ScheduledExecutorService;
32+
import org.apache.commons.lang.math.RandomUtils;
2933
import org.junit.jupiter.api.Test;
3034
import org.junit.jupiter.api.Timeout;
3135
import software.amazon.awssdk.utils.CancellableOutputStream;
3236
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
3337
import software.amazon.awssdk.utils.async.StoringSubscriber;
3438

3539
class BlockingOutputStreamAsyncRequestBodyTest {
40+
private Random random = new Random(3470);
3641
@Test
3742
public void outputStream_waitsForSubscription() throws IOException {
3843
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
@@ -77,4 +82,55 @@ public void outputStream_writesToSubscriber() throws IOException {
7782
assertThat(out.get()).isEqualTo((byte) 1);
7883
}
7984

85+
@Test
86+
public void outputStream_writesArrayWithOffsetUseSameArray_shouldNotOverride() throws IOException {
87+
BlockingOutputStreamAsyncRequestBody requestBody =
88+
AsyncRequestBody.forBlockingOutputStream(320L);
89+
int totalLength = 320;
90+
ByteBuffer expected = ByteBuffer.allocate(totalLength);
91+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(totalLength);
92+
requestBody.subscribe(subscriber);
93+
94+
int bytesToWrite = 32;
95+
CancellableOutputStream outputStream = requestBody.outputStream();
96+
byte[] bytes = new byte[512];
97+
for (int i = 0; i < 10; i++) {
98+
random.nextBytes(bytes);
99+
expected.put(bytes, 10, bytesToWrite);
100+
outputStream.write(bytes, 10, bytesToWrite);
101+
}
102+
103+
outputStream.close();
104+
ByteBuffer out = ByteBuffer.allocate(totalLength);
105+
assertThat(subscriber.transferTo(out)).isEqualTo(END_OF_STREAM);
106+
out.flip();
107+
108+
assertThat(out.array()).containsExactly(expected.array());
109+
}
110+
111+
@Test
112+
public void outputStream_writesArrayUseSameArray_shouldNotOverride() throws IOException {
113+
BlockingOutputStreamAsyncRequestBody requestBody =
114+
AsyncRequestBody.forBlockingOutputStream(320L);
115+
int totalLength = 320;
116+
ByteBuffer expected = ByteBuffer.allocate(totalLength);
117+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(totalLength);
118+
requestBody.subscribe(subscriber);
119+
120+
CancellableOutputStream outputStream = requestBody.outputStream();
121+
byte[] bytes = new byte[32];
122+
for (int i = 0; i < 10; i++) {
123+
random.nextBytes(bytes);
124+
expected.put(bytes);
125+
outputStream.write(bytes);
126+
}
127+
128+
outputStream.close();
129+
ByteBuffer out = ByteBuffer.allocate(totalLength);
130+
assertThat(subscriber.transferTo(out)).isEqualTo(END_OF_STREAM);
131+
out.flip();
132+
133+
assertThat(out.array()).containsExactly(expected.array());
134+
}
135+
80136
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/PutObjectIntegrationTest.java

+115-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package software.amazon.awssdk.services.s3;
1818

19+
import static java.util.Base64.getEncoder;
1920
import static org.assertj.core.api.Assertions.assertThat;
2021
import static software.amazon.awssdk.services.s3.internal.checksums.ChecksumsEnabledValidator.CHECKSUM;
2122
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
@@ -25,45 +26,69 @@
2526
import java.io.FilterInputStream;
2627
import java.io.IOException;
2728
import java.io.InputStream;
29+
import java.io.OutputStream;
2830
import java.net.URI;
2931
import java.nio.charset.StandardCharsets;
32+
import java.security.DigestInputStream;
33+
import java.security.MessageDigest;
34+
import java.security.NoSuchAlgorithmException;
3035
import java.util.ArrayList;
3136
import java.util.List;
37+
import java.util.Random;
3238
import java.util.concurrent.CompletableFuture;
33-
import org.junit.AfterClass;
34-
import org.junit.BeforeClass;
35-
import org.junit.Test;
39+
import java.util.stream.Stream;
40+
import org.junit.jupiter.api.AfterAll;
41+
import org.junit.jupiter.api.BeforeAll;
42+
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.params.ParameterizedTest;
44+
import org.junit.jupiter.params.provider.Arguments;
45+
import org.junit.jupiter.params.provider.MethodSource;
46+
import software.amazon.awssdk.core.ResponseInputStream;
47+
import software.amazon.awssdk.core.async.AsyncRequestBody;
48+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3649
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
50+
import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody;
51+
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
3752
import software.amazon.awssdk.core.interceptor.Context;
3853
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
3954
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
4055
import software.amazon.awssdk.core.sync.RequestBody;
4156
import software.amazon.awssdk.http.ContentStreamProvider;
57+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
4258
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
4359
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4460
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
61+
import software.amazon.awssdk.utils.IoUtils;
4562

4663
/**
4764
* Integration tests for {@code PutObject}.
4865
*/
4966
public class PutObjectIntegrationTest extends S3IntegrationTestBase {
5067
private static final String BUCKET = temporaryBucketName(PutObjectIntegrationTest.class);
51-
private static final String ASYNC_KEY = "async-key";
52-
private static final String SYNC_KEY = "sync-key";
53-
private static final String TEXT_CONTENT_TYPE = "text/plain";
68+
private static final String ASYNC_KEY = "async-key" ;
69+
private static final String SYNC_KEY = "sync-key" ;
70+
private static final String TEXT_CONTENT_TYPE = "text/plain" ;
5471
private static final byte[] CONTENT = "Hello".getBytes(StandardCharsets.UTF_8);
72+
private static final Random RANDOM = new Random(3470);
5573

56-
@BeforeClass
74+
@BeforeAll
5775
public static void setUp() throws Exception {
5876
S3IntegrationTestBase.setUp();
5977
createBucket(BUCKET);
6078
}
6179

62-
@AfterClass
80+
@AfterAll
6381
public static void tearDown() {
6482
deleteBucketAndAllContents(BUCKET);
6583
}
6684

85+
public static Stream<Arguments> s3Clients() {
86+
return Stream.of(
87+
Arguments.of(s3AsyncClientBuilder().requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED).build()),
88+
Arguments.of(s3AsyncClientBuilder().build()),
89+
Arguments.of(crtClientBuilder().build()));
90+
}
91+
6792
@Test
6893
public void objectInputStreamsAreClosed() {
6994
TestContentProvider provider = new TestContentProvider(CONTENT);
@@ -97,6 +122,88 @@ public void blockingInputStreamAsyncRequestBody_withContentType_isHonored() {
97122
assertThat(response.contentType()).isEqualTo(TEXT_CONTENT_TYPE);
98123
}
99124

125+
@ParameterizedTest
126+
@MethodSource("s3Clients")
127+
public void blockingOutputStreamAsyncRequestBody_writeArrayWithOffset_shouldSucceed(S3AsyncClient client) throws Exception {
128+
BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);
129+
130+
PutObjectRequest.Builder request = PutObjectRequest.builder()
131+
.bucket(BUCKET)
132+
.key(ASYNC_KEY);
133+
CompletableFuture<PutObjectResponse> responseFuture = client.putObject(request.build(), body);
134+
MessageDigest expectedDigest = writeArrayWithOffset(body, 32);
135+
responseFuture.join();
136+
137+
ResponseInputStream<GetObjectResponse> responseInputStream =
138+
client.getObject(b -> b.bucket(BUCKET).key(ASYNC_KEY),
139+
AsyncResponseTransformer.toBlockingInputStream()).join();
140+
141+
DigestInputStream digestInputStream = new DigestInputStream(
142+
responseInputStream, MessageDigest.getInstance("SHA-256"));
143+
IoUtils.drainInputStream(digestInputStream);
144+
MessageDigest actual = digestInputStream.getMessageDigest();
145+
146+
assertThat(getEncoder().encodeToString(actual.digest()))
147+
.isEqualTo(getEncoder().encodeToString(expectedDigest.digest()));
148+
}
149+
150+
@ParameterizedTest
151+
@MethodSource("s3Clients")
152+
public void blockingOutputStreamAsyncRequestBody_writeArray_shouldSucceed(S3AsyncClient client) throws Exception {
153+
BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(320L);
154+
155+
PutObjectRequest.Builder request = PutObjectRequest.builder()
156+
.bucket(BUCKET)
157+
.key(ASYNC_KEY);
158+
CompletableFuture<PutObjectResponse> responseFuture = client.putObject(request.build(), body);
159+
MessageDigest expectedDigest = writeArray(body);
160+
responseFuture.join();
161+
162+
ResponseInputStream<GetObjectResponse> responseInputStream =
163+
client.getObject(b -> b.bucket(BUCKET).key(ASYNC_KEY),
164+
AsyncResponseTransformer.toBlockingInputStream()).join();
165+
166+
DigestInputStream digestInputStream = new DigestInputStream(
167+
responseInputStream, MessageDigest.getInstance("SHA-256"));
168+
IoUtils.drainInputStream(digestInputStream);
169+
MessageDigest actual = digestInputStream.getMessageDigest();
170+
171+
assertThat(getEncoder().encodeToString(actual.digest()))
172+
.isEqualTo(getEncoder().encodeToString(expectedDigest.digest()));
173+
}
174+
175+
private static MessageDigest writeArray(BlockingOutputStreamAsyncRequestBody body)
176+
throws NoSuchAlgorithmException {
177+
MessageDigest digest = MessageDigest.getInstance("SHA-256");
178+
try (OutputStream outputStream = body.outputStream()) {
179+
byte[] buffer = new byte[32];
180+
for (int i = 0; i < 10; i++) {
181+
RANDOM.nextBytes(buffer);
182+
digest.update(buffer);
183+
outputStream.write(buffer);
184+
}
185+
} catch (Exception e) {
186+
throw new RuntimeException(e);
187+
}
188+
return digest;
189+
}
190+
191+
private static MessageDigest writeArrayWithOffset(BlockingOutputStreamAsyncRequestBody body, int chunkSize)
192+
throws NoSuchAlgorithmException {
193+
MessageDigest digest = MessageDigest.getInstance("SHA-256");
194+
try (OutputStream outputStream = body.outputStream()) {
195+
byte[] buffer = new byte[1024];
196+
for (int i = 0; i < 10; i++) {
197+
RANDOM.nextBytes(buffer);
198+
digest.update(buffer, 10, chunkSize);
199+
outputStream.write(buffer, 10, chunkSize);
200+
}
201+
} catch (Exception e) {
202+
throw new RuntimeException(e);
203+
}
204+
return digest;
205+
}
206+
100207
@Test
101208
public void s3Client_usingHttpAndDisableChunkedEncoding() {
102209
try (S3Client s3Client = s3ClientBuilder()

utils/src/main/java/software/amazon/awssdk/utils/async/OutputStreamPublisher.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,19 @@ public void write(int b) {
6262
@Override
6363
public void write(byte[] b) {
6464
flush();
65-
send(ByteBuffer.wrap(b));
65+
ByteBuffer buffer = ByteBuffer.allocate(b.length);
66+
buffer.put(b);
67+
buffer.flip();
68+
send(buffer.asReadOnlyBuffer());
6669
}
6770

6871
@Override
6972
public void write(byte[] b, int off, int len) {
7073
flush();
71-
send(ByteBuffer.wrap(b, off, len));
74+
ByteBuffer buffer = ByteBuffer.allocate(len);
75+
buffer.put(b, off, len);
76+
buffer.flip();
77+
send(buffer.asReadOnlyBuffer());
7278
}
7379

7480
@Override

utils/src/test/java/software/amazon/awssdk/utils/async/OutputStreamPublisherTest.java

+55
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020

2121
import java.nio.ByteBuffer;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.Random;
2226
import java.util.concurrent.ExecutionException;
2327
import java.util.concurrent.ExecutorService;
2428
import java.util.concurrent.Executors;
@@ -27,13 +31,15 @@
2731
import org.junit.jupiter.api.BeforeEach;
2832
import org.junit.jupiter.api.Test;
2933
import org.junit.jupiter.api.Timeout;
34+
import software.amazon.awssdk.utils.BinaryUtils;
3035
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
3136
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
3237

3338
public class OutputStreamPublisherTest {
3439
private StoringSubscriber<ByteBuffer> storingSubscriber;
3540
private ByteBufferStoringSubscriber byteStoringSubscriber;
3641
private OutputStreamPublisher publisher;
42+
private Random random = new Random(3470);
3743

3844
@BeforeEach
3945
public void setup() {
@@ -60,6 +66,55 @@ public void oneByteWritesAreFlushedEventually() {
6066
});
6167
}
6268

69+
@Test
70+
public void writeArrayReuseSameArray_shouldNotOverride() {
71+
publisher.subscribe(storingSubscriber);
72+
List<ByteBuffer> expectedBuffers = new ArrayList<>();
73+
74+
byte[] bytes = new byte[512];
75+
for (int i = 0; i < 10; i++) {
76+
random.nextBytes(bytes);
77+
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
78+
byteBuffer.put(bytes);
79+
byteBuffer.flip();
80+
81+
expectedBuffers.add(byteBuffer);
82+
publisher.write(bytes);
83+
}
84+
85+
List<ByteBuffer> actualBuffers = new ArrayList<>();
86+
while(storingSubscriber.peek().isPresent()) {
87+
StoringSubscriber.Event<ByteBuffer> event = storingSubscriber.poll().get();
88+
actualBuffers.add(event.value());
89+
}
90+
assertThat(actualBuffers).hasSameElementsAs(expectedBuffers);
91+
}
92+
93+
@Test
94+
public void writeArrayWithOffsetReuseSameArray_shouldNotOverride() {
95+
publisher.subscribe(storingSubscriber);
96+
int bytesToWrite = 32;
97+
List<ByteBuffer> expectedBuffers = new ArrayList<>();
98+
99+
byte[] bytes = new byte[512];
100+
for (int i = 0; i < 10; i++) {
101+
random.nextBytes(bytes);
102+
ByteBuffer byteBuffer = ByteBuffer.allocate(bytesToWrite);
103+
byteBuffer.put(bytes, 10, bytesToWrite);
104+
byteBuffer.flip();
105+
106+
expectedBuffers.add(byteBuffer);
107+
publisher.write(bytes, 10, bytesToWrite);
108+
}
109+
110+
List<ByteBuffer> actualBuffers = new ArrayList<>();
111+
while(storingSubscriber.peek().isPresent()) {
112+
StoringSubscriber.Event<ByteBuffer> event = storingSubscriber.poll().get();
113+
actualBuffers.add(event.value());
114+
}
115+
assertThat(actualBuffers).hasSameElementsAs(expectedBuffers);
116+
}
117+
63118
@Test
64119
public void flushDrainsBufferedBytes() {
65120
publisher.subscribe(storingSubscriber);

0 commit comments

Comments
 (0)