Skip to content

Commit f3ac7de

Browse files
committed
Updated Fix to support AsyncRequestFileBody of variable chunk size
1 parent f6a7a43 commit f3ac7de

File tree

10 files changed

+480
-193
lines changed

10 files changed

+480
-193
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import software.amazon.awssdk.core.exception.SdkException;
3333
import software.amazon.awssdk.utils.BinaryUtils;
3434
import software.amazon.awssdk.utils.Validate;
35+
import software.amazon.awssdk.utils.async.ByteBufferingSubscriber;
3536
import software.amazon.awssdk.utils.builder.SdkBuilder;
3637

3738
/**
@@ -41,6 +42,8 @@
4142
@SdkInternalApi
4243
public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
4344

45+
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
46+
4447
public static final byte[] FINAL_BYTE = new byte[0];
4548
private final AsyncRequestBody wrapped;
4649
private final SdkChecksum sdkChecksum;
@@ -148,7 +151,10 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
148151
if (sdkChecksum != null) {
149152
sdkChecksum.reset();
150153
}
151-
wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes));
154+
wrapped.subscribe(
155+
new ByteBufferingSubscriber(
156+
new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes), DEFAULT_CHUNK_SIZE)
157+
);
152158
}
153159

154160
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
@@ -189,7 +195,7 @@ public void onNext(ByteBuffer byteBuffer) {
189195
ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer);
190196
wrapped.onNext(allocatedBuffer);
191197
} else {
192-
ByteBuffer allocatedBuffer = appendChunkSizeAndFinalByte(byteBuffer);
198+
ByteBuffer allocatedBuffer = createChunk(byteBuffer, false);
193199
wrapped.onNext(allocatedBuffer);
194200
}
195201
} catch (SdkException sdkException) {
@@ -216,14 +222,6 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) {
216222
return checksumAppendedBuffer;
217223
}
218224

219-
private ByteBuffer appendChunkSizeAndFinalByte(ByteBuffer byteBuffer) {
220-
ByteBuffer contentChunk = createChunk(byteBuffer, false);
221-
ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate(contentChunk.remaining());
222-
checksumAppendedBuffer.put(contentChunk);
223-
checksumAppendedBuffer.flip();
224-
return checksumAppendedBuffer;
225-
}
226-
227225
@Override
228226
public void onError(Throwable t) {
229227
wrapped.onError(t);

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,12 @@
4747
*/
4848
@SdkInternalApi
4949
public final class FileAsyncRequestBody implements AsyncRequestBody {
50+
private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class);
5051

5152
/**
5253
* Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
5354
*/
54-
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
55-
56-
private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class);
55+
private static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
5756

