Skip to content

Commit 62daf53

Browse files
authored
Fix Trailer based Http Checksum for Async Request body with variable chunk size (#3380)
* Fix Trailer based Http Checksum for Async Request body created from File * Updated Fix to support AsyncRequestFileBody of variable chunk size * Reusing .flatMapIterable API of SDKPublisher to create Chunks of fixed size * Adding a separate class ChunkBuffer to handle Multiple thread accessing currentBuffer * Creating a SynchronousBuffer for mapper , also handling ZeroByte s3 put Object * Made ChunkBuffer synchronized
1 parent fd35030 commit 62daf53

File tree

15 files changed

+864
-134
lines changed

15 files changed

+864
-134
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async Request body."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public final class HttpChecksumConstant {
3939

4040
public static final String HEADER_FOR_TRAILER_REFERENCE = "x-amz-trailer";
4141

42+
/**
43+
* Default chunk size for Async trailer based checksum data transfer*
44+
*/
45+
public static final int DEFAULT_ASYNC_CHUNK_SIZE = 16 * 1024;
46+
4247
private HttpChecksumConstant() {
4348
}
4449
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
1819
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength;
1920
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength;
2021
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer;
@@ -41,12 +42,12 @@
4142
@SdkInternalApi
4243
public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
4344

44-
public static final byte[] FINAL_BYTE = new byte[0];
45+
private static final byte[] FINAL_BYTE = new byte[0];
4546
private final AsyncRequestBody wrapped;
4647
private final SdkChecksum sdkChecksum;
4748
private final Algorithm algorithm;
4849
private final String trailerHeader;
49-
private final AtomicLong remainingBytes;
50+
private final long totalBytes;
5051

5152
private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
5253

@@ -57,8 +58,8 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
5758
this.algorithm = builder.algorithm;
5859
this.sdkChecksum = builder.algorithm != null ? SdkChecksum.forAlgorithm(algorithm) : null;
5960
this.trailerHeader = builder.trailerHeader;
60-
this.remainingBytes = new AtomicLong(wrapped.contentLength()
61-
.orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied.")));
61+
this.totalBytes = wrapped.contentLength()
62+
.orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied."));
6263
}
6364

6465
/**
@@ -148,7 +149,10 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
148149
if (sdkChecksum != null) {
149150
sdkChecksum.reset();
150151
}
151-
wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes));
152+
153+
SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes);
154+
wrapped.flatMapIterable(synchronousChunkBuffer::buffer)
155+
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
152156
}
153157

154158
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
@@ -162,11 +166,11 @@ private static final class ChecksumCalculatingSubscriber implements Subscriber<B
162166

163167
ChecksumCalculatingSubscriber(Subscriber<? super ByteBuffer> wrapped,
164168
SdkChecksum checksum,
165-
String trailerHeader, AtomicLong remainingBytes) {
169+
String trailerHeader, long totalBytes) {
166170
this.wrapped = wrapped;
167171
this.checksum = checksum;
168172
this.trailerHeader = trailerHeader;
169-
this.remainingBytes = remainingBytes;
173+
this.remainingBytes = new AtomicLong(totalBytes);
170174
}
171175

172176
@Override
@@ -189,7 +193,8 @@ public void onNext(ByteBuffer byteBuffer) {
189193
ByteBuffer allocatedBuffer = getFinalChecksumAppendedChunk(byteBuffer);
190194
wrapped.onNext(allocatedBuffer);
191195
} else {
192-
wrapped.onNext(byteBuffer);
196+
ByteBuffer allocatedBuffer = createChunk(byteBuffer, false);
197+
wrapped.onNext(allocatedBuffer);
193198
}
194199
} catch (SdkException sdkException) {
195200
this.subscription.cancel();
@@ -201,7 +206,7 @@ private ByteBuffer getFinalChecksumAppendedChunk(ByteBuffer byteBuffer) {
201206
ByteBuffer finalChunkedByteBuffer = createChunk(ByteBuffer.wrap(FINAL_BYTE), true);
202207
ByteBuffer checksumTrailerByteBuffer = createChecksumTrailer(
203208
BinaryUtils.toBase64(checksumBytes), trailerHeader);
204-
ByteBuffer contentChunk = createChunk(byteBuffer, false);
209+
ByteBuffer contentChunk = byteBuffer.hasRemaining() ? createChunk(byteBuffer, false) : byteBuffer;
205210

206211
ByteBuffer checksumAppendedBuffer = ByteBuffer.allocate(
207212
contentChunk.remaining()
@@ -225,4 +230,17 @@ public void onComplete() {
225230
wrapped.onComplete();
226231
}
227232
}
233+
234+
private static final class SynchronousChunkBuffer {
235+
private final ChunkBuffer chunkBuffer;
236+
237+
SynchronousChunkBuffer(long totalBytes) {
238+
this.chunkBuffer = ChunkBuffer.builder().bufferSize(DEFAULT_ASYNC_CHUNK_SIZE).totalBytes(totalBytes).build();
239+
}
240+
241+
private Iterable<ByteBuffer> buffer(ByteBuffer bytes) {
242+
return chunkBuffer.bufferAndCreateChunks(bytes);
243+
}
244+
}
245+
228246
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.utils.Validate;
26+
import software.amazon.awssdk.utils.builder.SdkBuilder;
27+
28+
/**
29+
* Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize*
30+
*/
31+
@SdkInternalApi
32+
public final class ChunkBuffer {
33+
private final AtomicLong remainingBytes;
34+
private final ByteBuffer currentBuffer;
35+
private final int bufferSize;
36+
37+
private ChunkBuffer(Long totalBytes, Integer bufferSize) {
38+
Validate.notNull(totalBytes, "The totalBytes must not be null");
39+
40+
int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
41+
this.bufferSize = chunkSize;
42+
this.currentBuffer = ByteBuffer.allocate(chunkSize);
43+
this.remainingBytes = new AtomicLong(totalBytes);
44+
}
45+
46+
public static Builder builder() {
47+
return new DefaultBuilder();
48+
}
49+
50+
51+
// currentBuffer and bufferedList can get over written if concurrent Threads calls this method at the same time.
52+
public synchronized Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
53+
int startPosition = 0;
54+
List<ByteBuffer> bufferedList = new ArrayList<>();
55+
int currentBytesRead = buffer.remaining();
56+
do {
57+
int bufferedBytes = currentBuffer.position();
58+
int availableToRead = bufferSize - bufferedBytes;
59+
int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition);
60+
61+
if (bufferedBytes == 0) {
62+
currentBuffer.put(buffer.array(), startPosition, bytesToMove);
63+
} else {
64+
currentBuffer.put(buffer.array(), 0, bytesToMove);
65+
}
66+
67+
startPosition = startPosition + bytesToMove;
68+
69+
// Send the data once the buffer is full
70+
if (currentBuffer.position() == bufferSize) {
71+
currentBuffer.position(0);
72+
ByteBuffer bufferToSend = ByteBuffer.allocate(bufferSize);
73+
bufferToSend.put(currentBuffer.array(), 0, bufferSize);
74+
bufferToSend.clear();
75+
currentBuffer.clear();
76+
bufferedList.add(bufferToSend);
77+
remainingBytes.addAndGet(-bufferSize);
78+
}
79+
} while (startPosition < currentBytesRead);
80+
81+
int remainingBytesInBuffer = currentBuffer.position();
82+
83+
// Send the remaining buffer when
84+
// 1. remainingBytes in buffer are same as the last few bytes to be read.
85+
// 2. If it is a zero byte and the last byte to be read.
86+
if (remainingBytes.get() == remainingBytesInBuffer &&
87+
(buffer.remaining() == 0 || remainingBytesInBuffer > 0)) {
88+
currentBuffer.clear();
89+
ByteBuffer trimmedBuffer = ByteBuffer.allocate(remainingBytesInBuffer);
90+
trimmedBuffer.put(currentBuffer.array(), 0, remainingBytesInBuffer);
91+
trimmedBuffer.clear();
92+
bufferedList.add(trimmedBuffer);
93+
remainingBytes.addAndGet(-remainingBytesInBuffer);
94+
}
95+
return bufferedList;
96+
}
97+
98+
public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {
99+
100+
Builder bufferSize(int bufferSize);
101+
102+
Builder totalBytes(long totalBytes);
103+
104+
105+
}
106+
107+
private static final class DefaultBuilder implements Builder {
108+
109+
private Integer bufferSize;
110+
private Long totalBytes;
111+
112+
@Override
113+
public ChunkBuffer build() {
114+
return new ChunkBuffer(totalBytes, bufferSize);
115+
}
116+
117+
@Override
118+
public Builder bufferSize(int bufferSize) {
119+
this.bufferSize = bufferSize;
120+
return this;
121+
}
122+
123+
@Override
124+
public Builder totalBytes(long totalBytes) {
125+
this.totalBytes = totalBytes;
126+
return this;
127+
}
128+
}
129+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package software.amazon.awssdk.core.internal.interceptor;
1717

