Skip to content

Commit a34c488

Browse files
authored
Enable apache http client chunked encoding (#4198)
* Fix chunked-encoding in Apache client * Add test * Refactoring * Refactoring * Add tests * Refactoring * Uncomment UrlConnection integ test * Add orFailAfter to waiter * Update waiter
1 parent 4f59426 commit a34c488

File tree

8 files changed

+455
-5
lines changed

8 files changed

+455
-5
lines changed

http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/RepeatableInputStreamRequestEntity.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package software.amazon.awssdk.http.apache.internal;
1717

18+
import static software.amazon.awssdk.http.Header.CHUNKED;
19+
import static software.amazon.awssdk.http.Header.TRANSFER_ENCODING;
20+
1821
import java.io.ByteArrayInputStream;
1922
import java.io.IOException;
2023
import java.io.InputStream;
@@ -45,6 +48,11 @@ public class RepeatableInputStreamRequestEntity extends BasicHttpEntity {
4548
*/
4649
private boolean firstAttempt = true;
4750

51+
/**
52+
* True if the "Transfer-Encoding:chunked" header is present
53+
*/
54+
private boolean isChunked;
55+
4856
/**
4957
* The underlying InputStreamEntity being delegated to
5058
*/
@@ -74,7 +82,8 @@ public class RepeatableInputStreamRequestEntity extends BasicHttpEntity {
7482
* content length, and content).
7583
*/
7684
public RepeatableInputStreamRequestEntity(final HttpExecuteRequest request) {
77-
setChunked(false);
85+
isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
86+
setChunked(isChunked);
7887

7988
/*
8089
* If we don't specify a content length when we instantiate our
@@ -116,7 +125,7 @@ private InputStream getContent(Optional<ContentStreamProvider> contentStreamProv
116125

117126
@Override
118127
public boolean isChunked() {
119-
return false;
128+
return isChunked;
120129
}
121130

122131
/**

http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/impl/ApacheHttpRequestFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
@SdkInternalApi
4949
public class ApacheHttpRequestFactory {
5050

51-
private static final List<String> IGNORE_HEADERS = Arrays.asList(HttpHeaders.CONTENT_LENGTH, HttpHeaders.HOST);
51+
private static final List<String> IGNORE_HEADERS = Arrays.asList(HttpHeaders.CONTENT_LENGTH, HttpHeaders.HOST,
52+
HttpHeaders.TRANSFER_ENCODING);
5253

5354
public HttpRequestBase create(final HttpExecuteRequest request, final ApacheHttpRequestConfig requestConfig) {
5455
HttpRequestBase base = createApacheRequest(request, sanitizeUri(request.httpRequest()));
@@ -150,7 +151,7 @@ private HttpRequestBase wrapEntity(HttpExecuteRequest request,
150151
*/
151152
if (request.contentStreamProvider().isPresent()) {
152153
HttpEntity entity = new RepeatableInputStreamRequestEntity(request);
153-
if (!request.httpRequest().firstMatchingHeader(HttpHeaders.CONTENT_LENGTH).isPresent()) {
154+
if (!request.httpRequest().firstMatchingHeader(HttpHeaders.CONTENT_LENGTH).isPresent() && !entity.isChunked()) {
154155
entity = ApacheUtils.newBufferedHttpEntity(entity);
155156
}
156157
entityEnclosingRequest.setEntity(entity);

http-clients/apache-client/src/test/java/software/amazon/awssdk/http/apache/internal/impl/ApacheHttpRequestFactoryTest.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,25 @@
1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.junit.jupiter.api.Assertions.assertNotNull;
2121

22+
import java.io.ByteArrayInputStream;
23+
import java.io.InputStream;
2224
import java.net.InetAddress;
2325
import java.net.URI;
26+
import java.nio.charset.StandardCharsets;
2427
import java.time.Duration;
2528
import org.apache.http.Header;
29+
import org.apache.http.HttpEntity;
2630
import org.apache.http.HttpHeaders;
31+
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
2732
import org.apache.http.client.methods.HttpRequestBase;
33+
import org.apache.http.entity.BufferedHttpEntity;
2834
import org.junit.jupiter.api.BeforeEach;
2935
import org.junit.jupiter.api.Test;
3036
import software.amazon.awssdk.http.HttpExecuteRequest;
3137
import software.amazon.awssdk.http.SdkHttpMethod;
3238
import software.amazon.awssdk.http.SdkHttpRequest;
3339
import software.amazon.awssdk.http.apache.internal.ApacheHttpRequestConfig;
40+
import software.amazon.awssdk.http.apache.internal.RepeatableInputStreamRequestEntity;
3441

3542
public class ApacheHttpRequestFactoryTest {
3643

@@ -49,7 +56,7 @@ public void setup() {
4956
}
5057

5158
@Test
52-
public void ceateSetsHostHeaderByDefault() {
59+
public void createSetsHostHeaderByDefault() {
5360
SdkHttpRequest sdkRequest = SdkHttpRequest.builder()
5461
.uri(URI.create("http://localhost:12345/"))
5562
.method(SdkHttpMethod.HEAD)
@@ -64,6 +71,29 @@ public void ceateSetsHostHeaderByDefault() {
6471
assertEquals("localhost:12345", hostHeaders[0].getValue());
6572
}
6673

74+
@Test
75+
public void putRequest_withTransferEncodingChunked_isChunkedAndDoesNotIncludeHeader() {
76+
SdkHttpRequest sdkRequest = SdkHttpRequest.builder()
77+
.uri(URI.create("http://localhost:12345/"))
78+
.method(SdkHttpMethod.PUT)
79+
.putHeader("Transfer-Encoding", "chunked")
80+
.build();
81+
InputStream inputStream = new ByteArrayInputStream("TestStream".getBytes(StandardCharsets.UTF_8));
82+
HttpExecuteRequest request = HttpExecuteRequest.builder()
83+
.request(sdkRequest)
84+
.contentStreamProvider(() -> inputStream)
85+
.build();
86+
HttpRequestBase result = instance.create(request, requestConfig);
87+
Header[] transferEncodingHeaders = result.getHeaders("Transfer-Encoding");
88+
assertThat(transferEncodingHeaders).isEmpty();
89+
90+
HttpEntityEnclosingRequestBase enclosingRequest = (HttpEntityEnclosingRequestBase) result;
91+
HttpEntity httpEntity = enclosingRequest.getEntity();
92+
assertThat(httpEntity.isChunked()).isTrue();
93+
assertThat(httpEntity).isNotInstanceOf(BufferedHttpEntity.class);
94+
assertThat(httpEntity).isInstanceOf(RepeatableInputStreamRequestEntity.class);
95+
}
96+
6797
@Test
6898
public void defaultHttpPortsAreNotInDefaultHostHeader() {
6999
SdkHttpRequest sdkRequest = SdkHttpRequest.builder()

services/mediastoredata/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,10 @@
6969
<version>${awsjavasdk.version}</version>
7070
<scope>test</scope>
7171
</dependency>
72+
<dependency>
73+
<groupId>org.apache.commons</groupId>
74+
<artifactId>commons-lang3</artifactId>
75+
<scope>test</scope>
76+
</dependency>
7277
</dependencies>
7378
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.mediastoredata;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
20+
21+
import io.reactivex.Flowable;
22+
import java.io.ByteArrayInputStream;
23+
import java.io.FilterInputStream;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.net.URI;
27+
import java.nio.ByteBuffer;
28+
import java.nio.charset.StandardCharsets;
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.Optional;
34+
import org.apache.commons.lang3.RandomStringUtils;
35+
import org.junit.jupiter.api.AfterAll;
36+
import org.junit.jupiter.api.BeforeAll;
37+
import org.junit.jupiter.api.Test;
38+
import org.reactivestreams.Subscriber;
39+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
40+
import software.amazon.awssdk.core.async.AsyncRequestBody;
41+
import software.amazon.awssdk.core.interceptor.Context;
42+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
43+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
44+
import software.amazon.awssdk.core.sync.RequestBody;
45+
import software.amazon.awssdk.http.ContentStreamProvider;
46+
import software.amazon.awssdk.http.apache.ApacheHttpClient;
47+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
48+
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
49+
import software.amazon.awssdk.services.mediastore.MediaStoreClient;
50+
import software.amazon.awssdk.services.mediastore.model.Container;
51+
import software.amazon.awssdk.services.mediastore.model.ContainerStatus;
52+
import software.amazon.awssdk.services.mediastore.model.DescribeContainerResponse;
53+
import software.amazon.awssdk.services.mediastoredata.model.DeleteObjectRequest;
54+
import software.amazon.awssdk.services.mediastoredata.model.ObjectNotFoundException;
55+
import software.amazon.awssdk.services.mediastoredata.model.PutObjectRequest;
56+
import software.amazon.awssdk.testutils.Waiter;
57+
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;
58+
59+
/**
60+
* Integration test to verify Transfer-Encoding:chunked functionalities for all supported HTTP clients. Do not delete.
61+
*/
62+
public class TransferEncodingChunkedIntegrationTest extends AwsIntegrationTestBase {
63+
private static final String CONTAINER_NAME = "java-sdk-test-" + Instant.now().toEpochMilli();
64+
private static MediaStoreClient mediaStoreClient;
65+
private static MediaStoreDataClient syncClientWithApache;
66+
private static MediaStoreDataClient syncClientWithUrlConnection;
67+
private static MediaStoreDataAsyncClient asyncClientWithNetty;
68+
private static AwsCredentialsProvider credentialsProvider;
69+
private static Container container;
70+
private static PutObjectRequest putObjectRequest;
71+
private static DeleteObjectRequest deleteObjectRequest;
72+
73+
@BeforeAll
74+
public static void setup() {
75+
credentialsProvider = getCredentialsProvider();
76+
mediaStoreClient = MediaStoreClient.builder()
77+
.credentialsProvider(credentialsProvider)
78+
.httpClient(ApacheHttpClient.builder().build())
79+
.build();
80+
container = createContainer();
81+
URI uri = URI.create(container.endpoint());
82+
83+
syncClientWithApache = MediaStoreDataClient.builder()
84+
.endpointOverride(uri)
85+
.credentialsProvider(credentialsProvider)
86+
.httpClient(ApacheHttpClient.builder().build())
87+
.overrideConfiguration(o -> o.addExecutionInterceptor(new CaptureTransferEncodingHeaderInterceptor()))
88+
.build();
89+
90+
syncClientWithUrlConnection= MediaStoreDataClient.builder()
91+
.endpointOverride(uri)
92+
.credentialsProvider(credentialsProvider)
93+
.httpClient(UrlConnectionHttpClient.create())
94+
.overrideConfiguration(o -> o.addExecutionInterceptor(new CaptureTransferEncodingHeaderInterceptor()))
95+
.build();
96+
97+
asyncClientWithNetty = MediaStoreDataAsyncClient.builder()
98+
.endpointOverride(uri)
99+
.credentialsProvider(getCredentialsProvider())
100+
.httpClient(NettyNioAsyncHttpClient.create())
101+
.overrideConfiguration(o -> o.addExecutionInterceptor(new CaptureTransferEncodingHeaderInterceptor()))
102+
.build();
103+
104+
putObjectRequest = PutObjectRequest.builder()
105+
.contentType("application/octet-stream")
106+
.path("/foo")
107+
.build();
108+
109+
deleteObjectRequest = DeleteObjectRequest.builder()
110+
.path("/foo")
111+
.build();
112+
}
113+
114+
@AfterAll
115+
public static void tearDown() {
116+
syncClientWithApache.deleteObject(deleteObjectRequest);
117+
Waiter.run(() -> syncClientWithApache.describeObject(r -> r.path("/foo")))
118+
.untilException(ObjectNotFoundException.class)
119+
.orFailAfter(Duration.ofMinutes(1));
120+
CaptureTransferEncodingHeaderInterceptor.reset();
121+
}
122+
123+
@Test
124+
public void apacheClientPutObject_withoutContentLength_sendsSuccessfully() {
125+
TestContentProvider provider = new TestContentProvider(RandomStringUtils.random(1000).getBytes(StandardCharsets.UTF_8));
126+
syncClientWithApache.putObject(putObjectRequest, RequestBody.fromContentProvider(provider, "binary/octet-stream"));
127+
assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue();
128+
}
129+
130+
@Test
131+
public void urlConnectionClientPutObject_withoutContentLength_sendsSuccessfully() {
132+
TestContentProvider provider = new TestContentProvider(RandomStringUtils.random(1000).getBytes(StandardCharsets.UTF_8));
133+
syncClientWithUrlConnection.putObject(putObjectRequest, RequestBody.fromContentProvider(provider, "binary/octet-stream"));
134+
assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue();
135+
}
136+
137+
@Test
138+
public void nettyClientPutObject_withoutContentLength_sendsSuccessfully() {
139+
asyncClientWithNetty.putObject(putObjectRequest, customAsyncRequestBodyWithoutContentLength()).join();
140+
assertThat(CaptureTransferEncodingHeaderInterceptor.isChunked).isTrue();
141+
}
142+
143+
private static Container createContainer() {
144+
mediaStoreClient.createContainer(r -> r.containerName(CONTAINER_NAME));
145+
DescribeContainerResponse response = waitContainerToBeActive();
146+
return response.container();
147+
}
148+
149+
private static DescribeContainerResponse waitContainerToBeActive() {
150+
return Waiter.run(() -> mediaStoreClient.describeContainer(r -> r.containerName(CONTAINER_NAME)))
151+
.until(r -> ContainerStatus.ACTIVE.equals(r.container().status()))
152+
.orFailAfter(Duration.ofMinutes(3));
153+
}
154+
155+
private static class CaptureTransferEncodingHeaderInterceptor implements ExecutionInterceptor {
156+
private static boolean isChunked;
157+
158+
public static void reset() {
159+
isChunked = false;
160+
}
161+
162+
@Override
163+
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
164+
isChunked = context.httpRequest().matchingHeaders("Transfer-Encoding").contains("chunked");
165+
}
166+
}
167+
168+
private AsyncRequestBody customAsyncRequestBodyWithoutContentLength() {
169+
return new AsyncRequestBody() {
170+
@Override
171+
public Optional<Long> contentLength() {
172+
return Optional.empty();
173+
}
174+
175+
@Override
176+
public void subscribe(Subscriber<? super ByteBuffer> s) {
177+
Flowable.fromPublisher(AsyncRequestBody.fromBytes("Random text".getBytes()))
178+
.subscribe(s);
179+
}
180+
};
181+
}
182+
183+
private static class TestContentProvider implements ContentStreamProvider {
184+
private final byte[] content;
185+
private final List<CloseTrackingInputStream> createdStreams = new ArrayList<>();
186+
private CloseTrackingInputStream currentStream;
187+
188+
private TestContentProvider(byte[] content) {
189+
this.content = content;
190+
}
191+
192+
@Override
193+
public InputStream newStream() {
194+
if (currentStream != null) {
195+
invokeSafely(currentStream::close);
196+
}
197+
currentStream = new CloseTrackingInputStream(new ByteArrayInputStream(content));
198+
createdStreams.add(currentStream);
199+
return currentStream;
200+
}
201+
202+
List<CloseTrackingInputStream> getCreatedStreams() {
203+
return createdStreams;
204+
}
205+
}
206+
207+
private static class CloseTrackingInputStream extends FilterInputStream {
208+
private boolean isClosed = false;
209+
210+
CloseTrackingInputStream(InputStream in) {
211+
super(in);
212+
}
213+
214+
@Override
215+
public void close() throws IOException {
216+
super.close();
217+
isClosed = true;
218+
}
219+
220+
boolean isClosed() {
221+
return isClosed;
222+
}
223+
}
224+
}

test/protocol-tests/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@
147147
<artifactId>assertj-core</artifactId>
148148
<scope>test</scope>
149149
</dependency>
150+
<dependency>
151+
<groupId>org.apache.commons</groupId>
152+
<artifactId>commons-lang3</artifactId>
153+
<scope>test</scope>
154+
</dependency>
150155
<dependency>
151156
<groupId>org.mockito</groupId>
152157
<artifactId>mockito-core</artifactId>
@@ -194,6 +199,11 @@
194199
<version>${awsjavasdk.version}</version>
195200
<scope>test</scope>
196201
</dependency>
202+
<dependency>
203+
<groupId>io.reactivex.rxjava2</groupId>
204+
<artifactId>rxjava</artifactId>
205+
<scope>test</scope>
206+
</dependency>
197207
</dependencies>
198208

199209
<build>

0 commit comments

Comments
 (0)