5857
/**
5958
* File to read.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.core.internal.interceptor;
1717

18+
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
19+
1820
import java.util.Optional;
1921
import software.amazon.awssdk.annotations.SdkInternalApi;
2022
import software.amazon.awssdk.core.ClientType;
@@ -25,8 +27,6 @@
2527
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2628
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
2729
import software.amazon.awssdk.core.internal.async.ChecksumCalculatingAsyncRequestBody;
28-
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
29-
import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream;
3030
import software.amazon.awssdk.core.internal.util.ChunkContentUtils;
3131
import software.amazon.awssdk.core.internal.util.HttpChecksumUtils;
3232
import software.amazon.awssdk.http.Header;
@@ -99,10 +99,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
9999
private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum,
100100
long checksumContentLength, long originalContentLength) {
101101

102-
long chunkLength = isFileAsyncRequestBody(context)
103-
? AwsUnsignedChunkedEncodingInputStream
104-
.calculateStreamContentLength(originalContentLength, FileAsyncRequestBody.DEFAULT_CHUNK_SIZE)
105-
: ChunkContentUtils.calculateChunkLength(originalContentLength);
102+
long chunkLength =
103+
calculateStreamContentLength(originalContentLength, ChecksumCalculatingAsyncRequestBody.DEFAULT_CHUNK_SIZE);
106104

107105
return context.httpRequest().copy(r ->
108106
r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName())
@@ -112,9 +110,4 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp
112110
.putHeader(Header.CONTENT_LENGTH,
113111
Long.toString(chunkLength + checksumContentLength)));
114112
}
115-
116-
private static boolean isFileAsyncRequestBody(Context.ModifyHttpRequest context) {
117-
return context.asyncRequestBody().isPresent() && context.asyncRequestBody().get() instanceof FileAsyncRequestBody;
118-
}
119-
120113
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ public static Builder builder() {
5050
public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) {
5151
int checksumLength = algorithm.base64EncodedLength();
5252

53-
return (headerName.length()
54-
+ HEADER_COLON_SEPARATOR.length()
55-
+ checksumLength
56-
+ CRLF.length());
53+
return headerName.length()
54+
+ HEADER_COLON_SEPARATOR.length()
55+
+ checksumLength
56+
+ CRLF.length() + CRLF.length();
5757
}
5858

5959
/**

core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void lengthsOfCalculateByChecksumCalculatingInputStream(){
6060
long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength(
6161
SHA256_ALGORITHM, SHA256_HEADER_NAME);
6262
assertThat(calculateChunkLength).isEqualTo(19);
63-
assertThat(checksumContentLength).isEqualTo(69);
63+
assertThat(checksumContentLength).isEqualTo(71);
6464
}
6565

6666
@Test
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.checksum;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.KB;
20+
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb;
21+
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters;
22+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
23+
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.net.URI;
27+
import java.nio.file.Files;
28+
import java.util.concurrent.CompletableFuture;
29+
import org.junit.jupiter.api.BeforeAll;
30+
import org.junit.jupiter.api.Disabled;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.ValueSource;
34+
import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute;
35+
import software.amazon.awssdk.authcrt.signer.internal.DefaultAwsCrtS3V4aSigner;
36+
import software.amazon.awssdk.core.async.AsyncRequestBody;
37+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
38+
import software.amazon.awssdk.core.checksums.Algorithm;
39+
import software.amazon.awssdk.core.checksums.ChecksumValidation;
40+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
41+
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
42+
import software.amazon.awssdk.services.s3.S3AsyncClient;
43+
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
44+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
45+
import software.amazon.awssdk.services.s3.model.ChecksumMode;
46+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
47+
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
48+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
49+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
50+
import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor;
51+
import software.amazon.awssdk.testutils.Waiter;
52+
53+
public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase {
54+
55+
protected static final String KEY = "some-key";
56+
private static final String BUCKET = temporaryBucketName(AsyncHttpChecksumIntegrationTest.class);
57+
public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor();
58+
protected static S3AsyncClient s3HttpAsync;
59+
60+
61+
@BeforeAll
62+
public static void setUp() throws Exception {
63+
64+
s3 = s3ClientBuilder().build();
65+
66+
s3Async = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)).build();
67+
68+
// Http Client to generate Signed request
69+
s3HttpAsync = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor))
70+
.endpointOverride(URI.create("http://s3." + DEFAULT_REGION + ".amazonaws.com")).build();
71+
72+
73+
createBucket(BUCKET);
74+
75+
Waiter.run(() -> s3.headBucket(r -> r.bucket(BUCKET)))
76+
.ignoringException(NoSuchBucketException.class)
77+
.orFail();
78+
interceptor.reset();
79+
}
80+
81+
82+
@Test
83+
void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() {
84+
s3Async.putObject(PutObjectRequest.builder()
85+
.bucket(BUCKET)
86+
.key(KEY)
87+
.overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create()))
88+
89+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
90+
.build(), AsyncRequestBody.fromString("Hello world")).join();
91+
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
92+
assertThat(interceptor.requestChecksumInHeader()).isNull();
93+
String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
94+
.key(KEY).checksumMode(ChecksumMode.ENABLED)
95+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
96+
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
97+
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
98+
assertThat(response).isEqualTo("Hello world");
99+
}
100+
101+
@Test
102+
void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallRequestBody() throws InterruptedException {
103+
s3Async.putObject(PutObjectRequest.builder()
104+
.bucket(BUCKET)
105+
.key(KEY)
106+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
107+
.build(), AsyncRequestBody.fromString("Hello world")).join();
108+
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
109+
assertThat(interceptor.requestChecksumInHeader()).isNull();
110+
111+
String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
112+
.key(KEY).checksumMode(ChecksumMode.ENABLED)
113+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
114+
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
115+
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
116+
assertThat(response).isEqualTo("Hello world");
117+
}
118+
119+
120+
@ParameterizedTest
121+
@ValueSource(ints = {1, 12, 16, 17, 32, 33})
122+
void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException {
123+
s3Async.putObject(PutObjectRequest.builder()
124+
.bucket(BUCKET)
125+
.key(KEY)
126+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
127+
.build(), AsyncRequestBody.fromString(createDataOfSizeInKb(dataSize, 'a'))).join();
128+
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
129+
assertThat(interceptor.requestChecksumInHeader()).isNull();
130+
131+
String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
132+
.key(KEY).checksumMode(ChecksumMode.ENABLED)
133+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
134+
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
135+
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
136+
assertThat(response).isEqualTo(createDataOfSizeInKb(dataSize, 'a'));
137+
}
138+
139+
@ParameterizedTest
140+
@ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB})
141+
void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody
142+
(int chunkSize) throws IOException {
143+
File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(64);
144+
s3Async.putObject(PutObjectRequest.builder()
145+
.bucket(BUCKET)
146+
.key(KEY)
147+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
148+
.build(), FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath())
149+
.chunkSizeInBytes(chunkSize)
150+
.build()).join();
151+
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
152+
assertThat(interceptor.requestChecksumInHeader()).isNull();
153+
154+
String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
155+
.key(KEY).checksumMode(ChecksumMode.ENABLED)
156+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
157+
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
158+
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
159+
160+
byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath());
161+
assertThat(response).isEqualTo(new String(bytes));
162+
}
163+
164+
/**
165+
* Test two async call made back to back with different sizes parameterized to test for different chunk sizes
166+
*/
167+
@ParameterizedTest
168+
@ValueSource(ints = {1 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB})
169+
void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize)
170+
throws IOException {
171+
172+
File randomFileOfFixedLengthOne = fixedLengthFileWithRandomCharacters(64);
173+
File randomFileOfFixedLengthTwo = fixedLengthFileWithRandomCharacters(17);
174+
CompletableFuture<PutObjectResponse> putObjectFutureOne =
175+
s3Async.putObject(PutObjectRequest.builder()
176+
.bucket(BUCKET)
177+
.key(KEY)
178+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
179+
.build(),
180+
FileAsyncRequestBody.builder().path(randomFileOfFixedLengthOne.toPath()).chunkSizeInBytes(chunkSize).build());
181+
182+
String keyTwo = KEY + "_two";
183+
CompletableFuture<PutObjectResponse> putObjectFutureTwo =
184+
s3Async.putObject(PutObjectRequest.builder()
185+
.bucket(BUCKET)
186+
.key(keyTwo)
187+
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
188+
.build(),
189+
FileAsyncRequestBody.builder().path(randomFileOfFixedLengthTwo.toPath()).chunkSizeInBytes(chunkSize).build());
190+
191+
putObjectFutureOne.join();
192+
putObjectFutureTwo.join();
193+
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
194+
assertThat(interceptor.requestChecksumInHeader()).isNull();
195+
196+
String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
197+
.key(KEY).checksumMode(ChecksumMode.ENABLED)
198+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
199+
200+
String responseTwo = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET)
201+
.key(keyTwo).checksumMode(ChecksumMode.ENABLED)
202+
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
203+
204+
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
205+
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
206+
207+
assertThat(response).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthOne.toPath())));
208+
assertThat(responseTwo).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthTwo.toPath())));
209+
210+
}
211+
212+
213+
@Disabled("Http Async Signing is not supported for S3")
214+
void asyncValidSignedTrailerChecksumCalculatedBySdkClient() {
215+
ExecutionAttributes executionAttributes = ExecutionAttributes.builder()
216+
.put(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING,
217+
true).build();
218+
s3HttpAsync.putObject(PutObjectRequest.builder()
219+
.bucket(BUCKET)
220+
.overrideConfiguration(o -> o.executionAttributes(executionAttributes))
221+
.key(KEY)
222+
.build(), AsyncRequestBody.fromString("Hello world")).join();
223+
String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET)
224+
.key(KEY)
225+
.build(), AsyncResponseTransformer.toBytes()).join()
226+
.asUtf8String();
227+
assertThat(response).isEqualTo("Hello world");
228+
}
229+
230+
231+
232+
}

0 commit comments

Comments
 (0)