From f6a7a43340c9b16ee9d9217f0d4ee177d1fce068 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 25 Aug 2022 01:34:30 -0700 Subject: [PATCH 1/6] Fix Trailer based Http Checksum for Async Request body created from File --- .../bugfix-AWSSDKforJavav2-5ecdce1.json | 6 + .../ChecksumCalculatingAsyncRequestBody.java | 11 +- .../internal/async/FileAsyncRequestBody.java | 5 +- ...estBodyHttpChecksumTrailerInterceptor.java | 13 +- .../SyncHttpChecksumInTrailerInterceptor.java | 6 +- .../io/AwsChunkedEncodingInputStream.java | 2 +- ...AwsUnsignedChunkedEncodingInputStream.java | 18 ++- .../AwsChunkedEncodingInputStreamTest.java | 5 +- .../checksum/HttpChecksumIntegrationTest.java | 138 +++++++++++++++++- 9 files changed, 186 insertions(+), 18 deletions(-) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json new file mode 100644 index 000000000000..10e6373170a5 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async File Request body." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 7bc1ca43b817..7bbc28df7f3e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -189,7 +189,8 @@ public void onNext(ByteBuffer byteBuffer) { ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer); wrapped.onNext(allocatedBuffer); } else { - wrapped.onNext(byteBuffer); + ByteBuffer allocatedBuffer = appendChunkSizeAndFinalByte(byteBuffer); + wrapped.onNext(allocatedBuffer); } } catch (SdkException sdkException) { this.subscription.cancel(); @@ -215,6 +216,14 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) { return checksumAppendedBuffer; } + private ByteBuffer appendChunkSizeAndFinalByte(ByteBuffer byteBuffer) { + ByteBuffer contentChunk = createChunk(byteBuffer, false); + ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate(contentChunk.remaining()); + checksumAppendedBuffer.put(contentChunk); + checksumAppendedBuffer.flip(); + return checksumAppendedBuffer; + } + @Override public void onError(Throwable t) { wrapped.onError(t); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index 8f7b2a483607..b3efc9a072a2 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -47,12 +47,13 @@ */ @SdkInternalApi public final class FileAsyncRequestBody implements AsyncRequestBody { - private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class); /** * Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber. */ - private static final int DEFAULT_CHUNK_SIZE = 16 * 1024; + public static final int DEFAULT_CHUNK_SIZE = 16 * 1024; + + private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class); /** * File to read. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java index ea7113043262..988dee8b9c73 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java @@ -25,6 +25,8 @@ import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.core.internal.async.ChecksumCalculatingAsyncRequestBody; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; +import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.util.ChunkContentUtils; import software.amazon.awssdk.core.internal.util.HttpChecksumUtils; import software.amazon.awssdk.http.Header; @@ -97,7 +99,11 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum, long checksumContentLength, long originalContentLength) { - long chunkLength = ChunkContentUtils.calculateChunkLength(originalContentLength); + long chunkLength = isFileAsyncRequestBody(context) + ? AwsUnsignedChunkedEncodingInputStream + .calculateStreamContentLength(originalContentLength, FileAsyncRequestBody.DEFAULT_CHUNK_SIZE) + : ChunkContentUtils.calculateChunkLength(originalContentLength); + return context.httpRequest().copy(r -> r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName()) .putHeader("Content-encoding", HttpChecksumConstant.AWS_CHUNKED_HEADER) @@ -106,4 +112,9 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp .putHeader(Header.CONTENT_LENGTH, Long.toString(chunkLength + checksumContentLength))); } + + private static boolean isFileAsyncRequestBody(Context.ModifyHttpRequest context) { + return context.asyncRequestBody().isPresent() && context.asyncRequestBody().get() instanceof FileAsyncRequestBody; + } + } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java index 57b125f8caf2..b76e20653d2c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.util.HttpChecksumResolver; import software.amazon.awssdk.core.internal.util.HttpChecksumUtils; @@ -73,7 +74,7 @@ public Optional modifyHttpContent(Context.ModifyHttpRequest context RequestBody.fromContentProvider( streamProvider, AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - requestBody.optionalContentLength().orElse(0L)) + requestBody.optionalContentLength().orElse(0L), AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + checksumContentLength, requestBody.contentType())); } @@ -112,7 +113,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, Execu .putHeader("x-amz-decoded-content-length", Long.toString(originalContentLength)) .putHeader(CONTENT_LENGTH, Long.toString(AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - originalContentLength) + checksumContentLength))); + originalContentLength, AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + + checksumContentLength))); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java index d98deb0e229e..9dee93e70a32 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java @@ -39,7 +39,7 @@ @SdkInternalApi public abstract class AwsChunkedEncodingInputStream extends SdkInputStream { - protected static final int DEFAULT_CHUNK_SIZE = 128 * 1024; + public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; protected static final int SKIP_BUFFER_SIZE = 256 * 1024; protected static final String CRLF = "\r\n"; protected static final byte[] FINAL_CHUNK = new byte[0]; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java index 4c04a1b644ec..dd4eb9987193 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java @@ -68,17 +68,19 @@ private static long calculateChunkLength(long originalContentLength) { + CRLF.length(); } - public static long calculateStreamContentLength(long originalLength) { - if (originalLength < 0) { - throw new IllegalArgumentException("Non negative content length expected."); + public static long calculateStreamContentLength(long originalLength, long defaultChunkSize) { + if (originalLength < 0 || defaultChunkSize == 0) { + throw new IllegalArgumentException(originalLength + ", " + defaultChunkSize + "Args <= 0 not expected"); } - long maxSizeChunks = originalLength / DEFAULT_CHUNK_SIZE; - long remainingBytes = originalLength % DEFAULT_CHUNK_SIZE; + long maxSizeChunks = originalLength / defaultChunkSize; + long remainingBytes = originalLength % defaultChunkSize; - return maxSizeChunks * calculateChunkLength(DEFAULT_CHUNK_SIZE) - + (remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0) - + calculateChunkLength(0); + long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize); + long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; + long lastByteSize = "0".length() + CRLF.length(); + + return allChunks + remainingInChunk + lastByteSize; } @Override diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java index 7f212dbb7094..5b69749b6e7b 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java @@ -55,10 +55,11 @@ public void readAwsUnsignedChunkedEncodingInputStream() throws IOException { public void lengthsOfCalculateByChecksumCalculatingInputStream(){ String initialString = "Hello world"; - long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length()); + long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length(), + AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE); long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength( SHA256_ALGORITHM, SHA256_HEADER_NAME); - assertThat(calculateChunkLength).isEqualTo(21); + assertThat(calculateChunkLength).isEqualTo(19); assertThat(checksumContentLength).isEqualTo(69); } diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java index 43f16fc40097..8e9f6e2426b5 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java @@ -20,9 +20,14 @@ import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; import java.io.InputStreamReader; +import java.io.RandomAccessFile; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -303,7 +308,7 @@ public void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() throws Inte } @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient() throws InterruptedException { + public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallRequestBody() throws InterruptedException { s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) @@ -320,6 +325,23 @@ public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient() throws assertThat(response).isEqualTo("Hello world"); } + @Test + public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody() throws InterruptedException { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString(createDataSize(HUGE_MSG_SIZE))).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + } @Disabled("Http Async Signing is not supported for S3") @@ -416,4 +438,118 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep assertThat(interceptor.responseValidation()).isNull(); assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); } + + + @Test + public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, IOException { + File randomFileOfFixedLength = getRandomFileOfFixedLength(10); + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromFile(randomFileOfFixedLength.toPath())).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(response).isEqualTo(new String (bytes)); + + + } + + @Test + public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() + throws IOException { + + File randomFileOfFixedLength = getRandomFileOfFixedLength(17); + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromFile(randomFileOfFixedLength.toPath())).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(response).isEqualTo(new String (bytes)); + + } + + @Test + public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, + IOException { + + File randomFileOfFixedLength = getRandomFileOfFixedLength(10); + + s3Https.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), RequestBody.fromFile(randomFileOfFixedLength.toPath())); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + ResponseInputStream s3HttpsObject = + s3Https.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).checksumMode(ChecksumMode.ENABLED).build()); + String text = new BufferedReader( + new InputStreamReader(s3HttpsObject, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(text).isEqualTo(new String(bytes)); + } + + + @Test + public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws InterruptedException, + IOException { + + File randomFileOfFixedLength = getRandomFileOfFixedLength(34); + + s3Https.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), RequestBody.fromFile(randomFileOfFixedLength.toPath())); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + ResponseInputStream s3HttpsObject = + s3Https.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).checksumMode(ChecksumMode.ENABLED).build()); + String text = new BufferedReader( + new InputStreamReader(s3HttpsObject, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(text).isEqualTo(new String(bytes)); + } + + private File getRandomFileOfFixedLength(int sizeInKb) throws IOException { + int objectSize = sizeInKb * 1024 ; + final File tempFile = File.createTempFile("s3-object-file-", ".tmp"); + try (RandomAccessFile f = new RandomAccessFile(tempFile, "rw")) { + f.setLength(objectSize ); + } + tempFile.deleteOnExit(); + return tempFile; + } + } From f02605b11cfd0977f32c2061a08782d3147f6dbd Mon Sep 17 00:00:00 2001 From: John Viegas Date: Sun, 4 Sep 2022 12:15:54 -0700 Subject: [PATCH 2/6] Updated Fix to support AsyncRequestFileBody of variable chunk size --- .../ChecksumCalculatingAsyncRequestBody.java | 16 +- .../internal/async/FileAsyncRequestBody.java | 5 +- ...estBodyHttpChecksumTrailerInterceptor.java | 15 +- ...AwsUnsignedChunkedEncodingInputStream.java | 12 +- .../AwsChunkedEncodingInputStreamTest.java | 2 +- .../AsyncHttpChecksumIntegrationTest.java | 232 ++++++++++++++++++ .../checksum/HttpChecksumIntegrationTest.java | 179 ++------------ .../awssdk/services/s3/utils/S3TestUtils.java | 29 +++ .../utils/async/ByteBufferingSubscriber.java | 96 ++++++++ .../async/ByteBufferingSubscriberTest.java | 89 +++++++ 10 files changed, 479 insertions(+), 196 deletions(-) create mode 100644 services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java create mode 100644 utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java create mode 100644 utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 7bbc28df7f3e..3c59cdef9e1f 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.async.ByteBufferingSubscriber; import software.amazon.awssdk.utils.builder.SdkBuilder; /** @@ -41,6 +42,8 @@ @SdkInternalApi public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody { + public static final int DEFAULT_CHUNK_SIZE = 16 * 1024; + public static final byte[] FINAL_BYTE = new byte[0]; private final AsyncRequestBody wrapped; private final SdkChecksum sdkChecksum; @@ -148,7 +151,8 @@ public void subscribe(Subscriber s) { if (sdkChecksum != null) { sdkChecksum.reset(); } - wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes)); + wrapped.subscribe(new ByteBufferingSubscriber( + new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes), DEFAULT_CHUNK_SIZE)); } private static final class ChecksumCalculatingSubscriber implements Subscriber { @@ -189,7 +193,7 @@ public void onNext(ByteBuffer byteBuffer) { ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer); wrapped.onNext(allocatedBuffer); } else { - ByteBuffer allocatedBuffer = appendChunkSizeAndFinalByte(byteBuffer); + ByteBuffer allocatedBuffer = createChunk(byteBuffer, false); wrapped.onNext(allocatedBuffer); } } catch (SdkException sdkException) { @@ -216,14 +220,6 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) { return checksumAppendedBuffer; } - private ByteBuffer appendChunkSizeAndFinalByte(ByteBuffer byteBuffer) { - ByteBuffer contentChunk = createChunk(byteBuffer, false); - ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate(contentChunk.remaining()); - checksumAppendedBuffer.put(contentChunk); - checksumAppendedBuffer.flip(); - return checksumAppendedBuffer; - } - @Override public void onError(Throwable t) { wrapped.onError(t); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index b3efc9a072a2..8f7b2a483607 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -47,13 +47,12 @@ */ @SdkInternalApi public final class FileAsyncRequestBody implements AsyncRequestBody { + private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class); /** * Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber. */ - public static final int DEFAULT_CHUNK_SIZE = 16 * 1024; - - private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class); + private static final int DEFAULT_CHUNK_SIZE = 16 * 1024; /** * File to read. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java index 988dee8b9c73..0e4e1f7569af 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.core.internal.interceptor; +import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength; + import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.ClientType; @@ -25,8 +27,6 @@ import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.core.internal.async.ChecksumCalculatingAsyncRequestBody; -import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; -import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.util.ChunkContentUtils; import software.amazon.awssdk.core.internal.util.HttpChecksumUtils; import software.amazon.awssdk.http.Header; @@ -99,10 +99,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum, long checksumContentLength, long originalContentLength) { - long chunkLength = isFileAsyncRequestBody(context) - ? AwsUnsignedChunkedEncodingInputStream - .calculateStreamContentLength(originalContentLength, FileAsyncRequestBody.DEFAULT_CHUNK_SIZE) - : ChunkContentUtils.calculateChunkLength(originalContentLength); + long chunkLength = + calculateStreamContentLength(originalContentLength, ChecksumCalculatingAsyncRequestBody.DEFAULT_CHUNK_SIZE); return context.httpRequest().copy(r -> r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName()) @@ -112,9 +110,4 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp .putHeader(Header.CONTENT_LENGTH, Long.toString(chunkLength + checksumContentLength))); } - - private static boolean isFileAsyncRequestBody(Context.ModifyHttpRequest context) { - return context.asyncRequestBody().isPresent() && context.asyncRequestBody().get() instanceof FileAsyncRequestBody; - } - } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java index dd4eb9987193..dbbd16a20234 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java @@ -48,12 +48,10 @@ public static Builder builder() { * @return Content length of the trailer that will be appended at the end. */ public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) { - int checksumLength = algorithm.base64EncodedLength(); - - return (headerName.length() - + HEADER_COLON_SEPARATOR.length() - + checksumLength - + CRLF.length()); + return headerName.length() + + HEADER_COLON_SEPARATOR.length() + + algorithm.base64EncodedLength().longValue() + + CRLF.length() + CRLF.length(); } /** @@ -78,7 +76,7 @@ public static long calculateStreamContentLength(long originalLength, long defaul long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize); long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; - long lastByteSize = "0".length() + CRLF.length(); + long lastByteSize = (long) "0".length() + (long) CRLF.length(); return allChunks + remainingInChunk + lastByteSize; } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java index 5b69749b6e7b..0fa862dd2acb 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java @@ -60,7 +60,7 @@ public void lengthsOfCalculateByChecksumCalculatingInputStream(){ long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength( SHA256_ALGORITHM, SHA256_HEADER_NAME); assertThat(calculateChunkLength).isEqualTo(19); - assertThat(checksumContentLength).isEqualTo(69); + assertThat(checksumContentLength).isEqualTo(71); } @Test diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java new file mode 100644 index 000000000000..00c422cd3322 --- /dev/null +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java @@ -0,0 +1,232 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.checksum; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.services.s3.utils.S3TestUtils.KB; +import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb; +import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; +import software.amazon.awssdk.authcrt.signer.internal.DefaultAwsCrtS3V4aSigner; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.ChecksumValidation; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; +import software.amazon.awssdk.testutils.Waiter; + +public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase { + + protected static final String KEY = "some-key"; + private static final String BUCKET = temporaryBucketName(AsyncHttpChecksumIntegrationTest.class); + public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); + protected static S3AsyncClient s3HttpAsync; + + + @BeforeAll + public static void setUp() throws Exception { + + s3 = s3ClientBuilder().build(); + + s3Async = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)).build(); + + // Http Client to generate Signed request + s3HttpAsync = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)) + .endpointOverride(URI.create("http://s3." + DEFAULT_REGION + ".amazonaws.com")).build(); + + + createBucket(BUCKET); + + Waiter.run(() -> s3.headBucket(r -> r.bucket(BUCKET))) + .ignoringException(NoSuchBucketException.class) + .orFail(); + interceptor.reset(); + } + + + @Test + void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create())) + + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo("Hello world"); + } + + @Test + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallRequestBody() throws InterruptedException { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo("Hello world"); + } + + + @ParameterizedTest + @ValueSource(ints = {1, 12, 16, 17, 32, 33}) + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString(createDataOfSizeInKb(dataSize, 'a'))).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(createDataOfSizeInKb(dataSize, 'a')); + } + + @ParameterizedTest + @ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody + (int chunkSize) throws IOException { + File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(64); + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(chunkSize) + .build()).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(response).isEqualTo(new String(bytes)); + } + + /** + * Test two async call made back to back with different sizes parameterized to test for different chunk sizes + */ + @ParameterizedTest + @ValueSource(ints = {1 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB}) + void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize) + throws IOException { + + File randomFileOfFixedLengthOne = fixedLengthFileWithRandomCharacters(64); + File randomFileOfFixedLengthTwo = fixedLengthFileWithRandomCharacters(17); + CompletableFuture putObjectFutureOne = + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), + FileAsyncRequestBody.builder().path(randomFileOfFixedLengthOne.toPath()).chunkSizeInBytes(chunkSize).build()); + + String keyTwo = KEY + "_two"; + CompletableFuture putObjectFutureTwo = + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(keyTwo) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), + FileAsyncRequestBody.builder().path(randomFileOfFixedLengthTwo.toPath()).chunkSizeInBytes(chunkSize).build()); + + putObjectFutureOne.join(); + putObjectFutureTwo.join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + + String responseTwo = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(keyTwo).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + assertThat(response).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthOne.toPath()))); + assertThat(responseTwo).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthTwo.toPath()))); + + } + + + @Disabled("Http Async Signing is not supported for S3") + void asyncValidSignedTrailerChecksumCalculatedBySdkClient() { + ExecutionAttributes executionAttributes = ExecutionAttributes.builder() + .put(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, + true).build(); + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .overrideConfiguration(o -> o.executionAttributes(executionAttributes)) + .key(KEY) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + assertThat(response).isEqualTo("Hello world"); + } + + + +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java index 8e9f6e2426b5..417dbf3f7b19 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java @@ -17,31 +17,26 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb; +import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; -import java.io.RandomAccessFile; import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.Paths; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; import software.amazon.awssdk.authcrt.signer.internal.DefaultAwsCrtS3V4aSigner; import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.checksums.Algorithm; import software.amazon.awssdk.core.checksums.ChecksumValidation; -import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @@ -59,12 +54,12 @@ public class HttpChecksumIntegrationTest extends S3IntegrationTestBase { - protected static final String KEY = "some-key"; public static final int HUGE_MSG_SIZE = 16384; + protected static final String KEY = "some-key"; + private static final String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); protected static S3Client s3Https; protected static S3AsyncClient s3HttpAsync; - private static String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); @BeforeAll public static void setUp() throws Exception { @@ -91,19 +86,10 @@ public static void setUp() throws Exception { } @AfterAll - public static void tearDown(){ + public static void tearDown() { deleteBucketAndAllContents(BUCKET); } - private static String createDataSize(int msgSize) { - msgSize = msgSize / 2; - msgSize = msgSize * 1024; - StringBuilder sb = new StringBuilder(msgSize); - for (int i = 0; i < msgSize; i++) { - sb.append('a'); - } - return sb.toString(); - } @AfterEach public void clear() { @@ -286,88 +272,13 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClientWithSigv4a() { assertThat(text).isEqualTo("Hello world"); } - - @Test - public void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() throws InterruptedException { - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create())) - - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo("Hello world"); - } - - @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallRequestBody() throws InterruptedException { - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo("Hello world"); - } - - @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody() throws InterruptedException { - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString(createDataSize(HUGE_MSG_SIZE))).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo(createDataSize(HUGE_MSG_SIZE)); - } - - - @Disabled("Http Async Signing is not supported for S3") - public void asyncValidSignedTrailerChecksumCalculatedBySdkClient() { - ExecutionAttributes executionAttributes = ExecutionAttributes.builder() - .put(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, - true).build(); - s3HttpAsync.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .overrideConfiguration(o -> o.executionAttributes(executionAttributes)) - .key(KEY) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY) - .build(), AsyncResponseTransformer.toBytes()).join() - .asUtf8String(); - assertThat(response).isEqualTo("Hello world"); - } - @Test public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -385,16 +296,16 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); } @Test - public void syncSignedPayloadForHugeMessage(){ + public void syncSignedPayloadForHugeMessage() { s3.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -411,7 +322,7 @@ public void syncSignedPayloadForHugeMessage(){ .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); } @Test @@ -420,7 +331,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -436,62 +347,15 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep assertThat(interceptor.validationAlgorithm()).isNull(); assertThat(interceptor.responseValidation()).isNull(); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); - } - - - @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, IOException { - File randomFileOfFixedLength = getRandomFileOfFixedLength(10); - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromFile(randomFileOfFixedLength.toPath())).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - - byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); - assertThat(response).isEqualTo(new String (bytes)); - - + assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); } - @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() - throws IOException { - - File randomFileOfFixedLength = getRandomFileOfFixedLength(17); - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromFile(randomFileOfFixedLength.toPath())).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - - byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); - assertThat(response).isEqualTo(new String (bytes)); - - } @Test public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, IOException { - File randomFileOfFixedLength = getRandomFileOfFixedLength(10); + File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(10); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) @@ -516,11 +380,8 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileR @Test - public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws InterruptedException, - IOException { - - File randomFileOfFixedLength = getRandomFileOfFixedLength(34); - + public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws IOException { + File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(34); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) @@ -542,14 +403,4 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRe assertThat(text).isEqualTo(new String(bytes)); } - private File getRandomFileOfFixedLength(int sizeInKb) throws IOException { - int objectSize = sizeInKb * 1024 ; - final File tempFile = File.createTempFile("s3-object-file-", ".tmp"); - try (RandomAccessFile f = new RandomAccessFile(tempFile, "rw")) { - f.setLength(objectSize ); - } - tempFile.deleteOnExit(); - return tempFile; - } - } diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java index 952fbccdc701..91314db0bb6b 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java @@ -15,13 +15,20 @@ package software.amazon.awssdk.services.s3.utils; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.RandomAccessFile; import java.rmi.NoSuchObjectException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Bucket; @@ -41,6 +48,7 @@ public class S3TestUtils { private static final Logger log = Logger.loggerFor(S3TestUtils.class); private static final String TEST_BUCKET_PREFIX = "s3-test-bucket-"; private static final String NON_DNS_COMPATIBLE_TEST_BUCKET_PREFIX = "s3.test.bucket."; + public static final int KB = 1024; private static Map, List> cleanupTasks = new ConcurrentHashMap<>(); @@ -167,4 +175,25 @@ public static void deleteBucketAndAllContents(S3Client s3, String bucketName) { e.printStackTrace(); } } + + public static File fixedLengthFileWithRandomCharacters(int sizeInKb) throws IOException { + File tempFile = File.createTempFile("s3-object-file-", ".tmp"); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { + PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); + int objectSize = sizeInKb * KB; + Random random = new Random(); + for (int index = 0; index < objectSize; index ++) { + writer.print(index % 5 == 0 ? ' ' : (char) ('a' + random.nextInt(26))); + } + writer.flush(); + } + tempFile.deleteOnExit(); + return tempFile; + } + + public static String createDataOfSizeInKb(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize * 1024).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } + + } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java new file mode 100644 index 000000000000..9cc6be2af7f7 --- /dev/null +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java @@ -0,0 +1,96 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.utils.async; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkProtectedApi; + + +@SdkProtectedApi +public class ByteBufferingSubscriber extends DelegatingSubscriber { + private final int bufferSize; + private final AtomicInteger bufferedBytes = new AtomicInteger(0); + private final ByteBuffer currentBuffer; + private Subscription subscription; + + + public ByteBufferingSubscriber(Subscriber subscriber, int bufferSize) { + super(subscriber); + this.bufferSize = bufferSize; + currentBuffer = ByteBuffer.allocate(bufferSize); + + } + + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + super.onSubscribe(subscription); + } + + @Override + public void onNext(T t) { + int startPosition = 0; + int currentBytesRead = t.remaining(); + do { + int availableToRead = bufferSize - bufferedBytes.get(); + int bytesToMove = availableToRead > (currentBytesRead - startPosition) + ? (currentBytesRead - startPosition) + : availableToRead; + + if (bufferedBytes.get() == 0) { + currentBuffer.put(t.array(), startPosition, bytesToMove); + + } else { + currentBuffer.put(t.array(), 0, bytesToMove); + } + startPosition = startPosition + bytesToMove; + if (bufferedBytes.addAndGet(bytesToMove) == bufferSize) { + currentBuffer.position(0); + subscriber.onNext((T) currentBuffer); + currentBuffer.clear(); + bufferedBytes.addAndGet(-bufferSize); + } + } while (startPosition < currentBytesRead); + + if (bufferedBytes.get() > 0) { + subscription.request(1); + } + } + + @Override + public void onComplete() { + // Deliver any remaining items before calling on complete + + if (bufferedBytes.get() > 0) { + currentBuffer.position(0); + if (bufferedBytes.get() < bufferSize) { + // Create a ByteBuffer with capacity equal to remaining bytes in the currentBuffer. + ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes.get()); + trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes.get()); + trimmedBuffer.position(0); + subscriber.onNext((T) trimmedBuffer); + } else { + subscriber.onNext((T) currentBuffer); + } + currentBuffer.clear(); + } + super.onComplete(); + } +} diff --git a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java new file mode 100644 index 000000000000..71a4f0d1c21d --- /dev/null +++ b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java @@ -0,0 +1,89 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.utils.async; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; +import java.util.stream.IntStream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +@RunWith(MockitoJUnitRunner.class) +public class ByteBufferingSubscriberTest { + + private static final int BUFFER_SIZE = 6; + + private static final Object data = ByteBuffer.allocate(1); + + @Mock + private Subscriber mockSubscriber; + + @Mock + private Subscription mockSubscription; + + private Subscriber bufferingSubscriber; + + @Before + public void setup() { + doNothing().when(mockSubscriber).onSubscribe(any()); + doNothing().when(mockSubscriber).onNext(any()); + doNothing().when(mockSubscriber).onComplete(); + doNothing().when(mockSubscription).request(anyLong()); + + bufferingSubscriber = new ByteBufferingSubscriber(mockSubscriber, BUFFER_SIZE); + bufferingSubscriber.onSubscribe(mockSubscription); + } + + @Test + public void onNextNotCalled_WhenCurrentSizeLessThanBufferSize() { + int count = 3; + callOnNext(count); + + verify(mockSubscription, times(count)).request(1); + verify(mockSubscriber, times(0)).onNext(any()); + } + + @Test + public void onNextIsCalled_onlyWhen_BufferSizeRequirementIsMet() { + callOnNext(BUFFER_SIZE); + + verify(mockSubscriber, times(1)).onNext(any()); + } + + @Test + public void onNextIsCalled_DuringOnComplete_WhenBufferNotEmpty() { + int count = 8; + callOnNext(count); + bufferingSubscriber.onComplete(); + + verify(mockSubscriber, times(count/BUFFER_SIZE + 1)).onNext(any()); + } + + private void callOnNext(int times) { + IntStream.range(0, times).forEach(i -> bufferingSubscriber.onNext(data)); + } + +} From b6625154b7e093cacfd60e7fd3842abbd24084a0 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 8 Sep 2022 13:27:38 -0700 Subject: [PATCH 3/6] Reusing .flatMapIterable API of SDKPublisher to create Chunks of fixed size --- .../ChecksumCalculatingAsyncRequestBody.java | 66 +++++++++-- ...AwsUnsignedChunkedEncodingInputStream.java | 3 +- .../AsyncHttpChecksumIntegrationTest.java | 19 +-- .../checksum/HttpChecksumIntegrationTest.java | 23 ++-- .../awssdk/services/s3/utils/S3TestUtils.java | 29 ----- .../services/s3/utils/ChecksumUtils.java | 29 +++++ ...uestBodyFlexibleChecksumInTrailerTest.java | 108 +++++++++++++++--- .../utils/async/ByteBufferingSubscriber.java | 96 ---------------- .../utils/async/FlatteningSubscriber.java | 21 +++- .../async/ByteBufferingSubscriberTest.java | 89 --------------- 10 files changed, 223 insertions(+), 260 deletions(-) delete mode 100644 utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java delete mode 100644 utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 3c59cdef9e1f..3eef080a419b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -21,6 +21,8 @@ import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; @@ -32,7 +34,6 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Validate; -import software.amazon.awssdk.utils.async.ByteBufferingSubscriber; import software.amazon.awssdk.utils.builder.SdkBuilder; /** @@ -50,6 +51,8 @@ public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody { private final Algorithm algorithm; private final String trailerHeader; private final AtomicLong remainingBytes; + private final long totalBytes; + private final ByteBuffer currentBuffer; private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { @@ -60,8 +63,10 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { this.algorithm = builder.algorithm; this.sdkChecksum = builder.algorithm != null ? SdkChecksum.forAlgorithm(algorithm) : null; this.trailerHeader = builder.trailerHeader; - this.remainingBytes = new AtomicLong(wrapped.contentLength() - .orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied."))); + this.totalBytes = wrapped.contentLength() + .orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied.")); + this.remainingBytes = new AtomicLong(); + this.currentBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); } /** @@ -151,8 +156,11 @@ public void subscribe(Subscriber s) { if (sdkChecksum != null) { sdkChecksum.reset(); } - wrapped.subscribe(new ByteBufferingSubscriber( - new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes), DEFAULT_CHUNK_SIZE)); + + this.remainingBytes.set(totalBytes); + + wrapped.flatMapIterable(this::bufferAndCreateChunks) + .subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes)); } private static final class ChecksumCalculatingSubscriber implements Subscriber { @@ -166,11 +174,11 @@ private static final class ChecksumCalculatingSubscriber implements Subscriber wrapped, SdkChecksum checksum, - String trailerHeader, AtomicLong remainingBytes) { + String trailerHeader, long totalBytes) { this.wrapped = wrapped; this.checksum = checksum; this.trailerHeader = trailerHeader; - this.remainingBytes = remainingBytes; + this.remainingBytes = new AtomicLong(totalBytes); } @Override @@ -230,4 +238,48 @@ public void onComplete() { wrapped.onComplete(); } } + + private Iterable bufferAndCreateChunks(ByteBuffer buffer) { + int startPosition = 0; + int currentBytesRead = buffer.remaining(); + + List resultBufferedList = new ArrayList<>(); + do { + int bufferedBytes = currentBuffer.position(); + int availableToRead = DEFAULT_CHUNK_SIZE - bufferedBytes; + int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); + + if (bufferedBytes == 0) { + currentBuffer.put(buffer.array(), startPosition, bytesToMove); + } else { + currentBuffer.put(buffer.array(), 0, bytesToMove); + } + + startPosition = startPosition + bytesToMove; + + // Send the data once the buffer is full + if (currentBuffer.position() == DEFAULT_CHUNK_SIZE) { + currentBuffer.position(0); + ByteBuffer bufferToSend = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + bufferToSend.put(currentBuffer.array(), 0, DEFAULT_CHUNK_SIZE); + bufferToSend.clear(); + currentBuffer.clear(); + resultBufferedList.add(bufferToSend); + remainingBytes.addAndGet(-DEFAULT_CHUNK_SIZE); + } + + } while (startPosition < currentBytesRead); + + int bufferedBytes = currentBuffer.position(); + // Send the remainder buffered bytes at the end when there no more bytes + if (bufferedBytes > 0 && remainingBytes.get() == bufferedBytes) { + currentBuffer.clear(); + ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes); + trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes); + trimmedBuffer.clear(); + resultBufferedList.add(trimmedBuffer); + remainingBytes.addAndGet(-bufferedBytes); + } + return resultBufferedList; + } } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java index dbbd16a20234..186ca5d7d0d8 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java @@ -76,7 +76,8 @@ public static long calculateStreamContentLength(long originalLength, long defaul long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize); long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; - long lastByteSize = (long) "0".length() + (long) CRLF.length(); + // last byte is composed of a "0" and "\r\n" + long lastByteSize = 1 + (long) CRLF.length(); return allChunks + remainingInChunk + lastByteSize; } diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java index 00c422cd3322..c9b1d7e3b99e 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java @@ -16,9 +16,9 @@ package software.amazon.awssdk.services.s3.checksum; import static org.assertj.core.api.Assertions.assertThat; -import static software.amazon.awssdk.services.s3.utils.S3TestUtils.KB; -import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb; -import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.fixedLengthInKbFileWithRandomOrFixedCharacters; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.File; @@ -118,13 +118,14 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallReques @ParameterizedTest - @ValueSource(ints = {1, 12, 16, 17, 32, 33}) + //createDataOfSizeInKb already creates data in kb + @ValueSource(ints = {1*KB, 3*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException { s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString(createDataOfSizeInKb(dataSize, 'a'))).join(); + .build(), AsyncRequestBody.fromString(createDataOfSize(dataSize, 'a'))).join(); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -133,14 +134,14 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo(createDataOfSizeInKb(dataSize, 'a')); + assertThat(response).isEqualTo(createDataOfSize(dataSize, 'a')); } @ParameterizedTest @ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody (int chunkSize) throws IOException { - File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(64); + File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true); s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) @@ -169,8 +170,8 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize) throws IOException { - File randomFileOfFixedLengthOne = fixedLengthFileWithRandomCharacters(64); - File randomFileOfFixedLengthTwo = fixedLengthFileWithRandomCharacters(17); + File randomFileOfFixedLengthOne = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true); + File randomFileOfFixedLengthTwo = fixedLengthInKbFileWithRandomOrFixedCharacters(17, true); CompletableFuture putObjectFutureOne = s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java index 417dbf3f7b19..40be0176ea4c 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java @@ -17,8 +17,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb; -import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.fixedLengthInKbFileWithRandomOrFixedCharacters; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.BufferedReader; @@ -54,7 +55,7 @@ public class HttpChecksumIntegrationTest extends S3IntegrationTestBase { - public static final int HUGE_MSG_SIZE = 16384; + public static final int HUGE_MSG_SIZE = 16384 * KB; protected static final String KEY = "some-key"; private static final String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); @@ -278,7 +279,7 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -296,7 +297,7 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); } @Test @@ -305,7 +306,7 @@ public void syncSignedPayloadForHugeMessage() { .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -322,7 +323,7 @@ public void syncSignedPayloadForHugeMessage() { .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); } @Test @@ -331,7 +332,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -347,7 +348,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep assertThat(interceptor.validationAlgorithm()).isNull(); assertThat(interceptor.responseValidation()).isNull(); - assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); } @@ -355,7 +356,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, IOException { - File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(10); + File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(10, true); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) @@ -381,7 +382,7 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileR @Test public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws IOException { - File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(34); + File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(34, true); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java index 91314db0bb6b..952fbccdc701 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java @@ -15,20 +15,13 @@ package software.amazon.awssdk.services.s3.utils; -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.RandomAccessFile; import java.rmi.NoSuchObjectException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Bucket; @@ -48,7 +41,6 @@ public class S3TestUtils { private static final Logger log = Logger.loggerFor(S3TestUtils.class); private static final String TEST_BUCKET_PREFIX = "s3-test-bucket-"; private static final String NON_DNS_COMPATIBLE_TEST_BUCKET_PREFIX = "s3.test.bucket."; - public static final int KB = 1024; private static Map, List> cleanupTasks = new ConcurrentHashMap<>(); @@ -175,25 +167,4 @@ public static void deleteBucketAndAllContents(S3Client s3, String bucketName) { e.printStackTrace(); } } - - public static File fixedLengthFileWithRandomCharacters(int sizeInKb) throws IOException { - File tempFile = File.createTempFile("s3-object-file-", ".tmp"); - try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { - PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); - int objectSize = sizeInKb * KB; - Random random = new Random(); - for (int index = 0; index < objectSize; index ++) { - writer.print(index % 5 == 0 ? ' ' : (char) ('a' + random.nextInt(26))); - } - writer.flush(); - } - tempFile.deleteOnExit(); - return tempFile; - } - - public static String createDataOfSizeInKb(int dataSize, char contentCharacter) { - return IntStream.range(0, dataSize * 1024).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); - } - - } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java index 28dcf23340d8..49e36be60681 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java @@ -15,17 +15,26 @@ package software.amazon.awssdk.services.s3.utils; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Utilities for computing the SHA-256 checksums of various binary objects. */ public final class ChecksumUtils { + + public static final int KB = 1024; + public static byte[] computeCheckSum(InputStream is) throws IOException { MessageDigest instance = createMessageDigest(); @@ -66,4 +75,24 @@ private static MessageDigest createMessageDigest() { throw new RuntimeException("Unable to create SHA-256 MessageDigest instance", e); } } + + public static File fixedLengthInKbFileWithRandomOrFixedCharacters(int sizeInKb, boolean isRandom) throws IOException { + File tempFile = File.createTempFile("temp-random-sdk-file-", ".tmp"); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { + PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); + int objectSize = sizeInKb * 1024; + Random random = new Random(); + for (int index = 0; index < objectSize; index++) { + int offset = isRandom ? random.nextInt(26) : 0; + writer.print(index % 5 == 0 ? ' ' : (char) ('a' + offset)); + } + writer.flush(); + } + tempFile.deleteOnExit(); + return tempFile; + } + + public static String createDataOfSize(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } } diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java index 2be93377d816..603ff7325153 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java @@ -33,8 +33,16 @@ import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.verification.LoggedRequest; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.RandomAccessFile; import java.net.URI; +import java.nio.file.Files; import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -45,6 +53,7 @@ import software.amazon.awssdk.core.HttpChecksumConstant; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; @@ -52,6 +61,7 @@ import software.amazon.awssdk.testutils.EnvironmentVariableHelper; public class AsyncRequestBodyFlexibleChecksumInTrailerTest { + public static final int KB = 1024; private static final String CRLF = "\r\n"; private static final EnvironmentVariableHelper ENVIRONMENT_VARIABLE_HELPER = new EnvironmentVariableHelper(); private static final String SCENARIO = "scenario"; @@ -86,15 +96,11 @@ public void cleanUp() { public void asyncStreaming_NoSigner_shouldContainChecksum_fromInterceptors() { stubResponseWithHeaders(); asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), AsyncRequestBody.fromString( - "abc"), + "abc"), AsyncResponseTransformer.toBytes()).join(); //payload would in json form as "{"StringMember":"foo"}x-amz-checksum-crc32:tcUDMQ==[\r][\n]" - verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo("44"))); - verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo("x-amz-checksum-crc32"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo("3"))); - verify(putRequestedFor(anyUrl()).withHeader("content-encoding", equalTo("aws-chunked"))); - verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + verifyHeadersForPutRequest("44", "3", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( containing( "3" + CRLF + "abc" + CRLF @@ -111,23 +117,67 @@ public void asyncStreaming_withRetry_NoSigner_shouldContainChecksum_fromIntercep + "x-amz-checksum-sha256:ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0=" + CRLF + CRLF; asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.SHA256), AsyncRequestBody.fromString( - "abc"), + "abc"), AsyncResponseTransformer.toBytes()).join(); List requests = getRecordedRequests(); assertThat(requests.size()).isEqualTo(2); assertThat(requests.get(0).getBody()).contains(expectedRequestBody.getBytes()); assertThat(requests.get(1).getBody()).contains(expectedRequestBody.getBytes()); + verify(putRequestedFor(anyUrl()).withHeader(CONTENT_TYPE, equalTo(Mimetype.MIMETYPE_TEXT_PLAIN))); - verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo("81"))); - verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo("x-amz-checksum-sha256"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo("3"))); - verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + verifyHeadersForPutRequest("81", "3", "x-amz-checksum-sha256"); + verify(putRequestedFor(anyUrl()).withRequestBody( containing( expectedRequestBody))); } + + @Test + public void asyncStreaming_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { + + stubForFailureThenSuccess(500, "500"); + + File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(37, false); + String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), + FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(16 * KB) + .build(), + AsyncResponseTransformer.toBytes()).join(); + verifyHeadersForPutRequest("37948", "37888", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( + containing( + "4000" + CRLF + contentString.substring(0, 16 * KB) + CRLF + + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + + "0" + CRLF + + "x-amz-checksum-crc32:wQiPHw==" + CRLF + CRLF))); + } + + @Test + public void asyncStreaming_withRetry_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { + + + File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(37, false); + String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + stubResponseWithHeaders(); + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), + FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(16 * KB) + .build(), + AsyncResponseTransformer.toBytes()).join(); + verifyHeadersForPutRequest("37948", "37888", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( + containing( + "4000" + CRLF + contentString.substring(0, 16 * KB) + CRLF + + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + + "0" + CRLF + + "x-amz-checksum-crc32:wQiPHw==" + CRLF + CRLF))); + } + + private void stubResponseWithHeaders() { stubFor(put(anyUrl()) .willReturn(aResponse().withStatus(200) @@ -163,4 +213,36 @@ private List getRecordedRequests() { } + private void verifyHeadersForPutRequest(String contentLength, String decodedContentLength, String checksumHeader) { + verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo(contentLength))); + verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo( + checksumHeader))); + verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); + verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo(decodedContentLength))); + verify(putRequestedFor(anyUrl()).withHeader("content-encoding", equalTo("aws-chunked"))); + verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + } + + + + public static File fixedLengthInKbFileWithRandomOrFixedCharacters(int sizeInKb, boolean isRandom) throws IOException { + File tempFile = File.createTempFile("temp-random-sdk-file-", ".tmp"); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { + PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); + int objectSize = sizeInKb * 1024; + Random random = new Random(); + for (int index = 0; index < objectSize; index++) { + int offset = isRandom ? random.nextInt(26) : 0; + writer.print(index % 5 == 0 ? ' ' : (char) ('a' + offset)); + } + writer.flush(); + } + tempFile.deleteOnExit(); + return tempFile; + } + + public static String createDataOfSize(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } + } \ No newline at end of file diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java deleted file mode 100644 index 9cc6be2af7f7..000000000000 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriber.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.utils.async; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkProtectedApi; - - -@SdkProtectedApi -public class ByteBufferingSubscriber extends DelegatingSubscriber { - private final int bufferSize; - private final AtomicInteger bufferedBytes = new AtomicInteger(0); - private final ByteBuffer currentBuffer; - private Subscription subscription; - - - public ByteBufferingSubscriber(Subscriber subscriber, int bufferSize) { - super(subscriber); - this.bufferSize = bufferSize; - currentBuffer = ByteBuffer.allocate(bufferSize); - - } - - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - super.onSubscribe(subscription); - } - - @Override - public void onNext(T t) { - int startPosition = 0; - int currentBytesRead = t.remaining(); - do { - int availableToRead = bufferSize - bufferedBytes.get(); - int bytesToMove = availableToRead > (currentBytesRead - startPosition) - ? (currentBytesRead - startPosition) - : availableToRead; - - if (bufferedBytes.get() == 0) { - currentBuffer.put(t.array(), startPosition, bytesToMove); - - } else { - currentBuffer.put(t.array(), 0, bytesToMove); - } - startPosition = startPosition + bytesToMove; - if (bufferedBytes.addAndGet(bytesToMove) == bufferSize) { - currentBuffer.position(0); - subscriber.onNext((T) currentBuffer); - currentBuffer.clear(); - bufferedBytes.addAndGet(-bufferSize); - } - } while (startPosition < currentBytesRead); - - if (bufferedBytes.get() > 0) { - subscription.request(1); - } - } - - @Override - public void onComplete() { - // Deliver any remaining items before calling on complete - - if (bufferedBytes.get() > 0) { - currentBuffer.position(0); - if (bufferedBytes.get() < bufferSize) { - // Create a ByteBuffer with capacity equal to remaining bytes in the currentBuffer. - ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes.get()); - trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes.get()); - trimmedBuffer.position(0); - subscriber.onNext((T) trimmedBuffer); - } else { - subscriber.onNext((T) currentBuffer); - } - currentBuffer.clear(); - } - super.onComplete(); - } -} diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java index 2053a4e820cf..a9c54bc98f5b 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java @@ -93,10 +93,13 @@ public void request(long l) { handleStateUpdate(); } + @Override public void cancel() { subscription.cancel(); } + + }); } @@ -133,11 +136,19 @@ public void onComplete() { * Increment the downstream demand by the provided value, accounting for overflow. */ private void addDownstreamDemand(long l) { - Validate.isTrue(l > 0, "Demand must not be negative."); - downstreamDemand.getAndUpdate(current -> { - long newValue = current + l; - return newValue >= 0 ? newValue : Long.MAX_VALUE; - }); + + if (l > 0) { + downstreamDemand.getAndUpdate(current -> { + long newValue = current + l; + return newValue >= 0 ? newValue : Long.MAX_VALUE; + }); + } else { + log.error(() -> "Demand " + l + " must not be negative."); + upstreamSubscription.cancel(); + onError(new IllegalArgumentException("Demand must not be negative")); + } + + } /** diff --git a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java deleted file mode 100644 index 71a4f0d1c21d..000000000000 --- a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferingSubscriberTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.utils.async; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.nio.ByteBuffer; -import java.util.stream.IntStream; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -@RunWith(MockitoJUnitRunner.class) -public class ByteBufferingSubscriberTest { - - private static final int BUFFER_SIZE = 6; - - private static final Object data = ByteBuffer.allocate(1); - - @Mock - private Subscriber mockSubscriber; - - @Mock - private Subscription mockSubscription; - - private Subscriber bufferingSubscriber; - - @Before - public void setup() { - doNothing().when(mockSubscriber).onSubscribe(any()); - doNothing().when(mockSubscriber).onNext(any()); - doNothing().when(mockSubscriber).onComplete(); - doNothing().when(mockSubscription).request(anyLong()); - - bufferingSubscriber = new ByteBufferingSubscriber(mockSubscriber, BUFFER_SIZE); - bufferingSubscriber.onSubscribe(mockSubscription); - } - - @Test - public void onNextNotCalled_WhenCurrentSizeLessThanBufferSize() { - int count = 3; - callOnNext(count); - - verify(mockSubscription, times(count)).request(1); - verify(mockSubscriber, times(0)).onNext(any()); - } - - @Test - public void onNextIsCalled_onlyWhen_BufferSizeRequirementIsMet() { - callOnNext(BUFFER_SIZE); - - verify(mockSubscriber, times(1)).onNext(any()); - } - - @Test - public void onNextIsCalled_DuringOnComplete_WhenBufferNotEmpty() { - int count = 8; - callOnNext(count); - bufferingSubscriber.onComplete(); - - verify(mockSubscriber, times(count/BUFFER_SIZE + 1)).onNext(any()); - } - - private void callOnNext(int times) { - IntStream.range(0, times).forEach(i -> bufferingSubscriber.onNext(data)); - } - -} From 7eba1194d3a5b6dabeab51dcb2ed2ae68b79cefc Mon Sep 17 00:00:00 2001 From: John Viegas Date: Mon, 12 Sep 2022 23:59:49 -0700 Subject: [PATCH 4/6] Adding a separate class ChunkBuffer to handle Multiple thread accessing currentBuffer --- .../bugfix-AWSSDKforJavav2-5ecdce1.json | 2 +- .../awssdk/core/HttpChecksumConstant.java | 5 + .../ChecksumCalculatingAsyncRequestBody.java | 65 ++------- .../core/internal/async/ChunkBuffer.java | 138 ++++++++++++++++++ ...estBodyHttpChecksumTrailerInterceptor.java | 3 +- .../awssdk/core/async/ChunkBufferTest.java | 90 ++++++++++++ .../AsyncHttpChecksumIntegrationTest.java | 14 +- .../checksum/HttpChecksumIntegrationTest.java | 14 +- .../services/s3/utils/ChecksumUtils.java | 22 ++- ...uestBodyFlexibleChecksumInTrailerTest.java | 54 +++---- .../utils/async/FlatteningSubscriber.java | 5 - 11 files changed, 290 insertions(+), 122 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json index 10e6373170a5..74e753aa0cb5 100644 --- a/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "AWS SDK for Java v2", "contributor": "", - "description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async File Request body." + "description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async Request body." } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java index ff4f573f4173..8856ab0ed9d0 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java @@ -39,6 +39,11 @@ public final class HttpChecksumConstant { public static final String HEADER_FOR_TRAILER_REFERENCE = "x-amz-trailer"; + /** + * Default chunk size for Async trailer based checksum data transfer* + */ + public static final int DEFAULT_ASYNC_CHUNK_SIZE = 16 * 1024; + private HttpChecksumConstant() { } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 3eef080a419b..ccb3a57b3b41 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -15,14 +15,13 @@ package software.amazon.awssdk.core.internal.async; +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; @@ -43,16 +42,12 @@ @SdkInternalApi public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody { - public static final int DEFAULT_CHUNK_SIZE = 16 * 1024; - - public static final byte[] FINAL_BYTE = new byte[0]; + private static final byte[] FINAL_BYTE = new byte[0]; private final AsyncRequestBody wrapped; private final SdkChecksum sdkChecksum; private final Algorithm algorithm; private final String trailerHeader; - private final AtomicLong remainingBytes; private final long totalBytes; - private final ByteBuffer currentBuffer; private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { @@ -65,8 +60,6 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { this.trailerHeader = builder.trailerHeader; this.totalBytes = wrapped.contentLength() .orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied.")); - this.remainingBytes = new AtomicLong(); - this.currentBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); } /** @@ -157,9 +150,7 @@ public void subscribe(Subscriber s) { sdkChecksum.reset(); } - this.remainingBytes.set(totalBytes); - - wrapped.flatMapIterable(this::bufferAndCreateChunks) + wrapped.flatMapIterable(this::buffer) .subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes)); } @@ -239,47 +230,13 @@ public void onComplete() { } } - private Iterable bufferAndCreateChunks(ByteBuffer buffer) { - int startPosition = 0; - int currentBytesRead = buffer.remaining(); - - List resultBufferedList = new ArrayList<>(); - do { - int bufferedBytes = currentBuffer.position(); - int availableToRead = DEFAULT_CHUNK_SIZE - bufferedBytes; - int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); - - if (bufferedBytes == 0) { - currentBuffer.put(buffer.array(), startPosition, bytesToMove); - } else { - currentBuffer.put(buffer.array(), 0, bytesToMove); - } - - startPosition = startPosition + bytesToMove; - - // Send the data once the buffer is full - if (currentBuffer.position() == DEFAULT_CHUNK_SIZE) { - currentBuffer.position(0); - ByteBuffer bufferToSend = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); - bufferToSend.put(currentBuffer.array(), 0, DEFAULT_CHUNK_SIZE); - bufferToSend.clear(); - currentBuffer.clear(); - resultBufferedList.add(bufferToSend); - remainingBytes.addAndGet(-DEFAULT_CHUNK_SIZE); - } - - } while (startPosition < currentBytesRead); - - int bufferedBytes = currentBuffer.position(); - // Send the remainder buffered bytes at the end when there no more bytes - if (bufferedBytes > 0 && remainingBytes.get() == bufferedBytes) { - currentBuffer.clear(); - ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes); - trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes); - trimmedBuffer.clear(); - resultBufferedList.add(trimmedBuffer); - remainingBytes.addAndGet(-bufferedBytes); - } - return resultBufferedList; + private Iterable buffer(ByteBuffer bytes) { + ChunkBuffer chunkBuffer = ChunkBuffer.builder() + .bufferSize(DEFAULT_ASYNC_CHUNK_SIZE) + .totalBytes(bytes.remaining()) + .build(); + chunkBuffer.bufferAndCreateChunks(bytes); + return chunkBuffer.getBufferedList(); } + } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java new file mode 100644 index 000000000000..c7f1cc5eefea --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -0,0 +1,138 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.SdkBuilder; + +/** + * Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize* + */ +@SdkInternalApi +public final class ChunkBuffer { + private final AtomicLong remainingBytes; + private final ByteBuffer currentBuffer; + private final int bufferSize; + private List bufferedList; + + + private ChunkBuffer(Long totalBytes, Integer bufferSize) { + Validate.notNull(totalBytes, "The totalBytes must not be null"); + + int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE; + this.bufferSize = chunkSize; + this.currentBuffer = ByteBuffer.allocate(chunkSize); + this.remainingBytes = new AtomicLong(totalBytes); + bufferedList = new ArrayList<>(); + } + + + public static Builder builder() { + return new DefaultBuilder(); + } + + public List getBufferedList() { + if (currentBuffer == null) { + throw new IllegalStateException(""); + } + List ret = bufferedList; + bufferedList = new ArrayList<>(); + return Collections.unmodifiableList(ret); + } + + public Iterable bufferAndCreateChunks(ByteBuffer buffer) { + int startPosition = 0; + int currentBytesRead = buffer.remaining(); + + do { + + int bufferedBytes = currentBuffer.position(); + int availableToRead = bufferSize - bufferedBytes; + int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); + + if (bufferedBytes == 0) { + currentBuffer.put(buffer.array(), startPosition, bytesToMove); + } else { + currentBuffer.put(buffer.array(), 0, bytesToMove); + } + + startPosition = startPosition + bytesToMove; + + // Send the data once the buffer is full + if (currentBuffer.position() == bufferSize) { + currentBuffer.position(0); + ByteBuffer bufferToSend = ByteBuffer.allocate(bufferSize); + bufferToSend.put(currentBuffer.array(), 0, bufferSize); + bufferToSend.clear(); + currentBuffer.clear(); + bufferedList.add(bufferToSend); + remainingBytes.addAndGet(-bufferSize); + } + } while (startPosition < currentBytesRead); + + int remainingBytesInBuffer = currentBuffer.position(); + // Send the remainder buffered bytes at the end when there no more bytes + if (remainingBytesInBuffer > 0 && remainingBytes.get() == remainingBytesInBuffer) { + currentBuffer.clear(); + ByteBuffer trimmedBuffer = ByteBuffer.allocate(remainingBytesInBuffer); + trimmedBuffer.put(currentBuffer.array(), 0, remainingBytesInBuffer); + trimmedBuffer.clear(); + bufferedList.add(trimmedBuffer); + remainingBytes.addAndGet(-remainingBytesInBuffer); + } + return bufferedList; + } + + public interface Builder extends SdkBuilder { + + Builder bufferSize(int bufferSize); + + Builder totalBytes(long totalBytes); + + + } + + private static final class DefaultBuilder implements Builder { + + private Integer bufferSize; + private Long totalBytes; + + @Override + public ChunkBuffer build() { + return new ChunkBuffer(totalBytes, bufferSize); + } + + @Override + public Builder bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + @Override + public Builder totalBytes(long totalBytes) { + this.totalBytes = totalBytes; + return this; + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java index 0e4e1f7569af..5785f9c6778d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.core.internal.interceptor; +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength; import java.util.Optional; @@ -100,7 +101,7 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp long checksumContentLength, long originalContentLength) { long chunkLength = - calculateStreamContentLength(originalContentLength, ChecksumCalculatingAsyncRequestBody.DEFAULT_CHUNK_SIZE); + calculateStreamContentLength(originalContentLength, DEFAULT_ASYNC_CHUNK_SIZE); return context.httpRequest().copy(r -> r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName()) diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java new file mode 100644 index 000000000000..4d3e83ebfe2d --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.internal.async.ChunkBuffer; +import software.amazon.awssdk.utils.StringUtils; + +class ChunkBufferTest { + + @Test + void builderWithNoTotalSize() { + assertThatThrownBy(() -> ChunkBuffer.builder().build()).isInstanceOf(NullPointerException.class); + } + + @Test + void numberOfChunkMultipleOfTotalBytes() { + String inputString = StringUtils.repeat("*", 25); + + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 5).getBytes(StandardCharsets.UTF_8)); + }); + assertThat(iteratedCounts.get()).isEqualTo(5); + } + + @Test + void numberOfChunk_Not_MultipleOfTotalBytes() { + int totalBytes = 23; + int bufferSize = 5; + + String inputString = StringUtils.repeat("*", totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + if (iteratedCounts.get() * bufferSize < totalBytes) { + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", bufferSize).getBytes(StandardCharsets.UTF_8)); + } else { + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 3).getBytes(StandardCharsets.UTF_8)); + + } + }); + } + + @Test + void zeroTotalBytesAsInput() { + String inputString = ""; + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + }); + assertThat(iteratedCounts.get()).isZero(); + } + +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java index c9b1d7e3b99e..5c297fc3f74a 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; -import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.fixedLengthInKbFileWithRandomOrFixedCharacters; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.File; @@ -48,6 +47,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; +import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.testutils.Waiter; public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase { @@ -69,12 +69,8 @@ public static void setUp() throws Exception { s3HttpAsync = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)) .endpointOverride(URI.create("http://s3." + DEFAULT_REGION + ".amazonaws.com")).build(); - createBucket(BUCKET); - - Waiter.run(() -> s3.headBucket(r -> r.bucket(BUCKET))) - .ignoringException(NoSuchBucketException.class) - .orFail(); + s3.waiter().waitUntilBucketExists(s ->s.bucket(BUCKET)); interceptor.reset(); } @@ -141,7 +137,7 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest @ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody (int chunkSize) throws IOException { - File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true); + File randomFileOfFixedLength = new RandomTempFile(64 * KB); s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) @@ -170,8 +166,8 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize) throws IOException { - File randomFileOfFixedLengthOne = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true); - File randomFileOfFixedLengthTwo = fixedLengthInKbFileWithRandomOrFixedCharacters(17, true); + File randomFileOfFixedLengthOne = new RandomTempFile(64 * KB); + File randomFileOfFixedLengthTwo = new RandomTempFile(17 * KB); CompletableFuture putObjectFutureOne = s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java index 40be0176ea4c..72d05cc93f69 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; -import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.fixedLengthInKbFileWithRandomOrFixedCharacters; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.BufferedReader; @@ -51,11 +50,12 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; +import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.testutils.Waiter; public class HttpChecksumIntegrationTest extends S3IntegrationTestBase { - public static final int HUGE_MSG_SIZE = 16384 * KB; + public static final int HUGE_MSG_SIZE = 1600 * KB; protected static final String KEY = "some-key"; private static final String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); @@ -79,10 +79,7 @@ public static void setUp() throws Exception { createBucket(BUCKET); - - Waiter.run(() -> s3.headBucket(r -> r.bucket(BUCKET))) - .ignoringException(NoSuchBucketException.class) - .orFail(); + s3.waiter().waitUntilBucketExists(s ->s.bucket(BUCKET)); interceptor.reset(); } @@ -355,8 +352,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep @Test public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, IOException { - - File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(10, true); + File randomFileOfFixedLength = new RandomTempFile(10 * KB); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) @@ -382,7 +378,7 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileR @Test public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws IOException { - File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(34, true); + File randomFileOfFixedLength = new RandomTempFile(34 * KB); s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java index 49e36be60681..22a313f4186e 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java @@ -21,12 +21,16 @@ import java.io.PrintWriter; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.SdkChecksum; +import software.amazon.awssdk.utils.BinaryUtils; /** * Utilities for computing the SHA-256 checksums of various binary objects. @@ -76,20 +80,12 @@ private static MessageDigest createMessageDigest() { } } - public static File fixedLengthInKbFileWithRandomOrFixedCharacters(int sizeInKb, boolean isRandom) throws IOException { - File tempFile = File.createTempFile("temp-random-sdk-file-", ".tmp"); - try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { - PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); - int objectSize = sizeInKb * 1024; - Random random = new Random(); - for (int index = 0; index < objectSize; index++) { - int offset = isRandom ? random.nextInt(26) : 0; - writer.print(index % 5 == 0 ? ' ' : (char) ('a' + offset)); - } - writer.flush(); + public static String calculatedChecksum(String contentString, Algorithm algorithm) { + SdkChecksum sdkChecksum = SdkChecksum.forAlgorithm(algorithm); + for (byte character : contentString.getBytes(StandardCharsets.UTF_8)) { + sdkChecksum.update(character); } - tempFile.deleteOnExit(); - return tempFile; + return BinaryUtils.toBase64(sdkChecksum.getChecksumBytes()); } public static String createDataOfSize(int dataSize, char contentCharacter) { diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java index 603ff7325153..fb95a8d0bc57 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java @@ -35,12 +35,10 @@ import com.github.tomakehurst.wiremock.verification.LoggedRequest; import java.io.File; import java.io.IOException; -import java.io.PrintWriter; -import java.io.RandomAccessFile; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.List; -import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.After; @@ -53,12 +51,16 @@ import software.amazon.awssdk.core.HttpChecksumConstant; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; import software.amazon.awssdk.services.protocolrestjson.model.ChecksumAlgorithm; import software.amazon.awssdk.testutils.EnvironmentVariableHelper; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.utils.BinaryUtils; public class AsyncRequestBodyFlexibleChecksumInTrailerTest { public static final int KB = 1024; @@ -72,6 +74,10 @@ public class AsyncRequestBodyFlexibleChecksumInTrailerTest { private ProtocolRestJsonAsyncClient asyncClient; private ProtocolRestJsonAsyncClient asyncClientWithSigner; + public static String createDataOfSize(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } + @Before public void setupClient() { asyncClientWithSigner = ProtocolRestJsonAsyncClient.builder() @@ -132,14 +138,14 @@ public void asyncStreaming_withRetry_NoSigner_shouldContainChecksum_fromIntercep expectedRequestBody))); } - @Test public void asyncStreaming_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { stubForFailureThenSuccess(500, "500"); - - File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(37, false); + File randomFileOfFixedLength = new RandomTempFile(37 * KB); String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + String expectedChecksum = calculatedChecksum(contentString, Algorithm.CRC32); + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) .chunkSizeInBytes(16 * KB) @@ -152,16 +158,18 @@ public void asyncStreaming_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsC + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + "0" + CRLF - + "x-amz-checksum-crc32:wQiPHw==" + CRLF + CRLF))); + + "x-amz-checksum-crc32:" + expectedChecksum + CRLF + CRLF))); } @Test public void asyncStreaming_withRetry_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { - File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(37, false); + File randomFileOfFixedLength = new RandomTempFile(37 * KB); String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + String expectedChecksum = calculatedChecksum(contentString, Algorithm.CRC32); stubResponseWithHeaders(); + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) .chunkSizeInBytes(16 * KB) @@ -174,9 +182,16 @@ public void asyncStreaming_withRetry_FromAsyncRequestBody_VariableChunkSize_NoSi + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + "0" + CRLF - + "x-amz-checksum-crc32:wQiPHw==" + CRLF + CRLF))); + + "x-amz-checksum-crc32:" + expectedChecksum + CRLF + CRLF))); } + private String calculatedChecksum(String contentString, Algorithm algorithm) { + SdkChecksum sdkChecksum = SdkChecksum.forAlgorithm(algorithm); + for (byte character : contentString.getBytes(StandardCharsets.UTF_8)) { + sdkChecksum.update(character); + } + return BinaryUtils.toBase64(sdkChecksum.getChecksumBytes()); + } private void stubResponseWithHeaders() { stubFor(put(anyUrl()) @@ -224,25 +239,4 @@ private void verifyHeadersForPutRequest(String contentLength, String decodedCont } - - public static File fixedLengthInKbFileWithRandomOrFixedCharacters(int sizeInKb, boolean isRandom) throws IOException { - File tempFile = File.createTempFile("temp-random-sdk-file-", ".tmp"); - try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) { - PrintWriter writer = new PrintWriter(tempFile, "UTF-8"); - int objectSize = sizeInKb * 1024; - Random random = new Random(); - for (int index = 0; index < objectSize; index++) { - int offset = isRandom ? random.nextInt(26) : 0; - writer.print(index % 5 == 0 ? ' ' : (char) ('a' + offset)); - } - writer.flush(); - } - tempFile.deleteOnExit(); - return tempFile; - } - - public static String createDataOfSize(int dataSize, char contentCharacter) { - return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); - } - } \ No newline at end of file diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java index a9c54bc98f5b..bf68ee568789 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java @@ -93,13 +93,10 @@ public void request(long l) { handleStateUpdate(); } - @Override public void cancel() { subscription.cancel(); } - - }); } @@ -147,8 +144,6 @@ private void addDownstreamDemand(long l) { upstreamSubscription.cancel(); onError(new IllegalArgumentException("Demand must not be negative")); } - - } /** From 71aab4f5281785a34fa561c4455b022f2d2c1490 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Wed, 14 Sep 2022 08:36:20 -0700 Subject: [PATCH 5/6] Creating a SynchronousBuffer for mapper , also handling ZeroByte s3 put Object --- .../ChecksumCalculatingAsyncRequestBody.java | 23 ++++--- .../core/internal/async/ChunkBuffer.java | 8 ++- .../awssdk/core/async/ChunkBufferTest.java | 35 ++++++++-- .../AsyncHttpChecksumIntegrationTest.java | 65 +++++++++++++++---- 4 files changed, 103 insertions(+), 28 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index ccb3a57b3b41..9a461f1337d8 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -150,7 +150,8 @@ public void subscribe(Subscriber s) { sdkChecksum.reset(); } - wrapped.flatMapIterable(this::buffer) + SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes); + wrapped.flatMapIterable(synchronousChunkBuffer::buffer) .subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes)); } @@ -205,7 +206,7 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) { ByteBuffer finalChunkedByteBuffer = createChunk(ByteBuffer.wrap(FINAL_BYTE), true); ByteBuffer checksumTrailerByteBuffer = createChecksumTrailer( BinaryUtils.toBase64(checksumBytes), trailerHeader); - ByteBuffer contentChunk = createChunk(byteBuffer, false); + ByteBuffer contentChunk = byteBuffer.hasRemaining() ? createChunk(byteBuffer, false) : byteBuffer; ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate( contentChunk.remaining() @@ -230,13 +231,17 @@ public void onComplete() { } } - private Iterable buffer(ByteBuffer bytes) { - ChunkBuffer chunkBuffer = ChunkBuffer.builder() - .bufferSize(DEFAULT_ASYNC_CHUNK_SIZE) - .totalBytes(bytes.remaining()) - .build(); - chunkBuffer.bufferAndCreateChunks(bytes); - return chunkBuffer.getBufferedList(); + private static final class SynchronousChunkBuffer { + private final ChunkBuffer chunkBuffer; + + SynchronousChunkBuffer(long totalBytes) { + this.chunkBuffer = ChunkBuffer.builder().bufferSize(DEFAULT_ASYNC_CHUNK_SIZE).totalBytes(totalBytes).build(); + } + + private Iterable buffer(ByteBuffer bytes) { + chunkBuffer.bufferAndCreateChunks(bytes); + return chunkBuffer.getBufferedList(); + } } } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java index c7f1cc5eefea..c5a7d28c883a 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -92,8 +92,12 @@ public Iterable bufferAndCreateChunks(ByteBuffer buffer) { } while (startPosition < currentBytesRead); int remainingBytesInBuffer = currentBuffer.position(); - // Send the remainder buffered bytes at the end when there no more bytes - if (remainingBytesInBuffer > 0 && remainingBytes.get() == remainingBytesInBuffer) { + + // Send the remaining buffer when + // 1. remainingBytes in buffer are same as the last few bytes to be read. + // 2. If it is a zero byte and the last byte to be read. + if (remainingBytes.get() == remainingBytesInBuffer && + (buffer.remaining() == 0 || remainingBytesInBuffer > 0)) { currentBuffer.clear(); ByteBuffer trimmedBuffer = ByteBuffer.allocate(remainingBytesInBuffer); trimmedBuffer.put(currentBuffer.array(), 0, remainingBytesInBuffer); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java index 4d3e83ebfe2d..de465c81ec49 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java @@ -73,18 +73,43 @@ void numberOfChunk_Not_MultipleOfTotalBytes() { } @Test - void zeroTotalBytesAsInput() { - String inputString = ""; + void zeroTotalBytesAsInput_returnsZeroByte() { + byte[] zeroByte = new byte[0]; ChunkBuffer chunkBuffer = - ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + ChunkBuffer.builder().bufferSize(5).totalBytes(zeroByte.length).build(); Iterable byteBuffers = - chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(zeroByte)); AtomicInteger iteratedCounts = new AtomicInteger(); byteBuffers.forEach(r -> { iteratedCounts.getAndIncrement(); }); - assertThat(iteratedCounts.get()).isZero(); + assertThat(iteratedCounts.get()).isEqualTo(1); } + @Test + void emptyAllocatedBytes_returnSameNumberOfEmptyBytes() { + + int totalBytes = 17; + int bufferSize = 5; + ByteBuffer wrap = ByteBuffer.allocate(totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(wrap.remaining()).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(wrap); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + if (iteratedCounts.get() * bufferSize < totalBytes) { + // array of empty bytes + assertThat(r.array()).isEqualTo(ByteBuffer.allocate(bufferSize).array()); + } else { + assertThat(r.array()).isEqualTo(ByteBuffer.allocate(totalBytes % bufferSize).array()); + } + }); + assertThat(iteratedCounts.get()).isEqualTo(4); + } } + + diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java index 5c297fc3f74a..e4aa6d9d9249 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java @@ -23,8 +23,10 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -43,12 +45,10 @@ import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.ChecksumMode; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.NoSuchBucketException; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; import software.amazon.awssdk.testutils.RandomTempFile; -import software.amazon.awssdk.testutils.Waiter; public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase { @@ -57,12 +57,10 @@ public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase { public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); protected static S3AsyncClient s3HttpAsync; - @BeforeAll public static void setUp() throws Exception { s3 = s3ClientBuilder().build(); - s3Async = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)).build(); // Http Client to generate Signed request @@ -70,10 +68,14 @@ public static void setUp() throws Exception { .endpointOverride(URI.create("http://s3." + DEFAULT_REGION + ".amazonaws.com")).build(); createBucket(BUCKET); - s3.waiter().waitUntilBucketExists(s ->s.bucket(BUCKET)); + s3.waiter().waitUntilBucketExists(s -> s.bucket(BUCKET)); interceptor.reset(); } + @AfterEach + public void clear() { + interceptor.reset(); + } @Test void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() { @@ -81,7 +83,6 @@ void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() { .bucket(BUCKET) .key(KEY) .overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create())) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) .build(), AsyncRequestBody.fromString("Hello world")).join(); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); @@ -114,14 +115,13 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallReques @ParameterizedTest - //createDataOfSizeInKb already creates data in kb - @ValueSource(ints = {1*KB, 3*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) + @ValueSource(ints = {1 * KB, 3 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB}) void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException { s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString(createDataOfSize(dataSize, 'a'))).join(); + .build(), AsyncRequestBody.fromString(createDataOfSize(64 * KB, 'a'))).join(); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -130,14 +130,14 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo(createDataOfSize(dataSize, 'a')); + assertThat(response).isEqualTo(createDataOfSize(64 * KB, 'a')); } @ParameterizedTest - @ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB}) + @ValueSource(ints = {1 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB, 65 * KB}) void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody (int chunkSize) throws IOException { - File randomFileOfFixedLength = new RandomTempFile(64 * KB); + File randomFileOfFixedLength = new RandomTempFile(32 * KB + 23); s3Async.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) @@ -224,6 +224,47 @@ void asyncValidSignedTrailerChecksumCalculatedBySdkClient() { assertThat(response).isEqualTo("Hello world"); } + @Test + public void putObject_with_bufferCreatedFromEmptyString() { + + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("")) + .join(); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(""); + } + + @Test + public void putObject_with_bufferCreatedFromZeroCapacityByteBuffer() { + ByteBuffer content = ByteBuffer.allocate(0); + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromByteBuffer(content)) + .join(); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(""); + } } From e671bebc4c355f27ee850c2c7441eb1a0269c6c0 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Wed, 14 Sep 2022 17:02:08 -0700 Subject: [PATCH 6/6] Made ChunkBuffer synchronized --- .../ChecksumCalculatingAsyncRequestBody.java | 3 +- .../core/internal/async/ChunkBuffer.java | 19 +--- .../awssdk/core/async/ChunkBufferTest.java | 87 +++++++++++++++++++ 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 9a461f1337d8..101aed7b7441 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -239,8 +239,7 @@ private static final class SynchronousChunkBuffer { } private Iterable buffer(ByteBuffer bytes) { - chunkBuffer.bufferAndCreateChunks(bytes); - return chunkBuffer.getBufferedList(); + return chunkBuffer.bufferAndCreateChunks(bytes); } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java index c5a7d28c883a..8fd7f0260b76 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -34,8 +33,6 @@ public final class ChunkBuffer { private final AtomicLong remainingBytes; private final ByteBuffer currentBuffer; private final int bufferSize; - private List bufferedList; - private ChunkBuffer(Long totalBytes, Integer bufferSize) { Validate.notNull(totalBytes, "The totalBytes must not be null"); @@ -44,29 +41,19 @@ private ChunkBuffer(Long totalBytes, Integer bufferSize) { this.bufferSize = chunkSize; this.currentBuffer = ByteBuffer.allocate(chunkSize); this.remainingBytes = new AtomicLong(totalBytes); - bufferedList = new ArrayList<>(); } - public static Builder builder() { return new DefaultBuilder(); } - public List getBufferedList() { - if (currentBuffer == null) { - throw new IllegalStateException(""); - } - List ret = bufferedList; - bufferedList = new ArrayList<>(); - return Collections.unmodifiableList(ret); - } - public Iterable bufferAndCreateChunks(ByteBuffer buffer) { + // currentBuffer and bufferedList can get over written if concurrent Threads calls this method at the same time. + public synchronized Iterable bufferAndCreateChunks(ByteBuffer buffer) { int startPosition = 0; + List bufferedList = new ArrayList<>(); int currentBytesRead = buffer.remaining(); - do { - int bufferedBytes = currentBuffer.position(); int availableToRead = bufferSize - bufferedBytes; int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java index de465c81ec49..8b73402dc468 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java @@ -20,7 +20,15 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.internal.async.ChunkBuffer; import software.amazon.awssdk.utils.StringUtils; @@ -110,6 +118,85 @@ void emptyAllocatedBytes_returnSameNumberOfEmptyBytes() { }); assertThat(iteratedCounts.get()).isEqualTo(4); } + + + /** + * * Total bytes 11(ChunkSize) 3 (threads) + * * Buffering Size of 5 + * threadOne 22222222222 + * threadTwo 33333333333 + * threadThree 11111111111 + + * + * * Streaming sequence as below + * * + * start 22222222222 + * 22222 + * 22222 + * end 22222222222 + * * + * start streaming 33333333333 + * 2 is from previous sequence which is buffered + * 23333 + * 33333 + * end 33333333333 + * * + * start 11111111111 + * 33 is from previous sequence which is buffered * + * 33111 + * 11111 + * 111 + * end 11111111111 + * 111 is given as output since we consumed all the total bytes* + */ + @Test + void concurrentTreads_calling_bufferAndCreateChunks() throws ExecutionException, InterruptedException { + int totalBytes = 17; + int bufferSize = 5; + int threads = 8; + + ByteBuffer wrap = ByteBuffer.allocate(totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(wrap.remaining() * threads).build(); + + ExecutorService service = Executors.newFixedThreadPool(threads); + + Collection> futures; + + AtomicInteger counter = new AtomicInteger(0); + + futures = IntStream.range(0, threads).>mapToObj(t -> service.submit(() -> { + String inputString = StringUtils.repeat(Integer.toString(counter.incrementAndGet()), totalBytes); + return chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + })).collect(Collectors.toCollection(() -> new ArrayList<>(threads))); + + AtomicInteger filledBuffers = new AtomicInteger(0); + AtomicInteger remainderBytesBuffers = new AtomicInteger(0); + AtomicInteger otherSizeBuffers = new AtomicInteger(0); + AtomicInteger remainderBytes = new AtomicInteger(0); + + for (Future bufferedFuture : futures) { + Iterable buffers = bufferedFuture.get(); + buffers.forEach(b -> { + if (b.remaining() == bufferSize) { + filledBuffers.incrementAndGet(); + } else if (b.remaining() == ((totalBytes * threads) % bufferSize)) { + remainderBytesBuffers.incrementAndGet(); + remainderBytes.set(b.remaining()); + + } else { + otherSizeBuffers.incrementAndGet(); + } + + }); + } + + assertThat(filledBuffers.get()).isEqualTo((totalBytes * threads) / bufferSize); + assertThat(remainderBytes.get()).isEqualTo((totalBytes * threads) % bufferSize); + assertThat(remainderBytesBuffers.get()).isOne(); + assertThat(otherSizeBuffers.get()).isZero(); + } + }