Skip to content

Commit c6b1184

Browse files
authored
Ensure onNext will be called even if publishing empty content and onC… (#4290)
* Ensure onNext will be called even if publishing empty content and onComplete is called directly * Adding changelog and removing unnecessary override
1 parent 7d54ff8 commit c6b1184

File tree

7 files changed

+195
-150
lines changed

7 files changed

+195
-150
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Sends final checksum chunk and trailer when only onComplete() is called by upstream (empty content)"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
19-
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength;
19+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.LAST_CHUNK_LEN;
20+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength;
2021
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChunkLength;
2122
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChecksumTrailer;
2223
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.createChunk;
@@ -28,11 +29,13 @@
2829
import org.reactivestreams.Subscription;
2930
import software.amazon.awssdk.annotations.SdkInternalApi;
3031
import software.amazon.awssdk.core.async.AsyncRequestBody;
32+
import software.amazon.awssdk.core.async.SdkPublisher;
3133
import software.amazon.awssdk.core.checksums.Algorithm;
3234
import software.amazon.awssdk.core.checksums.SdkChecksum;
3335
import software.amazon.awssdk.core.exception.SdkException;
3436
import software.amazon.awssdk.utils.BinaryUtils;
3537
import software.amazon.awssdk.utils.Validate;
38+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
3639
import software.amazon.awssdk.utils.builder.SdkBuilder;
3740

3841
/**
@@ -129,13 +132,12 @@ public ChecksumCalculatingAsyncRequestBody.Builder trailerHeader(String trailerH
129132

130133
@Override
131134
public Optional<Long> contentLength() {
132-
133135
if (wrapped.contentLength().isPresent() && algorithm != null) {
134136
return Optional.of(calculateChunkLength(wrapped.contentLength().get())
135-
+ calculateChecksumContentLength(algorithm, trailerHeader));
136-
} else {
137-
return wrapped.contentLength();
137+
+ LAST_CHUNK_LEN
138+
+ calculateChecksumTrailerLength(algorithm, trailerHeader));
138139
}
140+
return wrapped.contentLength();
139141
}
140142

141143
@Override
@@ -149,12 +151,15 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
149151
if (sdkChecksum != null) {
150152
sdkChecksum.reset();
151153
}
152-
153154
SynchronousChunkBuffer synchronousChunkBuffer = new SynchronousChunkBuffer(totalBytes);
154-
wrapped.flatMapIterable(synchronousChunkBuffer::buffer)
155+
alwaysInvokeOnNext(wrapped).flatMapIterable(synchronousChunkBuffer::buffer)
155156
.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, trailerHeader, totalBytes));
156157
}
157158

159+
private SdkPublisher<ByteBuffer> alwaysInvokeOnNext(SdkPublisher<ByteBuffer> source) {
160+
return subscriber -> source.subscribe(new OnNextGuaranteedSubscriber(subscriber));
161+
}
162+
158163
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
159164

160165
private final Subscriber<? super ByteBuffer> wrapped;
@@ -243,4 +248,30 @@ private Iterable<ByteBuffer> buffer(ByteBuffer bytes) {
243248
}
244249
}
245250

251+
public static class OnNextGuaranteedSubscriber extends DelegatingSubscriber<ByteBuffer, ByteBuffer> {
252+
253+
private volatile boolean onNextInvoked;
254+
255+
public OnNextGuaranteedSubscriber(Subscriber<? super ByteBuffer> subscriber) {
256+
super(subscriber);
257+
}
258+
259+
@Override
260+
public void onNext(ByteBuffer t) {
261+
if (!onNextInvoked) {
262+
onNextInvoked = true;
263+
}
264+
265+
subscriber.onNext(t);
266+
}
267+
268+
@Override
269+
public void onComplete() {
270+
if (!onNextInvoked) {
271+
subscriber.onNext(ByteBuffer.wrap(new byte[0]));
272+
}
273+
super.onComplete();
274+
}
275+
}
276+
246277
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
2121
import static software.amazon.awssdk.core.HttpChecksumConstant.SIGNING_METHOD;
2222
import static software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE;
23-
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
24-
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength;
23+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength;
24+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateStreamContentLength;
2525
import static software.amazon.awssdk.core.internal.util.HttpChecksumResolver.getResolvedChecksumSpecs;
2626
import static software.amazon.awssdk.http.Header.CONTENT_LENGTH;
2727

@@ -179,7 +179,7 @@ private void addFlexibleChecksumInTrailer(SdkHttpFullRequest.Builder request, Re
179179
}
180180
}
181181

182-
long checksumContentLength = calculateChecksumContentLength(checksumSpecs.algorithm(), checksumSpecs.headerName());
182+
long checksumContentLength = calculateChecksumTrailerLength(checksumSpecs.algorithm(), checksumSpecs.headerName());
183183
long contentLen = checksumContentLength + calculateStreamContentLength(originalContentLength, chunkSize);
184184

185185
request.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksumSpecs.headerName())

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsUnsignedChunkedEncodingInputStream.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.io.InputStream;
1919
import java.nio.charset.StandardCharsets;
2020
import software.amazon.awssdk.annotations.SdkInternalApi;
21-
import software.amazon.awssdk.core.checksums.Algorithm;
2221
import software.amazon.awssdk.core.checksums.SdkChecksum;
2322
import software.amazon.awssdk.core.exception.SdkClientException;
2423
import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig;
@@ -40,48 +39,6 @@ public static Builder builder() {
4039
return new Builder();
4140
}
4241

43-
/**
44-
* Calculates the content length for a given Algorithm and header name.
45-
*
46-
* @param algorithm Algorithm used.
47-
* @param headerName Header name.
48-
* @return Content length of the trailer that will be appended at the end.
49-
*/
50-
public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) {
51-
return headerName.length()
52-
+ HEADER_COLON_SEPARATOR.length()
53-
+ algorithm.base64EncodedLength().longValue()
54-
+ CRLF.length() + CRLF.length();
55-
}
56-
57-
/**
58-
*
59-
* @param originalContentLength Original Content length.
60-
* @return Calculatec Chunk Length with the chunk encoding format.
61-
*/
62-
private static long calculateChunkLength(long originalContentLength) {
63-
return Long.toHexString(originalContentLength).length()
64-
+ CRLF.length()
65-
+ originalContentLength
66-
+ CRLF.length();
67-
}
68-
69-
public static long calculateStreamContentLength(long originalLength, long defaultChunkSize) {
70-
if (originalLength < 0 || defaultChunkSize == 0) {
71-
throw new IllegalArgumentException(originalLength + ", " + defaultChunkSize + "Args <= 0 not expected");
72-
}
73-
74-
long maxSizeChunks = originalLength / defaultChunkSize;
75-
long remainingBytes = originalLength % defaultChunkSize;
76-
77-
long allChunks = maxSizeChunks * calculateChunkLength(defaultChunkSize);
78-
long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0;
79-
// last byte is composed of a "0" and "\r\n"
80-
long lastByteSize = 1 + (long) CRLF.length();
81-
82-
return allChunks + remainingInChunk + lastByteSize;
83-
}
84-
8542
@Override
8643
protected byte[] createFinalChunk(byte[] finalChunk) {
8744
StringBuilder chunkHeader = new StringBuilder();

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/util/ChunkContentUtils.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,35 +28,64 @@ public final class ChunkContentUtils {
2828
public static final String ZERO_BYTE = "0";
2929
public static final String CRLF = "\r\n";
3030

31+
public static final String LAST_CHUNK = ZERO_BYTE + CRLF;
32+
public static final long LAST_CHUNK_LEN = LAST_CHUNK.length();
33+
3134
private ChunkContentUtils() {
3235
}
3336

3437
/**
38+
* The chunk format is: chunk-size CRLF chunk-data CRLF.
39+
*
3540
* @param originalContentLength Original Content length.
36-
* @return Calculates Chunk Length.
41+
* @return the length of this chunk
3742
*/
3843
public static long calculateChunkLength(long originalContentLength) {
44+
if (originalContentLength == 0) {
45+
return 0;
46+
}
3947
return Long.toHexString(originalContentLength).length()
40-
+ CRLF.length()
41-
+ originalContentLength
42-
+ CRLF.length()
43-
+ ZERO_BYTE.length() + CRLF.length();
48+
+ CRLF.length()
49+
+ originalContentLength
50+
+ CRLF.length();
51+
}
52+
53+
/**
54+
* Calculates the content length for data that is divided into chunks.
55+
*
56+
* @param originalLength original content length.
57+
* @param chunkSize chunk size
58+
* @return Content length of the trailer that will be appended at the end.
59+
*/
60+
public static long calculateStreamContentLength(long originalLength, long chunkSize) {
61+
if (originalLength < 0 || chunkSize == 0) {
62+
throw new IllegalArgumentException(originalLength + ", " + chunkSize + "Args <= 0 not expected");
63+
}
64+
65+
long maxSizeChunks = originalLength / chunkSize;
66+
long remainingBytes = originalLength % chunkSize;
67+
68+
long allChunks = maxSizeChunks * calculateChunkLength(chunkSize);
69+
long remainingInChunk = remainingBytes > 0 ? calculateChunkLength(remainingBytes) : 0;
70+
// last byte is composed of a "0" and "\r\n"
71+
long lastByteSize = 1 + (long) CRLF.length();
72+
73+
return allChunks + remainingInChunk + lastByteSize;
4474
}
4575

4676
/**
47-
* Calculates the content length for a given Algorithm and header name.
77+
* Calculates the content length for a given algorithm and header name.
4878
*
4979
* @param algorithm Algorithm used.
5080
* @param headerName Header name.
5181
* @return Content length of the trailer that will be appended at the end.
5282
*/
53-
public static long calculateChecksumContentLength(Algorithm algorithm, String headerName) {
54-
int checksumLength = algorithm.base64EncodedLength();
55-
56-
return (headerName.length()
57-
+ HEADER_COLON_SEPARATOR.length()
58-
+ checksumLength
59-
+ CRLF.length() + CRLF.length());
83+
public static long calculateChecksumTrailerLength(Algorithm algorithm, String headerName) {
84+
return headerName.length()
85+
+ HEADER_COLON_SEPARATOR.length()
86+
+ algorithm.base64EncodedLength().longValue()
87+
+ CRLF.length()
88+
+ CRLF.length();
6089
}
6190

6291
/**
@@ -86,17 +115,13 @@ public static ByteBuffer createChunk(ByteBuffer chunkData, boolean isLastByte) {
86115
chunkHeader.append(CRLF);
87116
try {
88117
byte[] header = chunkHeader.toString().getBytes(StandardCharsets.UTF_8);
89-
// Last byte does not need additional \r\n trailer
90118
byte[] trailer = !isLastByte ? CRLF.getBytes(StandardCharsets.UTF_8)
91119
: "".getBytes(StandardCharsets.UTF_8);
92120
ByteBuffer chunkFormattedBuffer = ByteBuffer.allocate(header.length + chunkLength + trailer.length);
93-
chunkFormattedBuffer.put(header)
94-
.put(chunkData)
95-
.put(trailer);
121+
chunkFormattedBuffer.put(header).put(chunkData).put(trailer);
96122
chunkFormattedBuffer.flip();
97123
return chunkFormattedBuffer;
98124
} catch (Exception e) {
99-
// This is to warp BufferOverflowException,ReadOnlyBufferException to SdkClientException.
100125
throw SdkClientException.builder()
101126
.message("Unable to create chunked data. " + e.getMessage())
102127
.cause(e)

core/sdk-core/src/test/java/software/amazon/awssdk/core/checksum/AwsChunkedEncodingInputStreamTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.checksum;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumTrailerLength;
1920

2021
import java.io.ByteArrayInputStream;
2122
import java.io.IOException;
@@ -25,6 +26,7 @@
2526
import software.amazon.awssdk.core.checksums.SdkChecksum;
2627
import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream;
2728
import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream;
29+
import software.amazon.awssdk.core.internal.util.ChunkContentUtils;
2830

2931
public class AwsChunkedEncodingInputStreamTest {
3032

@@ -55,10 +57,9 @@ public void readAwsUnsignedChunkedEncodingInputStream() throws IOException {
5557
public void lengthsOfCalculateByChecksumCalculatingInputStream(){
5658

5759
String initialString = "Hello world";
58-
long calculateChunkLength = AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength(initialString.length(),
59-
AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE);
60-
long checksumContentLength = AwsUnsignedChunkedEncodingInputStream.calculateChecksumContentLength(
61-
SHA256_ALGORITHM, SHA256_HEADER_NAME);
60+
long calculateChunkLength = ChunkContentUtils.calculateStreamContentLength(initialString.length(),
61+
AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE);
62+
long checksumContentLength = calculateChecksumTrailerLength(SHA256_ALGORITHM, SHA256_HEADER_NAME);
6263
assertThat(calculateChunkLength).isEqualTo(19);
6364
assertThat(checksumContentLength).isEqualTo(71);
6465
}

0 commit comments

Comments
 (0)