18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
19+
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
20+
1821
import java.util.Optional;
1922
import software.amazon.awssdk.annotations.SdkInternalApi;
2023
import software.amazon.awssdk.core.ClientType;
@@ -97,7 +100,9 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
97100
private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttpRequest context, ChecksumSpecs checksum,
98101
long checksumContentLength, long originalContentLength) {
99102

100-
long chunkLength = ChunkContentUtils.calculateChunkLength(originalContentLength);
103+
long chunkLength =
104+
calculateStreamContentLength(originalContentLength, DEFAULT_ASYNC_CHUNK_SIZE);
105+
101106
return context.httpRequest().copy(r ->
102107
r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName())
103108
.putHeader("Content-encoding", HttpChecksumConstant.AWS_CHUNKED_HEADER)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.awssdk.core.interceptor.Context;
3030
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
3131
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
32+
import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream;
3233
import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream;
3334
import software.amazon.awssdk.core.internal.util.HttpChecksumResolver;
3435
import software.amazon.awssdk.core.internal.util.HttpChecksumUtils;
@@ -73,7 +74,7 @@ public Optional<RequestBody> modifyHttpContent(Context.ModifyHttpRequest context
7374
RequestBody.fromContentProvider(
7475
streamProvider,
7576
AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(
76-
requestBody.optionalContentLength().orElse(0L))
77+
requestBody.optionalContentLength().orElse(0L), AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE)
7778
+ checksumContentLength,
7879
requestBody.contentType()));
7980
}
@@ -112,7 +113,8 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, Execu
112113
.putHeader("x-amz-decoded-content-length", Long.toString(originalContentLength))
113114
.putHeader(CONTENT_LENGTH,
114115
Long.toString(AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(
115-
originalContentLength) + checksumContentLength)));
116+
originalContentLength, AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE)
117+
+ checksumContentLength)));
116118
}
117119

