Skip to content

Commit dc773ec

Browse files
committed
Fix SDK behavior when request content-length does not match the data length returned by the publisher.
This fixes two potential bugs: 1. A source of "checksum mismatch" exceptions (#953) when the published data length exceeds the content-length. 2. A source of "hung futures" (#2576?) when the published data is shorter than the content-length. This may not be the definitive fix for those two issues.
1 parent df41964 commit dc773ec

File tree

8 files changed

+246
-12
lines changed

8 files changed

+246
-12
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Truncate the async request body when the content-length is shorter than the request body, instead of raising a \"Data read has a different checksum\" exception."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Raise an exception instead of hanging when a put-object content-length exceeds the data written by the async request body."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,8 +536,14 @@ public void onError(Throwable t) {
536536
@Override
537537
public void onComplete() {
538538
if (!done) {
539-
done = true;
540-
subscriber.onComplete();
539+
Long expectedContentLength = requestContentLength.orElse(null);
540+
if (expectedContentLength != null && written < expectedContentLength) {
541+
onError(new IllegalStateException("Request content was only " + written + " bytes, but the specified "
542+
+ "content-length was " + expectedContentLength + " bytes."));
543+
} else {
544+
done = true;
545+
subscriber.onComplete();
546+
}
541547
}
542548
}
543549
});

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,7 @@
522522
<excludes>
523523
<exclude>*.internal.*</exclude>
524524
<exclude>software.amazon.awssdk.thirdparty.*</exclude>
525-
<exclude>software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler</exclude>
526-
<exclude>software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler</exclude>
525+
<exclude>software.amazon.awssdk.services.s3.checksums.ChecksumCalculatingAsyncRequestBody</exclude>
527526
</excludes>
528527

529528
<excludeModules>

services/s3/src/main/java/software/amazon/awssdk/services/s3/checksums/ChecksumCalculatingAsyncRequestBody.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,26 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.util.Optional;
20+
import java.util.concurrent.atomic.AtomicLong;
2021
import org.reactivestreams.Subscriber;
2122
import org.reactivestreams.Subscription;
2223
import software.amazon.awssdk.annotations.SdkInternalApi;
2324
import software.amazon.awssdk.core.async.AsyncRequestBody;
2425
import software.amazon.awssdk.core.checksums.SdkChecksum;
26+
import software.amazon.awssdk.http.SdkHttpRequest;
2527
import software.amazon.awssdk.utils.BinaryUtils;
2628

