Skip to content

Various performance optimization for CRT sync HTTP client #4776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ final class ClasspathSdkHttpServiceProvider<T> implements SdkHttpServiceProvider

@Override
public Optional<T> loadService() {
Queue<T> impls = new PriorityQueue<>(Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass(),
Queue<T> impls = new PriorityQueue<>(Comparator.comparingInt(o -> httpServicesPriority.getOrDefault(o.getClass().getName(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the bug I found during testing that the priority order was not honored. I updated tests accordingly to catch this issue

Integer.MAX_VALUE)));
Iterable<T> iterable = () -> serviceLoader.loadServices(serviceClass);
iterable.forEach(impl -> impls.add(impl));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.SdkHttpService;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.http.async.SdkAsyncHttpService;
Expand Down Expand Up @@ -77,7 +75,7 @@ public void multipleSyncImplementationsFound_ReturnHighestPriority() {
SdkHttpService mock = mock(SdkHttpService.class);

when(serviceLoader.loadServices(SdkHttpService.class))
.thenReturn(iteratorOf(apacheSdkHttpService, mock));
.thenReturn(iteratorOf(mock, apacheSdkHttpService));
assertThat(provider.loadService()).contains(apacheSdkHttpService);

SdkHttpService mock1 = mock(SdkHttpService.class);
Expand All @@ -93,7 +91,7 @@ public void multipleAsyncImplementationsFound_ReturnHighestPriority() {
SdkAsyncHttpService mock = mock(SdkAsyncHttpService.class);

when(serviceLoader.loadServices(SdkAsyncHttpService.class))
.thenReturn(iteratorOf(netty, mock));
.thenReturn(iteratorOf(mock, netty));
assertThat(asyncProvider.loadService()).contains(netty);

SdkAsyncHttpService mock1 = mock(SdkAsyncHttpService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SdkHttpResponse httpResponse() {
}

/**
* /** Get the {@link AbortableInputStream} associated with this response.
* Get the {@link AbortableInputStream} associated with this response.
*
* <p>Always close the "responseBody" input stream to release the underlying HTTP connection.
* Even for error responses, the SDK creates an input stream for reading error data. It is essential to close the input stream
Expand Down
6 changes: 0 additions & 6 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLHandshakeException;
Expand All @@ -33,25 +32,19 @@
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpManagerMetrics;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

@SdkInternalApi
public final class CrtRequestExecutor {
Expand Down Expand Up @@ -201,87 +194,6 @@ private void executeRequest(CrtAsyncRequestContext executionContext,
}
}

private static final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this class to a separate class.

private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
private volatile InputStreamSubscriber inputStreamSubscriber;
private volatile SimplePublisher<ByteBuffer> simplePublisher;

private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
private final HttpClientConnection crtConn;

InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
this.crtConn = crtConn;
this.requestCompletionFuture = requestCompletionFuture;
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}

if (responseBuilder.statusCode() == 0) {
responseBuilder.statusCode(responseStatusCode);
}

}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
// always close the connection on a 5XX response code.
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
crtConn.shutdown();
}

if (errorCode == 0) {
requestCompletionFuture.complete(responseBuilder.build());
} else {
Throwable toThrow =
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));
requestCompletionFuture.completeExceptionally(toThrow);
}
stream.close();
crtConn.close();

if (simplePublisher != null) {
simplePublisher.complete();
}
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {

// Per the client conformance tests, unless there's an actual body the response stream must be null.
// This is function is only called in one of the CRT's IO threads, or is otherwise sequentially ordered.
// It is volatile in the declaration for thread visibility reasons, NOT concurrency control.
if (inputStreamSubscriber == null && simplePublisher == null) {
inputStreamSubscriber = new InputStreamSubscriber();
simplePublisher = new SimplePublisher<>();
simplePublisher.subscribe(inputStreamSubscriber);
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
}

CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));

writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
requestCompletionFuture.completeExceptionally(failure);
return;
}

// increment the window upon buffer consumption.
stream.incrementWindow(bodyBytesIn.length);
});

// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
return 0;
}
}

private void executeRequest(CrtRequestContext executionContext,
CompletableFuture<SdkHttpFullResponse> requestFuture,
HttpClientConnection crtConn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Response handler adaptor for {@link AwsCrtAsyncHttpClient}.
* <p>
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
*/
@SdkInternalApi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt.internal.response;

import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Response handler adaptor for {@link AwsCrtHttpClient}.
*/
@SdkInternalApi
public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler {
private final SdkHttpFullResponse.Builder responseBuilder = SdkHttpFullResponse.builder();
private volatile InputStreamSubscriber inputStreamSubscriber;
private final SimplePublisher<ByteBuffer> simplePublisher = new SimplePublisher<>();

private final CompletableFuture<SdkHttpFullResponse> requestCompletionFuture;
private final HttpClientConnection crtConn;

public InputStreamAdaptingHttpStreamResponseHandler(HttpClientConnection crtConn,
CompletableFuture<SdkHttpFullResponse> requestCompletionFuture) {
this.crtConn = crtConn;
this.requestCompletionFuture = requestCompletionFuture;
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
if (blockType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}

responseBuilder.statusCode(responseStatusCode);
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
if (inputStreamSubscriber == null) {
inputStreamSubscriber = new InputStreamSubscriber();
simplePublisher.subscribe(inputStreamSubscriber);
responseBuilder.content(AbortableInputStream.create(inputStreamSubscriber));
}

CompletableFuture<Void> writeFuture = simplePublisher.send(ByteBuffer.wrap(bodyBytesIn));

if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
return bodyBytesIn.length;
}

writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failFutureAndCloseConnection(stream, failure);
return;
}

// increment the window upon buffer consumption.
stream.incrementWindow(bodyBytesIn.length);
});

// the bodyBytesIn have not cleared the queues yet, so do let backpressure do its thing.
return 0;
}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete(stream);
} else {
onFailedResponseComplete(stream, errorCode);
}
}

private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) {
requestCompletionFuture.completeExceptionally(failure);
crtConn.shutdown();
crtConn.close();
stream.close();
}

