Skip to content

Commit 1dcd041

Browse files
committed
Adding a separate class ChunkBuffer to handle Multiple thread accessing currentBuffer
1 parent b662515 commit 1dcd041

File tree

11 files changed

+290
-122
lines changed

11 files changed

+290
-122
lines changed

.changes/next-release/bugfix-AWSSDKforJavav2-5ecdce1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"type": "bugfix",
33
"category": "AWS SDK for Java v2",
44
"contributor": "",
5-
"description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async File Request body."
5+
"description": "Fixed issue where request used to fail while calculating Trailer based checksum for Async Request body."
66
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/HttpChecksumConstant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public final class HttpChecksumConstant {
3939

4040
public static final String HEADER_FOR_TRAILER_REFERENCE = "x-amz-trailer";
4141

42+
/**
43+
* Default chunk size for Async trailer based checksum data transfer*
44+
*/
45+
public static final int DEFAULT_ASYNC_CHUNK_SIZE = 16 * 1024;
46+
4247
private HttpChecksumConstant() {
4348
}
4449
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 11 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
1819
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength;
1920
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength;
2021
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer;
2122
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk;
2223

2324
import java.nio.ByteBuffer;
24-
import java.util.ArrayList;
25-
import java.util.List;
2625
import java.util.Optional;
2726
import java.util.concurrent.atomic.AtomicLong;
2827
import org.reactivestreams.Subscriber;
@@ -43,16 +42,12 @@
4342
@SdkInternalApi
4443
public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
4544

46-
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024;
47-
48-
public static final byte[] FINAL_BYTE = new byte[0];
45+
private static final byte[] FINAL_BYTE = new byte[0];
4946
private final AsyncRequestBody wrapped;
5047
private final SdkChecksum sdkChecksum;
5148
private final Algorithm algorithm;
5249
private final String trailerHeader;
53-
private final AtomicLong remainingBytes;
5450
private final long totalBytes;
55-
private final ByteBuffer currentBuffer;
5651

5752
private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
5853

@@ -65,8 +60,6 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
6560
this.trailerHeader = builder.trailerHeader;
6661
this.totalBytes = wrapped.contentLength()
6762
.orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied."));
68-
this.remainingBytes = new AtomicLong();
69-
this.currentBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
7063
}
7164

