Skip to content

Various performance optimization for CRT sync HTTP client #4776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ final class ClasspathSdkHttpServiceProvider<T> implements SdkHttpServiceProvider

@Override
public Optional<T> loadService() {
Queue<T> impls = new PriorityQueue<>(Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass(),
Integer.MAX_VALUE)));
Queue<T> impls = new PriorityQueue<>(
Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass().getName(),
Integer.MAX_VALUE)));
Iterable<T> iterable = () -> serviceLoader.loadServices(serviceClass);
iterable.forEach(impl -> impls.add(impl));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.SdkHttpService;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.http.async.SdkAsyncHttpService;
Expand Down Expand Up @@ -77,7 +75,7 @@ public void multipleSyncImplementationsFound_ReturnHighestPriority() {
SdkHttpService mock = mock(SdkHttpService.class);

when(serviceLoader.loadServices(SdkHttpService.class))
.thenReturn(iteratorOf(apacheSdkHttpService, mock));
.thenReturn(iteratorOf(mock, apacheSdkHttpService));
assertThat(provider.loadService()).contains(apacheSdkHttpService);

SdkHttpService mock1 = mock(SdkHttpService.class);
Expand All @@ -93,7 +91,7 @@ public void multipleAsyncImplementationsFound_ReturnHighestPriority() {
SdkAsyncHttpService mock = mock(SdkAsyncHttpService.class);

when(serviceLoader.loadServices(SdkAsyncHttpService.class))
.thenReturn(iteratorOf(netty, mock));
.thenReturn(iteratorOf(mock, netty));
assertThat(asyncProvider.loadService()).contains(netty);

SdkAsyncHttpService mock1 = mock(SdkAsyncHttpService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SdkHttpResponse httpResponse() {
}

/**
* /** Get the {@link AbortableInputStream} associated with this response.
* Get the {@link AbortableInputStream} associated with this response.
*
* <p>Always close the "responseBody" input stream to release the underlying HTTP connection.
* Even for error responses, the SDK creates an input stream for reading error data. It is essential to close the input stream
Expand Down
6 changes: 0 additions & 6 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLHandshakeException;
Expand All @@ -33,25 +32,19 @@
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpManagerMetrics;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

@SdkInternalApi
public final class CrtRequestExecutor {
Expand Down Expand Up @@ -201,87 +194,6 @@ private void executeRequest(CrtAsyncRequestContext executionContext,
}
}

private static final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this class to a separate class.

private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
private volatile InputStreamSubscriber inputStreamSubscriber;
private volatile SimplePublisher<ByteBuffer> simplePublisher;

private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
private final HttpClientConnection crtConn;

InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
this.crtConn = crtConn;
this.requestCompletionFuture = requestCompletionFuture;
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}

if (responseBuilder.statusCode() == 0) {
responseBuilder.statusCode(responseStatusCode);
}

}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
// always close the connection on a 5XX response code.
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
crtConn.shutdown();
}

if (errorCode == 0) {
requestCompletionFuture.complete(responseBuilder.build());
} else {
Throwable toThrow =
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
requestCompletionFuture.completeExceptionally(toThrow);
}
stream.close();
crtConn.close();

if (simplePublisher != null) {
simplePublisher.complete();
}
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {

// Per the client conformance tests, unless there's an actual body the response stream must be null.
// This is function is only called in one of the CRT's IO threads, or is otherwise sequentially ordered.
// It is volatile in the declaration for thread visibility reasons, NOT concurrency control.
if (inputStreamSubscriber == null && simplePublisher == null) {
inputStreamSubscriber = new InputStreamSubscriber();
simplePublisher = new SimplePublisher<>();
simplePublisher.subscribe(inputStreamSubscriber);
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
}

CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));

writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
requestCompletionFuture.completeExceptionally(failure);
return;
}

// increment the window upon buffer consumption.
stream.incrementWindow(bodyBytesIn.length);
});

// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
return 0;
}
}

private void executeRequest(CrtRequestContext executionContext,
CompletableFuture<SdkHttpFullResponse> requestFuture,
HttpClientConnection crtConn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Response handler adaptor for {@link AwsCrtAsyncHttpClient}.
* <p>
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
*/
@SdkInternalApi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal.response;

import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Response handler adaptor for {@link AwsCrtHttpClient}.
*/
@SdkInternalApi
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
private volatile InputStreamSubscriber inputStreamSubscriber;
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();

private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
private final HttpClientConnection crtConn;

public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
this.crtConn = crtConn;
this.requestCompletionFuture = requestCompletionFuture;
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}

responseBuilder.statusCode(responseStatusCode);
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
if (inputStreamSubscriber == null) {
inputStreamSubscriber = new InputStreamSubscriber();
simplePublisher.subscribe(inputStreamSubscriber);
// For response with a payload, we need to complete the future here to allow downstream to retrieve the data from
// the stream directly.
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
requestCompletionFuture.complete(responseBuilder.build());
}

CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));

if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
return bodyBytesIn.length;
}

writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failFutureAndCloseConnection(stream, failure);
return;
}

// increment the window upon buffer consumption.
stream.incrementWindow(bodyBytesIn.length);
});

// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
return 0;
}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete(stream);
} else {
onFailedResponseComplete(stream, errorCode);
}
}

private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
requestCompletionFuture.completeExceptionally(failure);
crtConn.shutdown();
crtConn.close();
stream.close();
}

private void onFailedResponseComplete(HttpStream stream, int errorCode) {
Throwable toThrow =
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));

simplePublisher.error(toThrow);
failFutureAndCloseConnection(stream, toThrow);
}

private void onSuccessfulResponseComplete(HttpStream stream) {
// always close the connection on a 5XX response code.
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
crtConn.shutdown();
}

// For response without a payload, for example, S3 PutObjectResponse, we need to complete the future
// in onResponseComplete callback since onResponseBody will never be invoked.
requestCompletionFuture.complete(responseBuilder.build());
simplePublisher.complete();
crtConn.close();
stream.close();
}
}
6 changes: 6 additions & 0 deletions services/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
Expand Down
Loading