private void onFailedResponseComplete(HttpStream stream, int errorCode) {
Throwable toThrow =
wrapWithIoExceptionIfRetryable(new HttpException(errorCode));

simplePublisher.error(toThrow);
failFutureAndCloseConnection(stream, toThrow);
}

private void onSuccessfulResponseComplete(HttpStream stream) {
// always close the connection on a 5XX response code.
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
crtConn.shutdown();
}

requestCompletionFuture.complete(responseBuilder.build());
simplePublisher.complete();
crtConn.close();
stream.close();
}
}
6 changes: 6 additions & 0 deletions services/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import software.amazon.awssdk.core.ClientType;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.model.BucketLocationConstraint;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
Expand All @@ -43,6 +45,8 @@ public class S3IntegrationTestBase extends AwsTestBase {
*/
protected static S3Client s3;

protected static S3Client s3WithCrtHttpClient;

protected static S3AsyncClient s3Async;

/**
Expand All @@ -51,9 +55,18 @@ public class S3IntegrationTestBase extends AwsTestBase {
*/
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("aws.crt.debugnative", "true");
Log.initLoggingToStdout(Log.LogLevel.Warn);
s3 = s3ClientBuilder().build();
s3Async = s3AsyncClientBuilder().build();
s3WithCrtHttpClient = s3ClientBuilderWithCrtHttpClient().build();
}

@AfterClass
public static void cleanUpResources() {
s3.close();
s3Async.close();
s3WithCrtHttpClient.close();
}

protected static S3ClientBuilder s3ClientBuilder() {
Expand All @@ -64,6 +77,15 @@ protected static S3ClientBuilder s3ClientBuilder() {
new UserAgentVerifyingExecutionInterceptor("Apache", ClientType.SYNC)));
}

protected static S3ClientBuilder s3ClientBuilderWithCrtHttpClient() {
return S3Client.builder()
.region(DEFAULT_REGION)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.httpClientBuilder(AwsCrtHttpClient.builder())
.overrideConfiguration(o -> o.addExecutionInterceptor(
new UserAgentVerifyingExecutionInterceptor("AwsCommonRuntime", ClientType.SYNC)));
}

protected static S3AsyncClientBuilder s3AsyncClientBuilder() {
return S3AsyncClient.builder()
.region(DEFAULT_REGION)
Expand Down
Loading