diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json new file mode 100644 index 000000000000..74e753aa0cb5 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async Request body." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java index ff4f573f4173..8856ab0ed9d0 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java @@ -39,6 +39,11 @@ public final class HttpChecksumConstant { public static final String HEADER_FOR_TRAILER_REFERENCE = "x-amz-trailer"; + /** + * Default chunk size for Async trailer based checksum data transfer* + */ + public static final int DEFAULT_ASYNC_CHUNK_SIZE = 16 * 1024; + private HttpChecksumConstant() { } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java index 7bc1ca43b817..101aed7b7441 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.core.internal.async; +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength; import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer; @@ -41,12 +42,12 @@ @SdkInternalApi public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody { - public static final byte[] FINAL_BYTE = new byte[0]; + private static final byte[] FINAL_BYTE = new byte[0]; private final AsyncRequestBody wrapped; private final SdkChecksum sdkChecksum; private final Algorithm algorithm; private final String trailerHeader; - private final AtomicLong remainingBytes; + private final long totalBytes; private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { @@ -57,8 +58,8 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) { this.algorithm = builder.algorithm; this.sdkChecksum = builder.algorithm != null ? SdkChecksum.forAlgorithm(algorithm) : null; this.trailerHeader = builder.trailerHeader; - this.remainingBytes = new AtomicLong(wrapped.contentLength() - .orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied."))); + this.totalBytes = wrapped.contentLength() + .orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied.")); } /** @@ -148,7 +149,10 @@ public void subscribe(Subscriber s) { if (sdkChecksum != null) { sdkChecksum.reset(); } - wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes)); + + SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes); + wrapped.flatMapIterable(synchronousChunkBuffer::buffer) + .subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes)); } private static final class ChecksumCalculatingSubscriber implements Subscriber { @@ -162,11 +166,11 @@ private static final class ChecksumCalculatingSubscriber implements Subscriber wrapped, SdkChecksum checksum, - String trailerHeader, AtomicLong remainingBytes) { + String trailerHeader, long totalBytes) { this.wrapped = wrapped; this.checksum = checksum; this.trailerHeader = trailerHeader; - this.remainingBytes = remainingBytes; + this.remainingBytes = new AtomicLong(totalBytes); } @Override @@ -189,7 +193,8 @@ public void onNext(ByteBuffer byteBuffer) { ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer); wrapped.onNext(allocatedBuffer); } else { - wrapped.onNext(byteBuffer); + ByteBuffer allocatedBuffer = createChunk(byteBuffer, false); + wrapped.onNext(allocatedBuffer); } } catch (SdkException sdkException) { this.subscription.cancel(); @@ -201,7 +206,7 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) { ByteBuffer finalChunkedByteBuffer = createChunk(ByteBuffer.wrap(FINAL_BYTE), true); ByteBuffer checksumTrailerByteBuffer = createChecksumTrailer( BinaryUtils.toBase64(checksumBytes), trailerHeader); - ByteBuffer contentChunk = createChunk(byteBuffer, false); + ByteBuffer contentChunk = byteBuffer.hasRemaining() ? createChunk(byteBuffer, false) : byteBuffer; ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate( contentChunk.remaining() @@ -225,4 +230,17 @@ public void onComplete() { wrapped.onComplete(); } } + + private static final class SynchronousChunkBuffer { + private final ChunkBuffer chunkBuffer; + + SynchronousChunkBuffer(long totalBytes) { + this.chunkBuffer = ChunkBuffer.builder().bufferSize(DEFAULT_ASYNC_CHUNK_SIZE).totalBytes(totalBytes).build(); + } + + private Iterable buffer(ByteBuffer bytes) { + return chunkBuffer.bufferAndCreateChunks(bytes); + } + } + } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java new file mode 100644 index 000000000000..8fd7f0260b76 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -0,0 +1,129 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.builder.SdkBuilder; + +/** + * Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize* + */ +@SdkInternalApi +public final class ChunkBuffer { + private final AtomicLong remainingBytes; + private final ByteBuffer currentBuffer; + private final int bufferSize; + + private ChunkBuffer(Long totalBytes, Integer bufferSize) { + Validate.notNull(totalBytes, "The totalBytes must not be null"); + + int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE; + this.bufferSize = chunkSize; + this.currentBuffer = ByteBuffer.allocate(chunkSize); + this.remainingBytes = new AtomicLong(totalBytes); + } + + public static Builder builder() { + return new DefaultBuilder(); + } + + + // currentBuffer and bufferedList can get over written if concurrent Threads calls this method at the same time. + public synchronized Iterable bufferAndCreateChunks(ByteBuffer buffer) { + int startPosition = 0; + List bufferedList = new ArrayList<>(); + int currentBytesRead = buffer.remaining(); + do { + int bufferedBytes = currentBuffer.position(); + int availableToRead = bufferSize - bufferedBytes; + int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); + + if (bufferedBytes == 0) { + currentBuffer.put(buffer.array(), startPosition, bytesToMove); + } else { + currentBuffer.put(buffer.array(), 0, bytesToMove); + } + + startPosition = startPosition + bytesToMove; + + // Send the data once the buffer is full + if (currentBuffer.position() == bufferSize) { + currentBuffer.position(0); + ByteBuffer bufferToSend = ByteBuffer.allocate(bufferSize); + bufferToSend.put(currentBuffer.array(), 0, bufferSize); + bufferToSend.clear(); + currentBuffer.clear(); + bufferedList.add(bufferToSend); + remainingBytes.addAndGet(-bufferSize); + } + } while (startPosition < currentBytesRead); + + int remainingBytesInBuffer = currentBuffer.position(); + + // Send the remaining buffer when + // 1. remainingBytes in buffer are same as the last few bytes to be read. + // 2. If it is a zero byte and the last byte to be read. + if (remainingBytes.get() == remainingBytesInBuffer && + (buffer.remaining() == 0 || remainingBytesInBuffer > 0)) { + currentBuffer.clear(); + ByteBuffer trimmedBuffer = ByteBuffer.allocate(remainingBytesInBuffer); + trimmedBuffer.put(currentBuffer.array(), 0, remainingBytesInBuffer); + trimmedBuffer.clear(); + bufferedList.add(trimmedBuffer); + remainingBytes.addAndGet(-remainingBytesInBuffer); + } + return bufferedList; + } + + public interface Builder extends SdkBuilder { + + Builder bufferSize(int bufferSize); + + Builder totalBytes(long totalBytes); + + + } + + private static final class DefaultBuilder implements Builder { + + private Integer bufferSize; + private Long totalBytes; + + @Override + public ChunkBuffer build() { + return new ChunkBuffer(totalBytes, bufferSize); + } + + @Override + public Builder bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + @Override + public Builder totalBytes(long totalBytes) { + this.totalBytes = totalBytes; + return this; + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java index ea7113043262..5785f9c6778d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java @@ -15,6 +15,9 @@ package software.amazon.awssdk.core.internal.interceptor; +import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE; +import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength; + import java.util.Optional; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.ClientType; @@ -97,7 +100,9 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum, long checksumContentLength, long originalContentLength) { - long chunkLength = ChunkContentUtils.calculateChunkLength(originalContentLength); + long chunkLength = + calculateStreamContentLength(originalContentLength, DEFAULT_ASYNC_CHUNK_SIZE); + return context.httpRequest().copy(r -> r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName()) .putHeader("Content-encoding", HttpChecksumConstant.AWS_CHUNKED_HEADER) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java index 57b125f8caf2..b76e20653d2c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.util.HttpChecksumResolver; import software.amazon.awssdk.core.internal.util.HttpChecksumUtils; @@ -73,7 +74,7 @@ public Optional modifyHttpContent(Context.ModifyHttpRequest context RequestBody.fromContentProvider( streamProvider, AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - requestBody.optionalContentLength().orElse(0L)) + requestBody.optionalContentLength().orElse(0L), AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + checksumContentLength, requestBody.contentType())); } @@ -112,7 +113,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, Execu .putHeader("x-amz-decoded-content-length", Long.toString(originalContentLength)) .putHeader(CONTENT_LENGTH, Long.toString(AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - originalContentLength) + checksumContentLength))); + originalContentLength, AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + + checksumContentLength))); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java index d98deb0e229e..9dee93e70a32 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java @@ -39,7 +39,7 @@ @SdkInternalApi public abstract class AwsChunkedEncodingInputStream extends SdkInputStream { - protected static final int DEFAULT_CHUNK_SIZE = 128 * 1024; + public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; protected static final int SKIP_BUFFER_SIZE = 256 * 1024; protected static final String CRLF = "\r\n"; protected static final byte[] FINAL_CHUNK = new byte[0]; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java index 4c04a1b644ec..186ca5d7d0d8 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java @@ -48,12 +48,10 @@ public static Builder builder() { * @return Content length of the trailer that will be appended at the end. */ public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) { - int checksumLength = algorithm.base64EncodedLength(); - - return (headerName.length() - + HEADER_COLON_SEPARATOR.length() - + checksumLength - + CRLF.length()); + return headerName.length() + + HEADER_COLON_SEPARATOR.length() + + algorithm.base64EncodedLength().longValue() + + CRLF.length() + CRLF.length(); } /** @@ -68,17 +66,20 @@ private static long calculateChunkLength(long originalContentLength) { + CRLF.length(); } - public static long calculateStreamContentLength(long originalLength) { - if (originalLength < 0) { - throw new IllegalArgumentException("Non negative content length expected."); + public static long calculateStreamContentLength(long originalLength, long defaultChunkSize) { + if (originalLength < 0 || defaultChunkSize == 0) { + throw new IllegalArgumentException(originalLength + ", " + defaultChunkSize + "Args <= 0 not expected"); } - long maxSizeChunks = originalLength / DEFAULT_CHUNK_SIZE; - long remainingBytes = originalLength % DEFAULT_CHUNK_SIZE; + long maxSizeChunks = originalLength / defaultChunkSize; + long remainingBytes = originalLength % defaultChunkSize; + + long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize); + long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0; + // last byte is composed of a "0" and "\r\n" + long lastByteSize = 1 + (long) CRLF.length(); - return maxSizeChunks * calculateChunkLength(DEFAULT_CHUNK_SIZE) - + (remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0) - + calculateChunkLength(0); + return allChunks + remainingInChunk + lastByteSize; } @Override diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java new file mode 100644 index 000000000000..8b73402dc468 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ChunkBufferTest.java @@ -0,0 +1,202 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.internal.async.ChunkBuffer; +import software.amazon.awssdk.utils.StringUtils; + +class ChunkBufferTest { + + @Test + void builderWithNoTotalSize() { + assertThatThrownBy(() -> ChunkBuffer.builder().build()).isInstanceOf(NullPointerException.class); + } + + @Test + void numberOfChunkMultipleOfTotalBytes() { + String inputString = StringUtils.repeat("*", 25); + + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 5).getBytes(StandardCharsets.UTF_8)); + }); + assertThat(iteratedCounts.get()).isEqualTo(5); + } + + @Test + void numberOfChunk_Not_MultipleOfTotalBytes() { + int totalBytes = 23; + int bufferSize = 5; + + String inputString = StringUtils.repeat("*", totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + if (iteratedCounts.get() * bufferSize < totalBytes) { + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", bufferSize).getBytes(StandardCharsets.UTF_8)); + } else { + assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 3).getBytes(StandardCharsets.UTF_8)); + + } + }); + } + + @Test + void zeroTotalBytesAsInput_returnsZeroByte() { + byte[] zeroByte = new byte[0]; + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(5).totalBytes(zeroByte.length).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(zeroByte)); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + }); + assertThat(iteratedCounts.get()).isEqualTo(1); + } + + @Test + void emptyAllocatedBytes_returnSameNumberOfEmptyBytes() { + + int totalBytes = 17; + int bufferSize = 5; + ByteBuffer wrap = ByteBuffer.allocate(totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(wrap.remaining()).build(); + Iterable byteBuffers = + chunkBuffer.bufferAndCreateChunks(wrap); + + AtomicInteger iteratedCounts = new AtomicInteger(); + byteBuffers.forEach(r -> { + iteratedCounts.getAndIncrement(); + if (iteratedCounts.get() * bufferSize < totalBytes) { + // array of empty bytes + assertThat(r.array()).isEqualTo(ByteBuffer.allocate(bufferSize).array()); + } else { + assertThat(r.array()).isEqualTo(ByteBuffer.allocate(totalBytes % bufferSize).array()); + } + }); + assertThat(iteratedCounts.get()).isEqualTo(4); + } + + + /** + * * Total bytes 11(ChunkSize) 3 (threads) + * * Buffering Size of 5 + * threadOne 22222222222 + * threadTwo 33333333333 + * threadThree 11111111111 + + * + * * Streaming sequence as below + * * + * start 22222222222 + * 22222 + * 22222 + * end 22222222222 + * * + * start streaming 33333333333 + * 2 is from previous sequence which is buffered + * 23333 + * 33333 + * end 33333333333 + * * + * start 11111111111 + * 33 is from previous sequence which is buffered * + * 33111 + * 11111 + * 111 + * end 11111111111 + * 111 is given as output since we consumed all the total bytes* + */ + @Test + void concurrentTreads_calling_bufferAndCreateChunks() throws ExecutionException, InterruptedException { + int totalBytes = 17; + int bufferSize = 5; + int threads = 8; + + ByteBuffer wrap = ByteBuffer.allocate(totalBytes); + ChunkBuffer chunkBuffer = + ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(wrap.remaining() * threads).build(); + + ExecutorService service = Executors.newFixedThreadPool(threads); + + Collection> futures; + + AtomicInteger counter = new AtomicInteger(0); + + futures = IntStream.range(0, threads).>mapToObj(t -> service.submit(() -> { + String inputString = StringUtils.repeat(Integer.toString(counter.incrementAndGet()), totalBytes); + return chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8))); + })).collect(Collectors.toCollection(() -> new ArrayList<>(threads))); + + AtomicInteger filledBuffers = new AtomicInteger(0); + AtomicInteger remainderBytesBuffers = new AtomicInteger(0); + AtomicInteger otherSizeBuffers = new AtomicInteger(0); + AtomicInteger remainderBytes = new AtomicInteger(0); + + for (Future bufferedFuture : futures) { + Iterable buffers = bufferedFuture.get(); + buffers.forEach(b -> { + if (b.remaining() == bufferSize) { + filledBuffers.incrementAndGet(); + } else if (b.remaining() == ((totalBytes * threads) % bufferSize)) { + remainderBytesBuffers.incrementAndGet(); + remainderBytes.set(b.remaining()); + + } else { + otherSizeBuffers.incrementAndGet(); + } + + }); + } + + assertThat(filledBuffers.get()).isEqualTo((totalBytes * threads) / bufferSize); + assertThat(remainderBytes.get()).isEqualTo((totalBytes * threads) % bufferSize); + assertThat(remainderBytesBuffers.get()).isOne(); + assertThat(otherSizeBuffers.get()).isZero(); + } + +} + + diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java index 7f212dbb7094..0fa862dd2acb 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java @@ -55,11 +55,12 @@ public void readAwsUnsignedChunkedEncodingInputStream() throws IOException { public void lengthsOfCalculateByChecksumCalculatingInputStream(){ String initialString = "Hello world"; - long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length()); + long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length(), + AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE); long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength( SHA256_ALGORITHM, SHA256_HEADER_NAME); - assertThat(calculateChunkLength).isEqualTo(21); - assertThat(checksumContentLength).isEqualTo(69); + assertThat(calculateChunkLength).isEqualTo(19); + assertThat(checksumContentLength).isEqualTo(71); } @Test diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java new file mode 100644 index 000000000000..e4aa6d9d9249 --- /dev/null +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java @@ -0,0 +1,270 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.checksum; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; +import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; +import software.amazon.awssdk.authcrt.signer.internal.DefaultAwsCrtS3V4aSigner; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.ChecksumValidation; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; +import software.amazon.awssdk.testutils.RandomTempFile; + +public class AsyncHttpChecksumIntegrationTest extends S3IntegrationTestBase { + + protected static final String KEY = "some-key"; + private static final String BUCKET = temporaryBucketName(AsyncHttpChecksumIntegrationTest.class); + public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); + protected static S3AsyncClient s3HttpAsync; + + @BeforeAll + public static void setUp() throws Exception { + + s3 = s3ClientBuilder().build(); + s3Async = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)).build(); + + // Http Client to generate Signed request + s3HttpAsync = s3AsyncClientBuilder().overrideConfiguration(o -> o.addExecutionInterceptor(interceptor)) + .endpointOverride(URI.create("http://s3." + DEFAULT_REGION + ".amazonaws.com")).build(); + + createBucket(BUCKET); + s3.waiter().waitUntilBucketExists(s -> s.bucket(BUCKET)); + interceptor.reset(); + } + + @AfterEach + public void clear() { + interceptor.reset(); + } + + @Test + void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create())) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo("Hello world"); + } + + @Test + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallRequestBody() throws InterruptedException { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo("Hello world"); + } + + + @ParameterizedTest + @ValueSource(ints = {1 * KB, 3 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB}) + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException { + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString(createDataOfSize(64 * KB, 'a'))).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(createDataOfSize(64 * KB, 'a')); + } + + @ParameterizedTest + @ValueSource(ints = {1 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB, 65 * KB}) + void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody + (int chunkSize) throws IOException { + File randomFileOfFixedLength = new RandomTempFile(32 * KB + 23); + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(chunkSize) + .build()).join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(response).isEqualTo(new String(bytes)); + } + + /** + * Test two async call made back to back with different sizes parameterized to test for different chunk sizes + */ + @ParameterizedTest + @ValueSource(ints = {1 * KB, 12 * KB, 16 * KB, 17 * KB, 32 * KB, 33 * KB}) + void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize) + throws IOException { + + File randomFileOfFixedLengthOne = new RandomTempFile(64 * KB); + File randomFileOfFixedLengthTwo = new RandomTempFile(17 * KB); + CompletableFuture putObjectFutureOne = + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), + FileAsyncRequestBody.builder().path(randomFileOfFixedLengthOne.toPath()).chunkSizeInBytes(chunkSize).build()); + + String keyTwo = KEY + "_two"; + CompletableFuture putObjectFutureTwo = + s3Async.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(keyTwo) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), + FileAsyncRequestBody.builder().path(randomFileOfFixedLengthTwo.toPath()).chunkSizeInBytes(chunkSize).build()); + + putObjectFutureOne.join(); + putObjectFutureTwo.join(); + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + + String responseTwo = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(keyTwo).checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); + + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + + assertThat(response).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthOne.toPath()))); + assertThat(responseTwo).isEqualTo(new String(Files.readAllBytes(randomFileOfFixedLengthTwo.toPath()))); + + } + + + @Disabled("Http Async Signing is not supported for S3") + void asyncValidSignedTrailerChecksumCalculatedBySdkClient() { + ExecutionAttributes executionAttributes = ExecutionAttributes.builder() + .put(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, + true).build(); + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .overrideConfiguration(o -> o.executionAttributes(executionAttributes)) + .key(KEY) + .build(), AsyncRequestBody.fromString("Hello world")).join(); + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + assertThat(response).isEqualTo("Hello world"); + } + + @Test + public void putObject_with_bufferCreatedFromEmptyString() { + + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromString("")) + .join(); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(""); + } + + @Test + public void putObject_with_bufferCreatedFromZeroCapacityByteBuffer() { + ByteBuffer content = ByteBuffer.allocate(0); + s3HttpAsync.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), AsyncRequestBody.fromByteBuffer(content)) + .join(); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + + String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) + .key(KEY) + .checksumMode(ChecksumMode.ENABLED) + .build(), AsyncResponseTransformer.toBytes()).join() + .asUtf8String(); + + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + assertThat(response).isEqualTo(""); + } +} diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java index 43f16fc40097..72d05cc93f69 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java @@ -17,26 +17,26 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.KB; +import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.createDataOfSize; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; import software.amazon.awssdk.authcrt.signer.internal.DefaultAwsCrtS3V4aSigner; import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.checksums.Algorithm; import software.amazon.awssdk.core.checksums.ChecksumValidation; -import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @@ -50,16 +50,17 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.utils.CaptureChecksumValidationInterceptor; +import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.testutils.Waiter; public class HttpChecksumIntegrationTest extends S3IntegrationTestBase { + public static final int HUGE_MSG_SIZE = 1600 * KB; protected static final String KEY = "some-key"; - public static final int HUGE_MSG_SIZE = 16384; + private static final String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor(); protected static S3Client s3Https; protected static S3AsyncClient s3HttpAsync; - private static String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class); @BeforeAll public static void setUp() throws Exception { @@ -78,27 +79,15 @@ public static void setUp() throws Exception { createBucket(BUCKET); - - Waiter.run(() -> s3.headBucket(r -> r.bucket(BUCKET))) - .ignoringException(NoSuchBucketException.class) - .orFail(); + s3.waiter().waitUntilBucketExists(s ->s.bucket(BUCKET)); interceptor.reset(); } @AfterAll - public static void tearDown(){ + public static void tearDown() { deleteBucketAndAllContents(BUCKET); } - private static String createDataSize(int msgSize) { - msgSize = msgSize / 2; - msgSize = msgSize * 1024; - StringBuilder sb = new StringBuilder(msgSize); - for (int i = 0; i < msgSize; i++) { - sb.append('a'); - } - return sb.toString(); - } @AfterEach public void clear() { @@ -281,71 +270,13 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClientWithSigv4a() { assertThat(text).isEqualTo("Hello world"); } - - @Test - public void asyncValidUnsignedTrailerChecksumCalculatedBySdkClient() throws InterruptedException { - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .overrideConfiguration(o -> o.signer(DefaultAwsCrtS3V4aSigner.create())) - - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo("Hello world"); - } - - @Test - public void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient() throws InterruptedException { - s3Async.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .key(KEY) - .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); - assertThat(interceptor.requestChecksumInHeader()).isNull(); - - String response = s3Async.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY).checksumMode(ChecksumMode.ENABLED) - .build(), AsyncResponseTransformer.toBytes()).join().asUtf8String(); - assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); - assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(response).isEqualTo("Hello world"); - } - - - - @Disabled("Http Async Signing is not supported for S3") - public void asyncValidSignedTrailerChecksumCalculatedBySdkClient() { - ExecutionAttributes executionAttributes = ExecutionAttributes.builder() - .put(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, - true).build(); - s3HttpAsync.putObject(PutObjectRequest.builder() - .bucket(BUCKET) - .overrideConfiguration(o -> o.executionAttributes(executionAttributes)) - .key(KEY) - .build(), AsyncRequestBody.fromString("Hello world")).join(); - String response = s3HttpAsync.getObject(GetObjectRequest.builder().bucket(BUCKET) - .key(KEY) - .build(), AsyncResponseTransformer.toBytes()).join() - .asUtf8String(); - assertThat(response).isEqualTo("Hello world"); - } - @Test public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { s3Https.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -363,16 +294,16 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException { .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); } @Test - public void syncSignedPayloadForHugeMessage(){ + public void syncSignedPayloadForHugeMessage() { s3.putObject(PutObjectRequest.builder() .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -389,7 +320,7 @@ public void syncSignedPayloadForHugeMessage(){ .collect(Collectors.joining("\n")); assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); } @Test @@ -398,7 +329,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep .bucket(BUCKET) .key(KEY) .checksumAlgorithm(ChecksumAlgorithm.CRC32) - .build(), RequestBody.fromString(createDataSize(HUGE_MSG_SIZE))); + .build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a'))); assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); assertThat(interceptor.requestChecksumInHeader()).isNull(); @@ -414,6 +345,59 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep assertThat(interceptor.validationAlgorithm()).isNull(); assertThat(interceptor.responseValidation()).isNull(); - assertThat(text).isEqualTo(createDataSize(HUGE_MSG_SIZE)); + assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a')); + } + + + @Test + public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException, + IOException { + File randomFileOfFixedLength = new RandomTempFile(10 * KB); + + s3Https.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), RequestBody.fromFile(randomFileOfFixedLength.toPath())); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + ResponseInputStream s3HttpsObject = + s3Https.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).checksumMode(ChecksumMode.ENABLED).build()); + String text = new BufferedReader( + new InputStreamReader(s3HttpsObject, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(text).isEqualTo(new String(bytes)); + } + + + @Test + public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws IOException { + File randomFileOfFixedLength = new RandomTempFile(34 * KB); + s3Https.putObject(PutObjectRequest.builder() + .bucket(BUCKET) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .build(), RequestBody.fromFile(randomFileOfFixedLength.toPath())); + + assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32"); + assertThat(interceptor.requestChecksumInHeader()).isNull(); + + ResponseInputStream s3HttpsObject = + s3Https.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).checksumMode(ChecksumMode.ENABLED).build()); + String text = new BufferedReader( + new InputStreamReader(s3HttpsObject, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32); + assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED); + byte[] bytes = Files.readAllBytes(randomFileOfFixedLength.toPath()); + assertThat(text).isEqualTo(new String(bytes)); } + } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java index 28dcf23340d8..22a313f4186e 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/utils/ChecksumUtils.java @@ -15,17 +15,30 @@ package software.amazon.awssdk.services.s3.utils; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.SdkChecksum; +import software.amazon.awssdk.utils.BinaryUtils; /** * Utilities for computing the SHA-256 checksums of various binary objects. */ public final class ChecksumUtils { + + public static final int KB = 1024; + public static byte[] computeCheckSum(InputStream is) throws IOException { MessageDigest instance = createMessageDigest(); @@ -66,4 +79,16 @@ private static MessageDigest createMessageDigest() { throw new RuntimeException("Unable to create SHA-256 MessageDigest instance", e); } } + + public static String calculatedChecksum(String contentString, Algorithm algorithm) { + SdkChecksum sdkChecksum = SdkChecksum.forAlgorithm(algorithm); + for (byte character : contentString.getBytes(StandardCharsets.UTF_8)) { + sdkChecksum.update(character); + } + return BinaryUtils.toBase64(sdkChecksum.getChecksumBytes()); + } + + public static String createDataOfSize(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } } diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java index 2be93377d816..fb95a8d0bc57 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/AsyncRequestBodyFlexibleChecksumInTrailerTest.java @@ -33,8 +33,14 @@ import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.verification.LoggedRequest; +import java.io.File; +import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -45,13 +51,19 @@ import software.amazon.awssdk.core.HttpChecksumConstant; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.checksums.Algorithm; +import software.amazon.awssdk.core.checksums.SdkChecksum; +import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; import software.amazon.awssdk.services.protocolrestjson.model.ChecksumAlgorithm; import software.amazon.awssdk.testutils.EnvironmentVariableHelper; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.utils.BinaryUtils; public class AsyncRequestBodyFlexibleChecksumInTrailerTest { + public static final int KB = 1024; private static final String CRLF = "\r\n"; private static final EnvironmentVariableHelper ENVIRONMENT_VARIABLE_HELPER = new EnvironmentVariableHelper(); private static final String SCENARIO = "scenario"; @@ -62,6 +74,10 @@ public class AsyncRequestBodyFlexibleChecksumInTrailerTest { private ProtocolRestJsonAsyncClient asyncClient; private ProtocolRestJsonAsyncClient asyncClientWithSigner; + public static String createDataOfSize(int dataSize, char contentCharacter) { + return IntStream.range(0, dataSize).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining()); + } + @Before public void setupClient() { asyncClientWithSigner = ProtocolRestJsonAsyncClient.builder() @@ -86,15 +102,11 @@ public void cleanUp() { public void asyncStreaming_NoSigner_shouldContainChecksum_fromInterceptors() { stubResponseWithHeaders(); asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), AsyncRequestBody.fromString( - "abc"), + "abc"), AsyncResponseTransformer.toBytes()).join(); //payload would in json form as "{"StringMember":"foo"}x-amz-checksum-crc32:tcUDMQ==[\r][\n]" - verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo("44"))); - verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo("x-amz-checksum-crc32"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo("3"))); - verify(putRequestedFor(anyUrl()).withHeader("content-encoding", equalTo("aws-chunked"))); - verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + verifyHeadersForPutRequest("44", "3", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( containing( "3" + CRLF + "abc" + CRLF @@ -111,23 +123,76 @@ public void asyncStreaming_withRetry_NoSigner_shouldContainChecksum_fromIntercep + "x-amz-checksum-sha256:ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0=" + CRLF + CRLF; asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.SHA256), AsyncRequestBody.fromString( - "abc"), + "abc"), AsyncResponseTransformer.toBytes()).join(); List requests = getRecordedRequests(); assertThat(requests.size()).isEqualTo(2); assertThat(requests.get(0).getBody()).contains(expectedRequestBody.getBytes()); assertThat(requests.get(1).getBody()).contains(expectedRequestBody.getBytes()); + verify(putRequestedFor(anyUrl()).withHeader(CONTENT_TYPE, equalTo(Mimetype.MIMETYPE_TEXT_PLAIN))); - verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo("81"))); - verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo("x-amz-checksum-sha256"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); - verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo("3"))); - verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + verifyHeadersForPutRequest("81", "3", "x-amz-checksum-sha256"); + verify(putRequestedFor(anyUrl()).withRequestBody( containing( expectedRequestBody))); } + @Test + public void asyncStreaming_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { + + stubForFailureThenSuccess(500, "500"); + File randomFileOfFixedLength = new RandomTempFile(37 * KB); + String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + String expectedChecksum = calculatedChecksum(contentString, Algorithm.CRC32); + + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), + FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(16 * KB) + .build(), + AsyncResponseTransformer.toBytes()).join(); + verifyHeadersForPutRequest("37948", "37888", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( + containing( + "4000" + CRLF + contentString.substring(0, 16 * KB) + CRLF + + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + + "0" + CRLF + + "x-amz-checksum-crc32:" + expectedChecksum + CRLF + CRLF))); + } + + @Test + public void asyncStreaming_withRetry_FromAsyncRequestBody_VariableChunkSize_NoSigner_addsChecksums_fromInterceptors() throws IOException { + + + File randomFileOfFixedLength = new RandomTempFile(37 * KB); + String contentString = new String(Files.readAllBytes(randomFileOfFixedLength.toPath())); + String expectedChecksum = calculatedChecksum(contentString, Algorithm.CRC32); + stubResponseWithHeaders(); + + asyncClient.putOperationWithChecksum(b -> b.checksumAlgorithm(ChecksumAlgorithm.CRC32), + FileAsyncRequestBody.builder().path(randomFileOfFixedLength.toPath()) + .chunkSizeInBytes(16 * KB) + .build(), + AsyncResponseTransformer.toBytes()).join(); + verifyHeadersForPutRequest("37948", "37888", "x-amz-checksum-crc32"); + verify(putRequestedFor(anyUrl()).withRequestBody( + containing( + "4000" + CRLF + contentString.substring(0, 16 * KB) + CRLF + + "4000" + CRLF + contentString.substring(16 * KB, 32 * KB) + CRLF + + "1400" + CRLF + contentString.substring(32 * KB) + CRLF + + "0" + CRLF + + "x-amz-checksum-crc32:" + expectedChecksum + CRLF + CRLF))); + } + + private String calculatedChecksum(String contentString, Algorithm algorithm) { + SdkChecksum sdkChecksum = SdkChecksum.forAlgorithm(algorithm); + for (byte character : contentString.getBytes(StandardCharsets.UTF_8)) { + sdkChecksum.update(character); + } + return BinaryUtils.toBase64(sdkChecksum.getChecksumBytes()); + } + private void stubResponseWithHeaders() { stubFor(put(anyUrl()) .willReturn(aResponse().withStatus(200) @@ -163,4 +228,15 @@ private List getRecordedRequests() { } + private void verifyHeadersForPutRequest(String contentLength, String decodedContentLength, String checksumHeader) { + verify(putRequestedFor(anyUrl()).withHeader(CONTENT_LENGTH, equalTo(contentLength))); + verify(putRequestedFor(anyUrl()).withHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, equalTo( + checksumHeader))); + verify(putRequestedFor(anyUrl()).withHeader("x-amz-content-sha256", equalTo("STREAMING-UNSIGNED-PAYLOAD-TRAILER"))); + verify(putRequestedFor(anyUrl()).withHeader("x-amz-decoded-content-length", equalTo(decodedContentLength))); + verify(putRequestedFor(anyUrl()).withHeader("content-encoding", equalTo("aws-chunked"))); + verify(putRequestedFor(anyUrl()).withHeader("Content-Encoding", equalTo("aws-chunked"))); + } + + } \ No newline at end of file diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java index 2053a4e820cf..bf68ee568789 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java @@ -133,11 +133,17 @@ public void onComplete() { * Increment the downstream demand by the provided value, accounting for overflow. */ private void addDownstreamDemand(long l) { - Validate.isTrue(l > 0, "Demand must not be negative."); - downstreamDemand.getAndUpdate(current -> { - long newValue = current + l; - return newValue >= 0 ? newValue : Long.MAX_VALUE; - }); + + if (l > 0) { + downstreamDemand.getAndUpdate(current -> { + long newValue = current + l; + return newValue >= 0 ? newValue : Long.MAX_VALUE; + }); + } else { + log.error(() -> "Demand " + l + " must not be negative."); + upstreamSubscription.cancel(); + onError(new IllegalArgumentException("Demand must not be negative")); + } } /**