7265
/**
@@ -157,9 +150,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
157150
sdkChecksum.reset();
158151
}
159152

160-
this.remainingBytes.set(totalBytes);
161-
162-
wrapped.flatMapIterable(this::bufferAndCreateChunks)
153+
wrapped.flatMapIterable(this::buffer)
163154
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
164155
}
165156

@@ -239,47 +230,13 @@ public void onComplete() {
239230
}
240231
}
241232

242-
private Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
243-
int startPosition = 0;
244-
int currentBytesRead = buffer.remaining();
245-
246-
List<ByteBuffer> resultBufferedList = new ArrayList<>();
247-
do {
248-
int bufferedBytes = currentBuffer.position();
249-
int availableToRead = DEFAULT_CHUNK_SIZE - bufferedBytes;
250-
int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition);
251-
252-
if (bufferedBytes == 0) {
253-
currentBuffer.put(buffer.array(), startPosition, bytesToMove);
254-
} else {
255-
currentBuffer.put(buffer.array(), 0, bytesToMove);
256-
}
257-
258-
startPosition = startPosition + bytesToMove;
259-
260-
// Send the data once the buffer is full
261-
if (currentBuffer.position() == DEFAULT_CHUNK_SIZE) {
262-
currentBuffer.position(0);
263-
ByteBuffer bufferToSend = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
264-
bufferToSend.put(currentBuffer.array(), 0, DEFAULT_CHUNK_SIZE);
265-
bufferToSend.clear();
266-
currentBuffer.clear();
267-
resultBufferedList.add(bufferToSend);
268-
remainingBytes.addAndGet(-DEFAULT_CHUNK_SIZE);
269-
}
270-
271-
} while (startPosition < currentBytesRead);
272-
273-
int bufferedBytes = currentBuffer.position();
274-
// Send the remainder buffered bytes at the end when there no more bytes
275-
if (bufferedBytes > 0 && remainingBytes.get() == bufferedBytes) {
276-
currentBuffer.clear();
277-
ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes);
278-
trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes);
279-
trimmedBuffer.clear();
280-
resultBufferedList.add(trimmedBuffer);
281-
remainingBytes.addAndGet(-bufferedBytes);
282-
}
283-
return resultBufferedList;
233+
private Iterable<ByteBuffer> buffer(ByteBuffer bytes) {
234+
ChunkBuffer chunkBuffer = ChunkBuffer.builder()
235+
.bufferSize(DEFAULT_ASYNC_CHUNK_SIZE)
236+
.totalBytes(bytes.remaining())
237+
.build();
238+
chunkBuffer.bufferAndCreateChunks(bytes);
239+
return chunkBuffer.getBufferedList();
284240
}
241+
285242
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.utils.Validate;
27+
import software.amazon.awssdk.utils.builder.SdkBuilder;
28+
29+
/**
30+
* Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize*
31+
*/
32+
@SdkInternalApi
33+
public class ChunkBuffer {
34+
private final AtomicLong remainingBytes;
35+
private final ByteBuffer currentBuffer;
36+
private final int bufferSize;
37+
private List<ByteBuffer> bufferedList;
38+
39+
40+
private ChunkBuffer(Long totalBytes, Integer bufferSize) {
41+
Validate.notNull(totalBytes, "The totalBytes must not be null");
42+
43+
int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
44+
this.bufferSize = chunkSize;
45+
this.currentBuffer = ByteBuffer.allocate(chunkSize);
46+
this.remainingBytes = new AtomicLong(totalBytes);
47+
bufferedList = new ArrayList<>();
48+
}
49+
50+
51+
public static Builder builder() {
52+
return new DefaultBuilder();
53+
}
54+
55+
public List<ByteBuffer> getBufferedList() {
56+
if (currentBuffer == null) {
57+
throw new IllegalStateException("");
58+
}
59+
List<ByteBuffer> ret = bufferedList;
60+
bufferedList = new ArrayList<>();
61+
return Collections.unmodifiableList(ret);
62+
}
63+
64+
public Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
65+
int startPosition = 0;
66+
int currentBytesRead = buffer.remaining();
67+
68+
do {
69+
70+
int bufferedBytes = currentBuffer.position();
71+
int availableToRead = bufferSize - bufferedBytes;
72+
int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition);
73+
74+
if (bufferedBytes == 0) {
75+
currentBuffer.put(buffer.array(), startPosition, bytesToMove);
76+
} else {
77+
currentBuffer.put(buffer.array(), 0, bytesToMove);
78+
}
79+
80+
startPosition = startPosition + bytesToMove;
81+
82+
// Send the data once the buffer is full
83+
if (currentBuffer.position() == bufferSize) {
84+
currentBuffer.position(0);
85+
ByteBuffer bufferToSend = ByteBuffer.allocate(bufferSize);
86+
bufferToSend.put(currentBuffer.array(), 0, bufferSize);
87+
bufferToSend.clear();
88+
currentBuffer.clear();
89+
bufferedList.add(bufferToSend);
90+
remainingBytes.addAndGet(-bufferSize);
91+
}
92+
} while (startPosition < currentBytesRead);
93+
94+
int remainingBytesInBuffer = currentBuffer.position();
95+
// Send the remainder buffered bytes at the end when there no more bytes
96+
if (remainingBytesInBuffer > 0 && remainingBytes.get() == remainingBytesInBuffer) {
97+
currentBuffer.clear();
98+
ByteBuffer trimmedBuffer = ByteBuffer.allocate(remainingBytesInBuffer);
99+
trimmedBuffer.put(currentBuffer.array(), 0, remainingBytesInBuffer);
100+
trimmedBuffer.clear();
101+
bufferedList.add(trimmedBuffer);
102+
remainingBytes.addAndGet(-remainingBytesInBuffer);
103+
}
104+
return bufferedList;
105+
}
106+
107+
public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {
108+
109+
Builder bufferSize(int bufferSize);
110+
111+
Builder totalBytes(long totalBytes);
112+
113+
114+
}
115+
116+
private static final class DefaultBuilder implements Builder {
117+
118+
private Integer bufferSize;
119+
private Long totalBytes;
120+
121+
@Override
122+
public ChunkBuffer build() {
123+
return new ChunkBuffer(totalBytes, bufferSize);
124+
}
125+
126+
@Override
127+
public Builder bufferSize(int bufferSize) {
128+
this.bufferSize = bufferSize;
129+
return this;
130+
}
131+
132+
@Override
133+
public Builder totalBytes(long totalBytes) {
134+
this.totalBytes = totalBytes;
135+
return this;
136+
}
137+
}
138+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/AsyncRequestBodyHttpChecksumTrailerInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.core.internal.interceptor;
1717

