Skip to content

Commit ac0e9cf

Browse files
committed
Updated Fix to support AsyncRequestFileBody of variable chunk size
1 parent f6a7a43 commit ac0e9cf

File tree

10 files changed

+480
-196
lines changed

10 files changed

+480
-196
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import software.amazon.awssdk.core.exception.SdkException;
3333
import software.amazon.awssdk.utils.BinaryUtils;
3434
import software.amazon.awssdk.utils.Validate;
35+
import software.amazon.awssdk.utils.async.ByteBufferingSubscriber;
3536
import software.amazon.awssdk.utils.builder.SdkBuilder;
3637

3738
/**
@@ -41,6 +42,8 @@
4142
@SdkInternalApi
4243
public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
4344

45+
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
46+
4447
public static final byte[] FINAL_BYTE = new byte[0];
4548
private final AsyncRequestBody wrapped;
4649
private final SdkChecksum sdkChecksum;
@@ -148,7 +151,9 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
148151
if (sdkChecksum != null) {
149152
sdkChecksum.reset();
150153
}
151-
wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes));
154+
ByteBufferingSubscriber<ByteBuffer> bufferingSubscriber = new ByteBufferingSubscriber(
155+
new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes), DEFAULT_CHUNK_SIZE);
156+
wrapped.subscribe(bufferingSubscriber);
152157
}
153158

154159
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
@@ -189,7 +194,7 @@ public void onNext(ByteBuffer byteBuffer) {
189194
ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer);
190195
wrapped.onNext(allocatedBuffer);
191196
} else {
192-
ByteBuffer allocatedBuffer = appendChunkSizeAndFinalByte(byteBuffer);
197+
ByteBuffer allocatedBuffer = createChunk(byteBuffer, false);
193198
wrapped.onNext(allocatedBuffer);
194199
}
195200
} catch (SdkException sdkException) {
@@ -216,14 +221,6 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) {
216221
return checksumAppendedBuffer;
217222
}
218223

219-
private ByteBuffer appendChunkSizeAndFinalByte(ByteBuffer byteBuffer) {
220-
ByteBuffer contentChunk = createChunk(byteBuffer, false);
221-
ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate(contentChunk.remaining());
222-
checksumAppendedBuffer.put(contentChunk);
223-
checksumAppendedBuffer.flip();
224-
return checksumAppendedBuffer;
225-
}
226-
227224
@Override
228225
public void onError(Throwable t) {
229226
wrapped.onError(t);

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,12 @@
4747
*/
4848
@SdkInternalApi
4949
public final class FileAsyncRequestBody implements AsyncRequestBody {
50+
private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class);
5051

5152
/**
5253
* Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
5354
*/
54-
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
55-
56-
private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class);
55+
private static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
5756

5857
/**
5958
* File to read.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.core.internal.interceptor;
1717

18+
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
19+
1820
import java.util.Optional;
1921
import software.amazon.awssdk.annotations.SdkInternalApi;
2022
import software.amazon.awssdk.core.ClientType;
@@ -25,8 +27,6 @@
2527
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2628
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
2729
import software.amazon.awssdk.core.internal.async.ChecksumCalculatingAsyncRequestBody;
28-
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
29-
import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream;
3030
import software.amazon.awssdk.core.internal.util.ChunkContentUtils;
3131
import software.amazon.awssdk.core.internal.util.HttpChecksumUtils;
3232
import software.amazon.awssdk.http.Header;
@@ -99,10 +99,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
9999
private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum,
100100
long checksumContentLength, long originalContentLength) {
101101

102-
long chunkLength = isFileAsyncRequestBody(context)
103-
? AwsUnsignedChunkedEncodingInputStream
104-
.calculateStreamContentLength(originalContentLength, FileAsyncRequestBody.DEFAULT_CHUNK_SIZE)
105-
: ChunkContentUtils.calculateChunkLength(originalContentLength);
102+
long chunkLength =
103+
calculateStreamContentLength(originalContentLength, ChecksumCalculatingAsyncRequestBody.DEFAULT_CHUNK_SIZE);
106104

107105
return context.httpRequest().copy(r ->
108106
r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName())
@@ -112,9 +110,4 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp
112110
.putHeader(Header.CONTENT_LENGTH,
113111
Long.toString(chunkLength + checksumContentLength)));
114112
}
115-
116-
private static boolean isFileAsyncRequestBody(Context.ModifyHttpRequest context) {
117-
return context.asyncRequestBody().isPresent() && context.asyncRequestBody().get() instanceof FileAsyncRequestBody;
118-
}
119-
120113
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,10 @@ public static Builder builder() {
4848
* @return Content length of the trailer that will be appended at the end.
4949
*/
5050
public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) {
51-
int checksumLength = algorithm.base64EncodedLength();
52-
53-
return (headerName.length()
54-
+ HEADER_COLON_SEPARATOR.length()
55-
+ checksumLength
56-
+ CRLF.length());
51+
return (long) headerName.length()
52+
+ (long) HEADER_COLON_SEPARATOR.length()
53+
+ algorithm.base64EncodedLength().longValue()
54+
+ (long) CRLF.length() + CRLF.length();
5755
}
5856

5957
/**
@@ -78,7 +76,7 @@ public static long calculateStreamContentLength(long originalLength, long defaul
7876

7977
long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize);
8078
long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0;
81-
long lastByteSize = "0".length() + CRLF.length();
79+
long lastByteSize = (long) "0".length() + (long) CRLF.length();
8280

8381
return allChunks + remainingInChunk + lastByteSize;
8482
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void lengthsOfCalculateByChecksumCalculatingInputStream(){
6060
long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength(
6161
SHA256_ALGORITHM, SHA256_HEADER_NAME);
6262
assertThat(calculateChunkLength).isEqualTo(19);
63-
assertThat(checksumContentLength).isEqualTo(69);
63+
assertThat(checksumContentLength).isEqualTo(71);
6464
}
6565

6666
@Test

0 commit comments

Comments
 (0)