2729
@SdkInternalApi
2830
public class ChecksumCalculatingAsyncRequestBody implements AsyncRequestBody {
29-
31+
private final Long contentLength;
3032
private final AsyncRequestBody wrapped;
3133
private final SdkChecksum sdkChecksum;
3234

33-
public ChecksumCalculatingAsyncRequestBody(AsyncRequestBody wrapped, SdkChecksum sdkChecksum) {
35+
public ChecksumCalculatingAsyncRequestBody(SdkHttpRequest request, AsyncRequestBody wrapped, SdkChecksum sdkChecksum) {
36+
this.contentLength = request.firstMatchingHeader("Content-Length")
37+
.map(Long::parseLong)
38+
.orElse(wrapped.contentLength()
39+
.orElse(null));
3440
this.wrapped = wrapped;
3541
this.sdkChecksum = sdkChecksum;
3642
}
@@ -48,18 +54,21 @@ public String contentType() {
4854
@Override
4955
public void subscribe(Subscriber<? super ByteBuffer> s) {
5056
sdkChecksum.reset();
51-
wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum));
57+
wrapped.subscribe(new ChecksumCalculatingSubscriber(s, sdkChecksum, contentLength));
5258
}
5359

5460
private static final class ChecksumCalculatingSubscriber implements Subscriber<ByteBuffer> {
55-
61+
private final AtomicLong contentRead = new AtomicLong(0);
5662
private final Subscriber<? super ByteBuffer> wrapped;
5763
private final SdkChecksum checksum;
64+
private final Long contentLength;
5865

5966
ChecksumCalculatingSubscriber(Subscriber<? super ByteBuffer> wrapped,
60-
SdkChecksum sdkChecksum) {
67+
SdkChecksum sdkChecksum,
68+
Long contentLength) {
6169
this.wrapped = wrapped;
6270
this.checksum = sdkChecksum;
71+
this.contentLength = contentLength;
6372
}
6473

6574
@Override
@@ -69,11 +78,34 @@ public void onSubscribe(Subscription s) {
6978

7079
@Override
7180
public void onNext(ByteBuffer byteBuffer) {
72-
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer);
73-
checksum.update(buf, 0, buf.length);
81+
int amountToReadFromByteBuffer = getAmountToReadFromByteBuffer(byteBuffer);
82+
83+
if (amountToReadFromByteBuffer > 0) {
84+
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
85+
checksum.update(buf, 0, amountToReadFromByteBuffer);
86+
}
87+
88+
7489
wrapped.onNext(byteBuffer);
7590
}
7691

92+
private int getAmountToReadFromByteBuffer(ByteBuffer byteBuffer) {
93+
// If content length is null, we should include everything in the checksum because the stream is essentially
94+
// unbounded.
95+
if (contentLength == null) {
96+
return byteBuffer.remaining();
97+
}
98+
99+
long amountReadSoFar = contentRead.getAndAdd(byteBuffer.remaining());
100+
long amountRemaining = Math.max(0, contentLength - amountReadSoFar);
101+
102+
if (amountRemaining > byteBuffer.remaining()) {
103+
return byteBuffer.remaining();
104+
} else {
105+
return Math.toIntExact(amountRemaining);
106+
}
107+
}
108+
77109
@Override
78110
public void onError(Throwable t) {
79111
wrapped.onError(t);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/handlers/AsyncChecksumValidationInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public Optional<AsyncRequestBody> modifyAsyncHttpContent(Context.ModifyHttpReque
5151
SdkChecksum checksum = new Md5Checksum();
5252
executionAttributes.putAttribute(ASYNC_RECORDING_CHECKSUM, true);
5353
executionAttributes.putAttribute(CHECKSUM, checksum);
54-
return Optional.of(new ChecksumCalculatingAsyncRequestBody(context.asyncRequestBody().get(), checksum));
54+
return Optional.of(new ChecksumCalculatingAsyncRequestBody(context.httpRequest(),
55+
context.asyncRequestBody().get(),
56+
checksum));
5557
}
5658

5759
return context.asyncRequestBody();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.functionaltests;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.any;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
24+
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
27+
28+
import com.github.tomakehurst.wiremock.junit.WireMockRule;
29+
import java.net.URI;
30+
import java.nio.ByteBuffer;
31+
import java.util.Optional;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.TimeUnit;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
import org.reactivestreams.Subscriber;
37+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
38+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
39+
import software.amazon.awssdk.core.async.AsyncRequestBody;
40+
import software.amazon.awssdk.regions.Region;
41+
import software.amazon.awssdk.services.s3.S3AsyncClient;
42+
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
43+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
44+
45+
public class ContentLengthMismatchTest {
46+
@Rule
47+
public WireMockRule wireMock = new WireMockRule(0);
48+
49+
private S3AsyncClientBuilder getAsyncClientBuilder() {
50+
return S3AsyncClient.builder()
51+
.region(Region.US_EAST_1)
52+
.endpointOverride(endpoint())
53+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")));
54+
}
55+
56+
private URI endpoint() {
57+
return URI.create("http://localhost:" + wireMock.port());
58+
}
59+
60+
@Test
61+
public void checksumDoesNotExceedContentLengthHeaderForPuts() {
62+
String bucket = "Example-Bucket";
63+
String key = "Example-Object";
64+
String content = "Hello, World!";
65+
String eTag = "65A8E27D8879283831B664BD8B7F0AD4";
66+
67+
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withHeader("ETag", eTag)));
68+
69+
S3AsyncClient s3Client = getAsyncClientBuilder().build();
70+
71+
PutObjectResponse response =
72+
s3Client.putObject(r -> r.bucket(bucket).key(key).contentLength((long) content.length()),
73+
AsyncRequestBody.fromString(content + " Extra stuff!"))
74+
.join();
75+
76+
verify(putRequestedFor(anyUrl()).withRequestBody(equalTo(content)));
77+
assertThat(response.eTag()).isEqualTo(eTag);
78+
}
79+
@Test
80+
public void checksumDoesNotExceedAsyncRequestBodyLengthForPuts() {
81+
String bucket = "Example-Bucket";
82+
String key = "Example-Object";
83+
String content = "Hello, World!";
84+
String eTag = "65A8E27D8879283831B664BD8B7F0AD4";
85+
86+
stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withHeader("ETag", eTag)));
87+
88+
S3AsyncClient s3Client = getAsyncClientBuilder().build();
89+
90+
PutObjectResponse response =
91+
s3Client.putObject(r -> r.bucket(bucket).key(key),
92+
new AsyncRequestBody() {
93+
@Override
94+
public Optional<Long> contentLength() {
95+
return Optional.of((long) content.length());
96+
}
97+
98+
@Override
99+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
100+
AsyncRequestBody.fromString(content + " Extra stuff!").subscribe(subscriber);
101+
}
102+
})
103+
.join();
104+
105+
verify(putRequestedFor(anyUrl()).withRequestBody(equalTo(content)));
106+
assertThat(response.eTag()).isEqualTo(eTag);
107+
}
108+
109+
@Test
110+
public void contentShorterThanContentLengthHeaderFails() {
111+
String bucket = "Example-Bucket";
112+
String key = "Example-Object";
113+
114+
S3AsyncClient s3Client = getAsyncClientBuilder().build();
115+
116+
AsyncRequestBody requestBody = new AsyncRequestBody() {
117+
@Override
118+
public Optional<Long> contentLength() {
119+
return Optional.empty();
120+
}
121+
122+
@Override
123+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
124+
AsyncRequestBody.fromString("A").subscribe(subscriber);
125+
}
126+
};
127+
128+
assertThatThrownBy(() -> s3Client.putObject(r -> r.bucket(bucket).key(key).contentLength(2L), requestBody)
129+
.get(10, TimeUnit.SECONDS))
130+
.isInstanceOf(ExecutionException.class)
131+
.hasMessageContaining("content-length");
132+
}
133+
134+
@Test
135+
public void contentShorterThanRequestBodyLengthFails() {
136+
String bucket = "Example-Bucket";
137+
String key = "Example-Object";
138+
139+
S3AsyncClient s3Client = getAsyncClientBuilder().build();
140+
141+
AsyncRequestBody requestBody = new AsyncRequestBody() {
142+
@Override
143+
public Optional<Long> contentLength() {
144+
return Optional.of(2L);
145+
}
146+
147+
@Override
148+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
149+
AsyncRequestBody.fromString("A").subscribe(subscriber);
150+
}
151+
};
152+
153+
assertThatThrownBy(() -> s3Client.putObject(r -> r.bucket(bucket).key(key), requestBody)
154+
.get(10, TimeUnit.SECONDS))
155+
.isInstanceOf(ExecutionException.class)
156+
.hasMessageContaining("content-length");
157+
}
158+
159+
}

utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,28 @@ public static byte[] copyBytesFrom(ByteBuffer bb) {
206206
return dst;
207207
}
208208

209+
/**
210+
* This behaves identically do {@link #copyBytesFrom(ByteBuffer)}, except
211+
* that the readLimit acts as a limit to the number of bytes that should be
212+
* read from the byte buffer.
213+
*/
214+
public static byte[] copyBytesFrom(ByteBuffer bb, int readLimit) {
215+
if (bb == null) {
216+
return null;
217+
}
218+
219+
int numBytesToRead = Math.min(readLimit, bb.limit() - bb.position());
220+
221+
if (bb.hasArray()) {
222+
return Arrays.copyOfRange(
223+
bb.array(),
224+
bb.arrayOffset() + bb.position(),
225+
bb.arrayOffset() + bb.position() + numBytesToRead);
226+
}
227+
228+
byte[] dst = new byte[numBytesToRead];
229+
bb.asReadOnlyBuffer().get(dst);
230+
return dst;
231+
}
232+
209233
}

0 commit comments

Comments
 (0)