Skip to content

Commit 9662737

Browse files
committed
Reusing .flatMapIterable API of SDKPublisher to create Chunks of fixed size
1 parent f02605b commit 9662737

File tree

10 files changed

+197
-250
lines changed

10 files changed

+197
-250
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk;
2222

2323
import java.nio.ByteBuffer;
24+
import java.util.ArrayList;
25+
import java.util.List;
2426
import java.util.Optional;
2527
import java.util.concurrent.atomic.AtomicLong;
2628
import org.reactivestreams.Subscriber;
@@ -32,7 +34,6 @@
3234
import software.amazon.awssdk.core.exception.SdkException;
3335
import software.amazon.awssdk.utils.BinaryUtils;
3436
import software.amazon.awssdk.utils.Validate;
35-
import software.amazon.awssdk.utils.async.ByteBufferingSubscriber;
3637
import software.amazon.awssdk.utils.builder.SdkBuilder;
3738

3839
/**
@@ -50,6 +51,8 @@ public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
5051
private final Algorithm algorithm;
5152
private final String trailerHeader;
5253
private final AtomicLong remainingBytes;
54+
private final long totalBytes;
55+
private final ByteBuffer currentBuffer;
5356

5457
private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
5558

@@ -60,8 +63,10 @@ private ChecksumCalculatingAsyncRequestBody(DefaultBuilder builder) {
6063
this.algorithm = builder.algorithm;
6164
this.sdkChecksum = builder.algorithm != null ? SdkChecksum.forAlgorithm(algorithm) : null;
6265
this.trailerHeader = builder.trailerHeader;
63-
this.remainingBytes = new AtomicLong(wrapped.contentLength()
64-
.orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied.")));
66+
this.totalBytes = wrapped.contentLength()
67+
.orElseThrow(() -> new UnsupportedOperationException("Content length must be supplied."));
68+
this.remainingBytes = new AtomicLong();
69+
this.currentBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
6570
}
6671

6772
/**
@@ -151,8 +156,11 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
151156
if (sdkChecksum != null) {
152157
sdkChecksum.reset();
153158
}
154-
wrapped.subscribe(new ByteBufferingSubscriber(
155-
new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, remainingBytes), DEFAULT_CHUNK_SIZE));
159+
160+
this.remainingBytes.set(totalBytes);
161+
162+
wrapped.flatMapIterable(this::bufferAndCreateChunks)
163+
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
156164
}
157165

158166
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
@@ -166,11 +174,11 @@ private static final class ChecksumCalculatingSubscriber implements Subscriber<B
166174

167175
ChecksumCalculatingSubscriber(Subscriber<? super ByteBuffer> wrapped,
168176
SdkChecksum checksum,
169-
String trailerHeader, AtomicLong remainingBytes) {
177+
String trailerHeader, long totalBytes) {
170178
this.wrapped = wrapped;
171179
this.checksum = checksum;
172180
this.trailerHeader = trailerHeader;
173-
this.remainingBytes = remainingBytes;
181+
this.remainingBytes = new AtomicLong(totalBytes);
174182
}
175183

176184
@Override
@@ -230,4 +238,48 @@ public void onComplete() {
230238
wrapped.onComplete();
231239
}
232240
}
241+
242+
private Iterable<ByteBuffer> bufferAndCreateChunks(ByteBuffer buffer) {
243+
int startPosition = 0;
244+
int currentBytesRead = buffer.remaining();
245+
246+
List<ByteBuffer> resultBufferedList = new ArrayList<>();
247+
do {
248+
int bufferedBytes = currentBuffer.position();
249+
int availableToRead = DEFAULT_CHUNK_SIZE - bufferedBytes;
250+
int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition);
251+
252+
if (bufferedBytes == 0) {
253+
currentBuffer.put(buffer.array(), startPosition, bytesToMove);
254+
} else {
255+
currentBuffer.put(buffer.array(), 0, bytesToMove);
256+
}
257+
258+
startPosition = startPosition + bytesToMove;
259+
260+
// Send the data once the buffer is full
261+
if (currentBuffer.position() == DEFAULT_CHUNK_SIZE) {
262+
currentBuffer.position(0);
263+
ByteBuffer bufferToSend = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
264+
bufferToSend.put(currentBuffer.array(), 0, DEFAULT_CHUNK_SIZE);
265+
bufferToSend.clear();
266+
currentBuffer.clear();
267+
resultBufferedList.add(bufferToSend);
268+
remainingBytes.addAndGet(-DEFAULT_CHUNK_SIZE);
269+
}
270+
271+
} while (startPosition < currentBytesRead);
272+
273+
int bufferedBytes = currentBuffer.position();
274+
// Send the remainder buffered bytes at the end when there no more bytes
275+
if (bufferedBytes > 0 && remainingBytes.get() == bufferedBytes) {
276+
currentBuffer.clear();
277+
ByteBuffer trimmedBuffer = ByteBuffer.allocate(bufferedBytes);
278+
trimmedBuffer.put(currentBuffer.array(), 0, bufferedBytes);
279+
trimmedBuffer.clear();
280+
resultBufferedList.add(trimmedBuffer);
281+
remainingBytes.addAndGet(-bufferedBytes);
282+
}
283+
return resultBufferedList;
284+
}
233285
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public static long calculateStreamContentLength(long originalLength, long defaul
7676

7777
long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize);
7878
long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0;
79-
long lastByteSize = (long) "0".length() + (long) CRLF.length();
79+
// the terminating chunk is composed of a "0" followed by "\r\n"
80+
long lastByteSize = 1 + (long) CRLF.length();
8081

8182
return allChunks + remainingInChunk + lastByteSize;
8283
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/AsyncHttpChecksumIntegrationTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.KB;
20-
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb;
21-
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters;
20+
import static software.amazon.awssdk.testutils.FileUtils.createDataOfSize;
21+
import static software.amazon.awssdk.testutils.FileUtils.fixedLengthInKbFileWithRandomOrFixedCharacters;
2222
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
2323

2424
import java.io.File;
@@ -118,13 +118,14 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallReques
118118

119119

120120
@ParameterizedTest
121-
@ValueSource(ints = {1, 12, 16, 17, 32, 33})
121+
// Sizes are given in bytes (n * KB): createDataOfSize replaces createDataOfSizeInKb, which multiplied by 1024 itself
122+
@ValueSource(ints = {1*KB, 3*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB})
122123
void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequestBody(int dataSize) throws InterruptedException {
123124
s3Async.putObject(PutObjectRequest.builder()
124125
.bucket(BUCKET)
125126
.key(KEY)
126127
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
127-
.build(), AsyncRequestBody.fromString(createDataOfSizeInKb(dataSize, 'a'))).join();
128+
.build(), AsyncRequestBody.fromString(createDataOfSize(dataSize, 'a'))).join();
128129
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
129130
assertThat(interceptor.requestChecksumInHeader()).isNull();
130131

@@ -133,14 +134,14 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest
133134
.build(), AsyncResponseTransformer.toBytes()).join().asUtf8String();
134135
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
135136
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
136-
assertThat(response).isEqualTo(createDataOfSizeInKb(dataSize, 'a'));
137+
assertThat(response).isEqualTo(createDataOfSize(dataSize, 'a'));
137138
}
138139

139140
@ParameterizedTest
140141
@ValueSource(ints = {1*KB, 12*KB, 16*KB, 17*KB, 32*KB, 33*KB})
141142
void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withDifferentChunkSize_OfFileAsyncFileRequestBody
142143
(int chunkSize) throws IOException {
143-
File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(64);
144+
File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true);
144145
s3Async.putObject(PutObjectRequest.builder()
145146
.bucket(BUCKET)
146147
.key(KEY)
@@ -169,8 +170,8 @@ void asyncHttpsValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeRequest
169170
void asyncHttpsValidUnsignedTrailer_TwoRequests_withDifferentChunkSize_OfFileAsyncFileRequestBody(int chunkSize)
170171
throws IOException {
171172

172-
File randomFileOfFixedLengthOne = fixedLengthFileWithRandomCharacters(64);
173-
File randomFileOfFixedLengthTwo = fixedLengthFileWithRandomCharacters(17);
173+
File randomFileOfFixedLengthOne = fixedLengthInKbFileWithRandomOrFixedCharacters(64, true);
174+
File randomFileOfFixedLengthTwo = fixedLengthInKbFileWithRandomOrFixedCharacters(17, true);
174175
CompletableFuture<PutObjectResponse> putObjectFutureOne =
175176
s3Async.putObject(PutObjectRequest.builder()
176177
.bucket(BUCKET)

services/s3/src/it/java/software/amazon/awssdk/services/s3/checksum/HttpChecksumIntegrationTest.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
20-
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.createDataOfSizeInKb;
21-
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.fixedLengthFileWithRandomCharacters;
20+
import static software.amazon.awssdk.services.s3.utils.S3TestUtils.KB;
21+
import static software.amazon.awssdk.testutils.FileUtils.createDataOfSize;
22+
import static software.amazon.awssdk.testutils.FileUtils.fixedLengthInKbFileWithRandomOrFixedCharacters;
2223
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
2324

2425
import java.io.BufferedReader;
@@ -54,7 +55,7 @@
5455

5556
public class HttpChecksumIntegrationTest extends S3IntegrationTestBase {
5657

57-
public static final int HUGE_MSG_SIZE = 16384;
58+
public static final int HUGE_MSG_SIZE = 16384 * KB;
5859
protected static final String KEY = "some-key";
5960
private static final String BUCKET = temporaryBucketName(HttpChecksumIntegrationTest.class);
6061
public static CaptureChecksumValidationInterceptor interceptor = new CaptureChecksumValidationInterceptor();
@@ -278,7 +279,7 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException {
278279
.bucket(BUCKET)
279280
.key(KEY)
280281
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
281-
.build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')));
282+
.build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a')));
282283

283284
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
284285
assertThat(interceptor.requestChecksumInHeader()).isNull();
@@ -296,7 +297,7 @@ public void syncUnsignedPayloadForHugeMessage() throws InterruptedException {
296297
.collect(Collectors.joining("\n"));
297298
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
298299
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
299-
assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'));
300+
assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a'));
300301
}
301302

302303
@Test
@@ -305,7 +306,7 @@ public void syncSignedPayloadForHugeMessage() {
305306
.bucket(BUCKET)
306307
.key(KEY)
307308
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
308-
.build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')));
309+
.build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a')));
309310

310311
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
311312
assertThat(interceptor.requestChecksumInHeader()).isNull();
@@ -322,7 +323,7 @@ public void syncSignedPayloadForHugeMessage() {
322323
.collect(Collectors.joining("\n"));
323324
assertThat(interceptor.validationAlgorithm()).isEqualTo(Algorithm.CRC32);
324325
assertThat(interceptor.responseValidation()).isEqualTo(ChecksumValidation.VALIDATED);
325-
assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'));
326+
assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a'));
326327
}
327328

328329
@Test
@@ -331,7 +332,7 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep
331332
.bucket(BUCKET)
332333
.key(KEY)
333334
.checksumAlgorithm(ChecksumAlgorithm.CRC32)
334-
.build(), RequestBody.fromString(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a')));
335+
.build(), RequestBody.fromString(createDataOfSize(HUGE_MSG_SIZE, 'a')));
335336

336337
assertThat(interceptor.requestChecksumInTrailer()).isEqualTo("x-amz-checksum-crc32");
337338
assertThat(interceptor.requestChecksumInHeader()).isNull();
@@ -347,15 +348,15 @@ public void syncUnsignedPayloadMultiPartForHugeMessage() throws InterruptedExcep
347348

348349
assertThat(interceptor.validationAlgorithm()).isNull();
349350
assertThat(interceptor.responseValidation()).isNull();
350-
assertThat(text).isEqualTo(createDataOfSizeInKb(HUGE_MSG_SIZE, 'a'));
351+
assertThat(text).isEqualTo(createDataOfSize(HUGE_MSG_SIZE, 'a'));
351352
}
352353

353354

354355
@Test
355356
public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileRequestBody() throws InterruptedException,
356357
IOException {
357358

358-
File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(10);
359+
File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(10, true);
359360

360361
s3Https.putObject(PutObjectRequest.builder()
361362
.bucket(BUCKET)
@@ -381,7 +382,7 @@ public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withSmallFileR
381382

382383
@Test
383384
public void syncValidUnsignedTrailerChecksumCalculatedBySdkClient_withHugeFileRequestBody() throws IOException {
384-
File randomFileOfFixedLength = fixedLengthFileWithRandomCharacters(34);
385+
File randomFileOfFixedLength = fixedLengthInKbFileWithRandomOrFixedCharacters(34, true);
385386
s3Https.putObject(PutObjectRequest.builder()
386387
.bucket(BUCKET)
387388
.key(KEY)

services/s3/src/it/java/software/amazon/awssdk/services/s3/utils/S3TestUtils.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -176,24 +176,4 @@ public static void deleteBucketAndAllContents(S3Client s3, String bucketName) {
176176
}
177177
}
178178

179-
public static File fixedLengthFileWithRandomCharacters(int sizeInKb) throws IOException {
180-
File tempFile = File.createTempFile("s3-object-file-", ".tmp");
181-
try (RandomAccessFile randomAccessFile = new RandomAccessFile(tempFile, "rw")) {
182-
PrintWriter writer = new PrintWriter(tempFile, "UTF-8");
183-
int objectSize = sizeInKb * KB;
184-
Random random = new Random();
185-
for (int index = 0; index < objectSize; index ++) {
186-
writer.print(index % 5 == 0 ? ' ' : (char) ('a' + random.nextInt(26)));
187-
}
188-
writer.flush();
189-
}
190-
tempFile.deleteOnExit();
191-
return tempFile;
192-
}
193-
194-
public static String createDataOfSizeInKb(int dataSize, char contentCharacter) {
195-
return IntStream.range(0, dataSize * 1024).mapToObj(i -> String.valueOf(contentCharacter)).collect(Collectors.joining());
196-
}
197-
198-
199179
}

0 commit comments

Comments
 (0)