118120

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
@SdkInternalApi
4040
public abstract class AwsChunkedEncodingInputStream extends SdkInputStream {
4141

42-
protected static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
42+
public static final int DEFAULT_CHUNK_SIZE = 128 * 1024;
4343
protected static final int SKIP_BUFFER_SIZE = 256 * 1024;
4444
protected static final String CRLF = "\r\n";
4545
protected static final byte[] FINAL_CHUNK = new byte[0];

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,10 @@ public static Builder builder() {
4848
* @return Content length of the trailer that will be appended at the end.
4949
*/
5050
public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) {
51-
int checksumLength = algorithm.base64EncodedLength();
52-
53-
return (headerName.length()
54-
+ HEADER_COLON_SEPARATOR.length()
55-
+ checksumLength
56-
+ CRLF.length());
51+
return headerName.length()
52+
+ HEADER_COLON_SEPARATOR.length()
53+
+ algorithm.base64EncodedLength().longValue()
54+
+ CRLF.length() + CRLF.length();
5755
}
5856

5957
/**
@@ -68,17 +66,20 @@ private static long calculateChunkLength(long originalContentLength) {
6866
+ CRLF.length();
6967
}
7068

71-
public static long calculateStreamContentLength(long originalLength) {
72-
if (originalLength < 0) {
73-
throw new IllegalArgumentException("Non negative content length expected.");
69+
public static long calculateStreamContentLength(long originalLength, long defaultChunkSize) {
70+
if (originalLength < 0 || defaultChunkSize == 0) {
71+
throw new IllegalArgumentException(originalLength + ", " + defaultChunkSize + "Args <= 0 not expected");
7472
}
7573

76-
long maxSizeChunks = originalLength / DEFAULT_CHUNK_SIZE;
77-
long remainingBytes = originalLength % DEFAULT_CHUNK_SIZE;
74+
long maxSizeChunks = originalLength / defaultChunkSize;
75+
long remainingBytes = originalLength % defaultChunkSize;
76+
77+
long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize);
78+
long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0;
79+
// last byte is composed of a "0" and "\r\n"
80+
long lastByteSize = 1 + (long) CRLF.length();
7881

79-
return maxSizeChunks * calculateChunkLength(DEFAULT_CHUNK_SIZE)
80-
+ (remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0)
81-
+ calculateChunkLength(0);
82+
return allChunks + remainingInChunk + lastByteSize;
8283
}
8384

8485
@Override

0 commit comments

Comments
 (0)