18+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
1819
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
1920

2021
import java.util.Optional;
@@ -100,7 +101,7 @@ private static SdkHttpRequest updateHeadersForTrailerChecksum(Context.ModifyHttp
100101
long checksumContentLength, long originalContentLength) {
101102

102103
long chunkLength =
103-
calculateStreamContentLength(originalContentLength, ChecksumCalculatingAsyncRequestBody.DEFAULT_CHUNK_SIZE);
104+
calculateStreamContentLength(originalContentLength, DEFAULT_ASYNC_CHUNK_SIZE);
104105

105106
return context.httpRequest().copy(r ->
106107
r.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksum.headerName())
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.nio.ByteBuffer;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import org.junit.jupiter.api.Test;
25+
import software.amazon.awssdk.core.internal.async.ChunkBuffer;
26+
import software.amazon.awssdk.utils.StringUtils;
27+
28+
public class ChunkBufferTest {
29+
30+
@Test
31+
void builderWithNoTotalSize() {
32+
assertThatThrownBy(() -> ChunkBuffer.builder().build()).isInstanceOf(NullPointerException.class);
33+
}
34+
35+
@Test
36+
void numberOfChunkMultipleOfTotalBtyes() {
37+
String inputString = StringUtils.repeat("*", 25);
38+
39+
ChunkBuffer chunkBuffer =
40+
ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build();
41+
Iterable<ByteBuffer> byteBuffers =
42+
chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8)));
43+
44+
AtomicInteger iteratedCounts = new AtomicInteger();
45+
byteBuffers.forEach(r -> {
46+
iteratedCounts.getAndIncrement();
47+
assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 5).getBytes(StandardCharsets.UTF_8));
48+
});
49+
assertThat(iteratedCounts.get()).isEqualTo(5);
50+
}
51+
52+
@Test
53+
void numberOfChunk_Not_MultipleOfTotalBytes() {
54+
int totalBytes = 23;
55+
int bufferSize = 5;
56+
57+
String inputString = StringUtils.repeat("*", totalBytes);
58+
ChunkBuffer chunkBuffer =
59+
ChunkBuffer.builder().bufferSize(bufferSize).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build();
60+
Iterable<ByteBuffer> byteBuffers =
61+
chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8)));
62+
63+
AtomicInteger iteratedCounts = new AtomicInteger();
64+
byteBuffers.forEach(r -> {
65+
iteratedCounts.getAndIncrement();
66+
if (iteratedCounts.get() * bufferSize < totalBytes) {
67+
assertThat(r.array()).isEqualTo(StringUtils.repeat("*", bufferSize).getBytes(StandardCharsets.UTF_8));
68+
} else {
69+
assertThat(r.array()).isEqualTo(StringUtils.repeat("*", 3).getBytes(StandardCharsets.UTF_8));
70+
71+
}
72+
});
73+
}
74+
75+
@Test
76+
void zeroTotalByetsAsInput() {
77+
String inputString = "";
78+
ChunkBuffer chunkBuffer =
79+
ChunkBuffer.builder().bufferSize(5).totalBytes(inputString.getBytes(StandardCharsets.UTF_8).length).build();
80+
Iterable<ByteBuffer> byteBuffers =
81+
chunkBuffer.bufferAndCreateChunks(ByteBuffer.wrap(inputString.getBytes(StandardCharsets.UTF_8)));
82+
83+
AtomicInteger iteratedCounts = new AtomicInteger();
84+
byteBuffers.forEach(r -> {
85+
iteratedCounts.getAndIncrement();
86+
});
87+
assertThat(iteratedCounts.get()).isEqualTo(0);
88+
}
89+
90+
}

0 commit comments

Comments
 (0)