Skip to content

Commit ba4a221

Browse files
authored
Various performance optimization for CRT sync HTTP client (#4776)
* Various performance optimization for CRT sync HTTP client * Return as soon as the stream starts if there is a payload in sync http client
1 parent 58fd706 commit ba4a221

File tree

11 files changed

+269
-104
lines changed

11 files changed

+269
-104
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProvider.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ final class ClasspathSdkHttpServiceProvider<T> implements SdkHttpServiceProvider
6767

6868
@Override
6969
public Optional<T> loadService() {
70-
Queue<T> impls = new PriorityQueue<>(Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass(),
71-
Integer.MAX_VALUE)));
70+
Queue<T> impls = new PriorityQueue<>(
71+
Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass().getName(),
72+
Integer.MAX_VALUE)));
7273
Iterable<T> iterable = () -> serviceLoader.loadServices(serviceClass);
7374
iterable.forEach(impl -> impls.add(impl));
7475

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/loader/ClasspathSdkHttpServiceProviderTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.junit.runner.RunWith;
3030
import org.mockito.Mock;
3131
import org.mockito.junit.MockitoJUnitRunner;
32-
import software.amazon.awssdk.core.SdkSystemSetting;
33-
import software.amazon.awssdk.core.exception.SdkClientException;
3432
import software.amazon.awssdk.http.SdkHttpService;
3533
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
3634
import software.amazon.awssdk.http.async.SdkAsyncHttpService;
@@ -77,7 +75,7 @@ public void multipleSyncImplementationsFound_ReturnHighestPriority() {
7775
SdkHttpService mock = mock(SdkHttpService.class);
7876

7977
when(serviceLoader.loadServices(SdkHttpService.class))
80-
.thenReturn(iteratorOf(apacheSdkHttpService, mock));
78+
.thenReturn(iteratorOf(mock, apacheSdkHttpService));
8179
assertThat(provider.loadService()).contains(apacheSdkHttpService);
8280

8381
SdkHttpService mock1 = mock(SdkHttpService.class);
@@ -93,7 +91,7 @@ public void multipleAsyncImplementationsFound_ReturnHighestPriority() {
9391
SdkAsyncHttpService mock = mock(SdkAsyncHttpService.class);
9492

9593
when(serviceLoader.loadServices(SdkAsyncHttpService.class))
96-
.thenReturn(iteratorOf(netty, mock));
94+
.thenReturn(iteratorOf(mock, netty));
9795
assertThat(asyncProvider.loadService()).contains(netty);
9896

9997
SdkAsyncHttpService mock1 = mock(SdkAsyncHttpService.class);

http-client-spi/src/main/java/software/amazon/awssdk/http/HttpExecuteResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public SdkHttpResponse httpResponse() {
3838
}
3939

4040
/**
41-
* /** Get the {@link AbortableInputStream} associated with this response.
41+
* Get the {@link AbortableInputStream} associated with this response.
4242
*
4343
* <p>Always close the "responseBody" input stream to release the underlying HTTP connection.
4444
* Even for error responses, the SDK creates an input stream for reading error data. It is essential to close the input stream

http-clients/aws-crt-client/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,6 @@
151151
<version>${awsjavasdk.version}</version>
152152
<scope>test</scope>
153153
</dependency>
154-
<dependency>
155-
<groupId>software.amazon.awssdk</groupId>
156-
<artifactId>s3</artifactId>
157-
<version>${awsjavasdk.version}</version>
158-
<scope>test</scope>
159-
</dependency>
160154
<dependency>
161155
<groupId>software.amazon.awssdk</groupId>
162156
<artifactId>auth</artifactId>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 1 addition & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;
2525

2626
import java.io.IOException;
27-
import java.nio.ByteBuffer;
2827
import java.time.Duration;
2928
import java.util.concurrent.CompletableFuture;
3029
import javax.net.ssl.SSLHandshakeException;
@@ -33,25 +32,19 @@
3332
import software.amazon.awssdk.crt.http.HttpClientConnection;
3433
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
3534
import software.amazon.awssdk.crt.http.HttpException;
36-
import software.amazon.awssdk.crt.http.HttpHeader;
37-
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
3835
import software.amazon.awssdk.crt.http.HttpManagerMetrics;
3936
import software.amazon.awssdk.crt.http.HttpRequest;
40-
import software.amazon.awssdk.crt.http.HttpStream;
4137
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
42-
import software.amazon.awssdk.http.AbortableInputStream;
43-
import software.amazon.awssdk.http.HttpStatusFamily;
4438
import software.amazon.awssdk.http.SdkCancellationException;
4539
import software.amazon.awssdk.http.SdkHttpFullResponse;
4640
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
4741
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
4842
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
4943
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
44+
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
5045
import software.amazon.awssdk.metrics.MetricCollector;
5146
import software.amazon.awssdk.metrics.NoOpMetricCollector;
5247
import software.amazon.awssdk.utils.Logger;
53-
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
54-
import software.amazon.awssdk.utils.async.SimplePublisher;
5548

5649
@SdkInternalApi
5750
public final class CrtRequestExecutor {
@@ -201,87 +194,6 @@ private void executeRequest(CrtAsyncRequestContext executionContext,
201194
}
202195
}
203196

204-
private static final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
205-
private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
206-
private volatile InputStreamSubscriber inputStreamSubscriber;
207-
private volatile SimplePublisher<ByteBuffer> simplePublisher;
208-
209-
private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
210-
private final HttpClientConnection crtConn;
211-
212-
InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
213-
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
214-
this.crtConn = crtConn;
215-
this.requestCompletionFuture = requestCompletionFuture;
216-
}
217-
218-
@Override
219-
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
220-
HttpHeader[] nextHeaders) {
221-
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
222-
for (HttpHeader h : nextHeaders) {
223-
responseBuilder.appendHeader(h.getName(), h.getValue());
224-
}
225-
}
226-
227-
if (responseBuilder.statusCode() == 0) {
228-
responseBuilder.statusCode(responseStatusCode);
229-
}
230-
231-
}
232-
233-
@Override
234-
public void onResponseComplete(HttpStream stream, int errorCode) {
235-
// always close the connection on a 5XX response code.
236-
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
237-
crtConn.shutdown();
238-
}
239-
240-
if (errorCode == 0) {
241-
requestCompletionFuture.complete(responseBuilder.build());
242-
} else {
243-
Throwable toThrow =
244-
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
245-
requestCompletionFuture.completeExceptionally(toThrow);
246-
}
247-
stream.close();
248-
crtConn.close();
249-
250-
if (simplePublisher != null) {
251-
simplePublisher.complete();
252-
}
253-
}
254-
255-
@Override
256-
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
257-
258-
// Per the client conformance tests, unless there's an actual body the response stream must be null.
259-
// This is function is only called in one of the CRT's IO threads, or is otherwise sequentially ordered.
260-
// It is volatile in the declaration for thread visibility reasons, NOT concurrency control.
261-
if (inputStreamSubscriber == null && simplePublisher == null) {
262-
inputStreamSubscriber = new InputStreamSubscriber();
263-
simplePublisher = new SimplePublisher<>();
264-
simplePublisher.subscribe(inputStreamSubscriber);
265-
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
266-
}
267-
268-
CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));
269-
270-
writeFuture.whenComplete((result, failure) -> {
271-
if (failure != null) {
272-
requestCompletionFuture.completeExceptionally(failure);
273-
return;
274-
}
275-
276-
// increment the window upon buffer consumption.
277-
stream.incrementWindow(bodyBytesIn.length);
278-
});
279-
280-
// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
281-
return 0;
282-
}
283-
}
284-
285197
private void executeRequest(CrtRequestContext executionContext,
286198
CompletableFuture<SdkHttpFullResponse> requestFuture,
287199
HttpClientConnection crtConn) {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import software.amazon.awssdk.http.HttpStatusFamily;
3131
import software.amazon.awssdk.http.SdkHttpResponse;
3232
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
33+
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
3334
import software.amazon.awssdk.utils.Logger;
3435
import software.amazon.awssdk.utils.Validate;
3536
import software.amazon.awssdk.utils.async.SimplePublisher;
3637

3738
/**
39+
* Response handler adaptor for {@link AwsCrtAsyncHttpClient}.
40+
* <p>
3841
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
3942
*/
4043
@SdkInternalApi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal.response;
17+
18+
import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.concurrent.CompletableFuture;
22+
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.crt.CRT;
24+
import software.amazon.awssdk.crt.http.HttpClientConnection;
25+
import software.amazon.awssdk.crt.http.HttpException;
26+
import software.amazon.awssdk.crt.http.HttpHeader;
27+
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
28+
import software.amazon.awssdk.crt.http.HttpStream;
29+
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
30+
import software.amazon.awssdk.http.AbortableInputStream;
31+
import software.amazon.awssdk.http.HttpStatusFamily;
32+
import software.amazon.awssdk.http.SdkHttpFullResponse;
33+
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
34+
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
35+
import software.amazon.awssdk.utils.async.SimplePublisher;
36+
37+
/**
38+
* Response handler adaptor for {@link AwsCrtHttpClient}.
39+
*/
40+
@SdkInternalApi
41+
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
42+
private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
43+
private volatile InputStreamSubscriber inputStreamSubscriber;
44+
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();
45+
46+
private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
47+
private final HttpClientConnection crtConn;
48+
49+
public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
50+
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
51+
this.crtConn = crtConn;
52+
this.requestCompletionFuture = requestCompletionFuture;
53+
}
54+
55+
@Override
56+
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
57+
HttpHeader[] nextHeaders) {
58+
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
59+
for (HttpHeader h : nextHeaders) {
60+
responseBuilder.appendHeader(h.getName(), h.getValue());
61+
}
62+
}
63+
64+
responseBuilder.statusCode(responseStatusCode);
65+
}
66+
67+
@Override
68+
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
69+
if (inputStreamSubscriber == null) {
70+
inputStreamSubscriber = new InputStreamSubscriber();
71+
simplePublisher.subscribe(inputStreamSubscriber);
72+
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
73+
// the stream directly.
74+
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
75+
requestCompletionFuture.complete(responseBuilder.build());
76+
}
77+
78+
CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));
79+
80+
if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
81+
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
82+
return bodyBytesIn.length;
83+
}
84+
85+
writeFuture.whenComplete((result, failure) -> {
86+
if (failure != null) {
87+
failFutureAndCloseConnection(stream, failure);
88+
return;
89+
}
90+
91+
// increment the window upon buffer consumption.
92+
stream.incrementWindow(bodyBytesIn.length);
93+
});
94+
95+
// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
96+
return 0;
97+
}
98+
99+
@Override
100+
public void onResponseComplete(HttpStream stream, int errorCode) {
101+
if (errorCode == CRT.AWS_CRT_SUCCESS) {
102+
onSuccessfulResponseComplete(stream);
103+
} else {
104+
onFailedResponseComplete(stream, errorCode);
105+
}
106+
}
107+
108+
private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
109+
requestCompletionFuture.completeExceptionally(failure);
110+
crtConn.shutdown();
111+
crtConn.close();
112+
stream.close();
113+
}
114+
115+
private void onFailedResponseComplete(HttpStream stream, int errorCode) {
116+
Throwable toThrow =
117+
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
118+
119+
simplePublisher.error(toThrow);
120+
failFutureAndCloseConnection(stream, toThrow);
121+
}
122+
123+
private void onSuccessfulResponseComplete(HttpStream stream) {
124+
// always close the connection on a 5XX response code.
125+
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
126+
crtConn.shutdown();
127+
}
128+
129+
// For response without a payload, for example, S3 PutObjectResponse, we need to complete the future
130+
// in onResponseComplete callback since onResponseBody will never be invoked.
131+
requestCompletionFuture.complete(responseBuilder.build());
132+
simplePublisher.complete();
133+
crtConn.close();
134+
stream.close();
135+
}
136+
}

services/s3/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@
174174
<version>${awsjavasdk.version}</version>
175175
<scope>test</scope>
176176
</dependency>
177+
<dependency>
178+
<groupId>software.amazon.awssdk</groupId>
179+
<artifactId>aws-crt-client</artifactId>
180+
<version>${awsjavasdk.version}</version>
181+
<scope>test</scope>
182+
</dependency>
177183
<dependency>
178184
<groupId>io.netty</groupId>
179185
<artifactId>netty-transport</artifactId>

0 commit comments

Comments
 (0)