diff --git a/http-clients/aws-crt-client/pom.xml b/http-clients/aws-crt-client/pom.xml index 5bdb0b1dd194..55a360c5e780 100644 --- a/http-clients/aws-crt-client/pom.xml +++ b/http-clients/aws-crt-client/pom.xml @@ -33,7 +33,7 @@ software.amazon.awssdk.crt aws-crt - 0.3.35 + 0.5.1 diff --git a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientCallingPatternIntegrationTest.java b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientCallingPatternIntegrationTest.java index 2e505a9c6583..c128924cea4b 100644 --- a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientCallingPatternIntegrationTest.java +++ b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientCallingPatternIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -32,6 +32,8 @@ import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; @@ -95,8 +97,12 @@ private boolean testWithClient(KmsAsyncClient asyncKMSClient, int numberOfReques } private boolean testWithNewClient(int eventLoopSize, int numberOfRequests) { - try (SdkAsyncHttpClient newAwsCrtHttpClient = AwsCrtAsyncHttpClient.builder() - .eventLoopSize(eventLoopSize) + + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(eventLoopSize); + HostResolver hostResolver = new HostResolver(eventLoopGroup); + SdkAsyncHttpClient newAwsCrtHttpClient = AwsCrtAsyncHttpClient.builder() + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) .build()) { try (KmsAsyncClient newAsyncKMSClient = KmsAsyncClient.builder() .region(REGION) @@ -156,9 +162,12 @@ public void checkAllCombinations(@FromDataPoints("EventLoop") int eventLoopSize, .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, connectionPoolSize) .build(); + EventLoopGroup eventLoopGroup = new EventLoopGroup(eventLoopSize); + HostResolver hostResolver = new HostResolver(eventLoopGroup); SdkAsyncHttpClient awsCrtHttpClient = AwsCrtAsyncHttpClient.builder() - .eventLoopSize(eventLoopSize) + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) .buildWithDefaults(attributes); KmsAsyncClient sharedAsyncKMSClient = KmsAsyncClient.builder() @@ -194,6 +203,9 @@ public void checkAllCombinations(@FromDataPoints("EventLoop") int eventLoopSize, awsCrtHttpClient.close(); Assert.assertFalse(failed.get()); + hostResolver.close(); + eventLoopGroup.close(); + CrtResource.waitForNoResources(); float numSeconds = (float) ((System.currentTimeMillis() - start) / 1000.0); diff --git a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientKmsIntegrationTest.java b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientKmsIntegrationTest.java index 74cda6550c09..4508bd7c6f9c 100644 --- a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientKmsIntegrationTest.java +++ b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientKmsIntegrationTest.java @@ -11,6 +11,8 @@ import org.junit.Test; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.crt.io.TlsCipherPreference; import software.amazon.awssdk.crt.io.TlsContextOptions; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -32,6 +34,8 @@ public class AwsCrtClientKmsIntegrationTest { private static String KEY_ALIAS = "alias/aws-sdk-java-v2-integ-test"; private static Region REGION = Region.US_EAST_1; private static List awsCrtHttpClients = new ArrayList<>(); + private static EventLoopGroup eventLoopGroup; + private static HostResolver hostResolver; @Before public void setup() { @@ -43,9 +47,13 @@ public void setup() { continue; } + int numThreads = 1; + eventLoopGroup = new EventLoopGroup(numThreads); + hostResolver = new HostResolver(eventLoopGroup); SdkAsyncHttpClient awsCrtHttpClient = AwsCrtAsyncHttpClient.builder() - .eventLoopSize(1) + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) .build(); awsCrtHttpClients.add(awsCrtHttpClient); @@ -55,6 +63,8 @@ public void setup() { @After public void tearDown() { + hostResolver.close(); + eventLoopGroup.close(); CrtResource.waitForNoResources(); } diff --git a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientS3IntegrationTest.java b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientS3IntegrationTest.java index afc4a115dd5f..16d64821a3f1 100644 --- a/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientS3IntegrationTest.java +++ b/http-clients/aws-crt-client/src/it/java/software/amazon/awssdk/http/crt/AwsCrtClientS3IntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -30,6 +30,8 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -50,6 +52,8 @@ public class AwsCrtClientS3IntegrationTest { private static Region REGION = Region.US_EAST_1; + private static EventLoopGroup eventLoopGroup; + private static HostResolver hostResolver; private static SdkAsyncHttpClient crtClient; private static S3AsyncClient s3; @@ -58,8 +62,13 @@ public class AwsCrtClientS3IntegrationTest { public void setup() { CrtResource.waitForNoResources(); + int numThreads = 4; + eventLoopGroup = new EventLoopGroup(numThreads); + hostResolver = new HostResolver(eventLoopGroup); + crtClient = AwsCrtAsyncHttpClient.builder() - .eventLoopSize(4) + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) .build(); s3 = S3AsyncClient.builder() @@ -73,7 +82,8 @@ public void setup() { public void tearDown() { s3.close(); crtClient.close(); - + hostResolver.close(); + eventLoopGroup.close(); CrtResource.waitForNoResources(); } @@ -108,4 +118,4 @@ public void testParallelDownloadFromS3() throws Exception { } } -} \ No newline at end of file +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java index b02bb647bcd9..69add45a203a 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java @@ -30,9 +30,12 @@ import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.io.TlsCipherPreference; import software.amazon.awssdk.crt.io.TlsContext; @@ -74,15 +77,16 @@ public class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient { private final TlsContext tlsContext; private final int windowSize; private final int maxConnectionsPerEndpoint; - + private final boolean manualWindowManagement; public AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) { int maxConns = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS); Validate.isPositive(maxConns, "maxConns"); - Validate.isPositive(builder.eventLoopSize, "eventLoopSize"); Validate.notNull(builder.cipherPreference, "cipherPreference"); Validate.isPositive(builder.windowSize, "windowSize"); + Validate.notNull(builder.eventLoopGroup, "eventLoopGroup"); + Validate.notNull(builder.hostResolver, "hostResolver"); /** * Must call own() in same order that CrtResources are created in, so that they will be closed in reverse order. @@ -93,14 +97,21 @@ public AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) { * in the correct order. */ - bootstrap = own(new ClientBootstrap(builder.eventLoopSize)); - socketOptions = own(new SocketOptions()); - tlsContextOptions = own(TlsContextOptions.createDefaultClient().withCipherPreference(builder.cipherPreference)); - tlsContextOptions.setVerifyPeer(builder.verifyPeer); - tlsContext = own(new TlsContext(tlsContextOptions)); + this.bootstrap = own(new ClientBootstrap(builder.eventLoopGroup, builder.hostResolver)); + this.socketOptions = own(new SocketOptions()); + /** + * Sonar raises a false-positive that the TlsContextOptions created here will not be closed. Using a "NOSONAR" + * comment so that Sonar will ignore that false-positive. + */ + this.tlsContextOptions = own(TlsContextOptions.createDefaultClient() // NOSONAR + .withCipherPreference(builder.cipherPreference) + .withVerifyPeer(builder.verifyPeer)); + + this.tlsContext = own(new TlsContext(this.tlsContextOptions)); this.windowSize = builder.windowSize; this.maxConnectionsPerEndpoint = maxConns; + this.manualWindowManagement = builder.manualWindowManagement; } /** @@ -135,9 +146,16 @@ private HttpClientConnectionManager createConnectionPool(URI uri) { Validate.notNull(uri, "URI must not be null"); log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint); - return new HttpClientConnectionManager(bootstrap, socketOptions, tlsContext, uri, - HttpClientConnectionManager.DEFAULT_MAX_BUFFER_SIZE, windowSize, - maxConnectionsPerEndpoint); + HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions() + .withClientBootstrap(bootstrap) + .withSocketOptions(socketOptions) + .withTlsContext(tlsContext) + .withUri(uri) + .withWindowSize(windowSize) + .withMaxConnections(maxConnectionsPerEndpoint) + .withManualWindowManagement(manualWindowManagement); + + return HttpClientConnectionManager.create(options); } private HttpClientConnectionManager getOrCreateConnectionPool(URI uri) { @@ -195,7 +213,7 @@ private HttpHeader[] asArray(List crtHeaderList) { return crtHeaderList.toArray(new HttpHeader[crtHeaderList.size()]); } - private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest) { + private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest, AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter) { SdkHttpRequest sdkRequest = asyncRequest.request(); Validate.notNull(uri, "URI must not be null"); Validate.notNull(sdkRequest, "SdkHttpRequest must not be null"); @@ -208,7 +226,7 @@ private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest) { HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(uri, asyncRequest)); - return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray); + return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, crtToSdkAdapter); } @Override @@ -223,8 +241,6 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { URI uri = toUri(asyncRequest.request()); HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(uri); - HttpRequest crtRequest = toCrtRequest(uri, asyncRequest); - CompletableFuture requestFuture = new CompletableFuture<>(); // When a Connection is ready from the Connection Pool, schedule the Request on the connection @@ -238,9 +254,10 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter = new AwsCrtAsyncHttpStreamAdapter(crtConn, requestFuture, asyncRequest, windowSize); + HttpRequest crtRequest = toCrtRequest(uri, asyncRequest, crtToSdkAdapter); // Submit the Request on this Connection - invokeSafely(() -> crtConn.makeRequest(crtRequest, crtToSdkAdapter)); + invokeSafely(() -> crtConn.makeRequest(crtRequest, crtToSdkAdapter).activate()); }); return requestFuture; @@ -264,36 +281,51 @@ public void close() { */ public interface Builder extends SdkAsyncHttpClient.Builder { - /** - * The number of Threads to use in the EventLoop. - * @param eventLoopSize The number of Threads to use in the EventLoop. - * @return the builder of the method chaining. - */ - Builder eventLoopSize(int eventLoopSize); - /** * The AWS CRT TlsCipherPreference to use for this Client * @param tlsCipherPreference The AWS Common Runtime TlsCipherPreference - * @return the builder of the method chaining. + * @return The builder of the method chaining. */ Builder tlsCipherPreference(TlsCipherPreference tlsCipherPreference); /** * Whether or not to Verify the Peer's TLS Certificate Chain. * @param verifyPeer true if the Certificate Chain should be validated, false if validation should be skipped. - * @return the builder of the method chaining. + * @return The builder of the method chaining. */ Builder verifyPeer(boolean verifyPeer); + /** + * If set to true, then the TCP read back pressure mechanism will be enabled, and the user + * is responsible for calling incrementWindow on the stream object. + * @param manualWindowManagement true if the TCP back pressure mechanism should be enabled. + * @return The builder of the method chaining. + */ + Builder manualWindowManagement(boolean manualWindowManagement); + /** * The AWS CRT WindowSize to use for this HttpClient. This represents the number of unread bytes that can be * buffered in the ResponseBodyPublisher before we stop reading from the underlying TCP socket and wait for * the Subscriber to read more data. * * @param windowSize The AWS Common Runtime WindowSize - * @return the builder of the method chaining. + * @return The builder of the method chaining. */ Builder windowSize(int windowSize); + + /** + * The AWS CRT EventLoopGroup to use for this Client. + * @param eventLoopGroup The AWS CRT EventLoopGroup to use for this client. + * @return The builder of the method chaining. + */ + Builder eventLoopGroup(EventLoopGroup eventLoopGroup); + + /** + * The AWS CRT HostResolver to use for this Client. + * @param hostResolver The AWS CRT HostResolver to use for this client. + * @return The builder of the method chaining. + */ + Builder hostResolver(HostResolver hostResolver); } /** @@ -302,10 +334,12 @@ public interface Builder extends SdkAsyncHttpClient.Builder { private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0); private final AtomicInteger queuedBytes = new AtomicInteger(0); private final AtomicReference> subscriberRef = new AtomicReference<>(null); - private final Queue queuedBuffers = new ConcurrentLinkedQueue<>(); + private final Queue queuedBuffers = new ConcurrentLinkedQueue<>(); private final AtomicReference error = new AtomicReference<>(null); /** @@ -97,18 +97,18 @@ public void subscribe(Subscriber subscriber) { * Adds a Buffer to the Queue to be published to any Subscribers * @param buffer The Buffer to be queued. */ - public void queueBuffer(ByteBuffer buffer) { + public void queueBuffer(byte[] buffer) { Validate.notNull(buffer, "ByteBuffer must not be null"); if (isCancelled.get()) { // Immediately open HttpStream's IO window so it doesn't see any IO Back-pressure. // AFAIK there's no way to abort an in-progress HttpStream, only free it's memory by calling close() - stream.incrementWindow(buffer.remaining()); + stream.incrementWindow(buffer.length); return; } queuedBuffers.add(buffer); - int totalBytesQueued = queuedBytes.addAndGet(buffer.remaining()); + int totalBytesQueued = queuedBytes.addAndGet(buffer.length); if (totalBytesQueued > windowSize) { throw new IllegalStateException("Queued more than Window Size: queued=" + totalBytesQueued @@ -132,6 +132,12 @@ protected void request(long n) { outstandingReqs = outstandingRequests.addAndGet(n); } + /* + * Since we buffer, in the case where the subscriber came in after the publication has already begun, + * go ahead and flush what we have. + */ + publishToSubscribers(); + log.trace(() -> "Subscriber Requested more Buffers. Outstanding Requests: " + outstandingReqs); } @@ -230,10 +236,10 @@ protected synchronized void publishToSubscribers() { int totalAmountTransferred = 0; while (outstandingRequests.get() > 0 && queuedBuffers.size() > 0) { - ByteBuffer buffer = queuedBuffers.poll(); + byte[] buffer = queuedBuffers.poll(); outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO); - int amount = buffer.remaining(); - publishWithoutMutualRecursion(subscriberRef.get(), buffer); + int amount = buffer.length; + publishWithoutMutualRecursion(subscriberRef.get(), ByteBuffer.wrap(buffer)); totalAmountTransferred += amount; } @@ -248,7 +254,7 @@ protected synchronized void publishToSubscribers() { } } - // Check if Complete + // Check if Complete, consider no subscriber as a completion. if (queueComplete.get() && queuedBuffers.size() == 0) { completeSubscriptionExactlyOnce(); } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java index 12d4d80c8d4d..8bec55c03b66 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -37,8 +37,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -46,6 +46,8 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.http.SdkHttpMethod; import software.amazon.awssdk.http.SdkHttpRequest; @@ -64,19 +66,30 @@ public class AwsCrtHttpClientSpiVerificationTest { .dynamicPort() .dynamicHttpsPort()); + private EventLoopGroup eventLoopGroup; + private HostResolver hostResolver; private SdkAsyncHttpClient client; @Before public void setup() throws Exception { CrtResource.waitForNoResources(); + int numThreads = Runtime.getRuntime().availableProcessors(); + eventLoopGroup = new EventLoopGroup(numThreads); + hostResolver = new HostResolver(eventLoopGroup); + client = AwsCrtAsyncHttpClient.builder() + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) + .manualWindowManagement(true) .build(); } @After public void tearDown() { client.close(); + hostResolver.close(); + eventLoopGroup.close(); CrtResource.waitForNoResources(); } @@ -201,6 +214,25 @@ private void makePutRequest(String path, byte[] reqBody, int expectedStatus) thr final AtomicReference response = new AtomicReference<>(null); final AtomicReference error = new AtomicReference<>(null); + Subscriber subscriber = new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + } + }; + SdkAsyncHttpResponseHandler handler = new SdkAsyncHttpResponseHandler() { @Override public void onHeaders(SdkHttpResponse headers) { @@ -208,6 +240,7 @@ public void onHeaders(SdkHttpResponse headers) { } @Override public void onStream(Publisher stream) { + stream.subscribe(subscriber); streamReceived.complete(true); } @@ -239,7 +272,6 @@ public void testPutRequest() throws Exception { stubFor(any(urlEqualTo(pathExpect200)).withRequestBody(binaryEqualTo(expectedBody)).willReturn(aResponse().withStatus(200))); makePutRequest(pathExpect200, expectedBody, 200); - String pathExpect404 = "/testPutRequest/return_404_always"; byte[] randomBody = generateRandomBody(TEST_BODY_LEN); stubFor(any(urlEqualTo(pathExpect404)).willReturn(aResponse().withStatus(404))); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java index 8989aca0471d..143f1e7b591b 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ public Publisher createPublisher(long elements) { AwsCrtResponseBodyPublisher bodyPublisher = new AwsCrtResponseBodyPublisher(connection, stream, new CompletableFuture<>(), Integer.MAX_VALUE); for (long i = 0; i < elements; i++) { - bodyPublisher.queueBuffer(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes())); + bodyPublisher.queueBuffer(UUID.randomUUID().toString().getBytes()); } bodyPublisher.setQueueComplete(); diff --git a/test/sdk-benchmarks/pom.xml b/test/sdk-benchmarks/pom.xml index 5a2a97f6cfb8..b7009d7935c8 100755 --- a/test/sdk-benchmarks/pom.xml +++ b/test/sdk-benchmarks/pom.xml @@ -204,7 +204,7 @@ software.amazon.awssdk.crt aws-crt - 0.3.35 + 0.5.1 compile diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientBenchmark.java index 377446824981..36402b2f61e0 100644 --- a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientBenchmark.java +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientBenchmark.java @@ -42,6 +42,8 @@ import org.openjdk.jmh.runner.options.OptionsBuilder; import software.amazon.awssdk.benchmark.apicall.httpclient.SdkHttpClientBenchmark; import software.amazon.awssdk.benchmark.utils.MockServer; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; @@ -59,14 +61,22 @@ public class AwsCrtClientBenchmark implements SdkHttpClientBenchmark { private MockServer mockServer; private SdkAsyncHttpClient sdkHttpClient; private ProtocolRestJsonAsyncClient client; + private EventLoopGroup eventLoopGroup; + private HostResolver hostResolver; @Setup(Level.Trial) public void setup() throws Exception { mockServer = new MockServer(); mockServer.start(); + int numThreads = Runtime.getRuntime().availableProcessors(); + eventLoopGroup = new EventLoopGroup(numThreads); + hostResolver = new HostResolver(eventLoopGroup); + sdkHttpClient = AwsCrtAsyncHttpClient.builder() .verifyPeer(false) + .eventLoopGroup(this.eventLoopGroup) + .hostResolver(this.hostResolver) .build(); client = ProtocolRestJsonAsyncClient.builder() @@ -83,6 +93,8 @@ public void tearDown() throws Exception { mockServer.stop(); client.close(); sdkHttpClient.close(); + hostResolver.close(); + eventLoopGroup.close(); } @Override diff --git a/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientNonTlsBenchmark.java b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientNonTlsBenchmark.java new file mode 100644 index 000000000000..87c8c4730848 --- /dev/null +++ b/test/sdk-benchmarks/src/main/java/software/amazon/awssdk/benchmark/apicall/httpclient/async/AwsCrtClientNonTlsBenchmark.java @@ -0,0 +1,128 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.benchmark.apicall.httpclient.async; + +import static software.amazon.awssdk.benchmark.utils.BenchmarkConstant.CONCURRENT_CALLS; +import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.awaitCountdownLatchUninterruptibly; +import static software.amazon.awssdk.benchmark.utils.BenchmarkUtils.countDownUponCompletion; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.profile.StackProfiler; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import software.amazon.awssdk.benchmark.apicall.httpclient.SdkHttpClientBenchmark; +import software.amazon.awssdk.benchmark.utils.MockServer; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient; + +/** + * Using aws-crt-client to test against local mock https server. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 15, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) +@Fork(2) // To reduce difference between each run +@BenchmarkMode(Mode.Throughput) +public class AwsCrtClientNonTlsBenchmark implements SdkHttpClientBenchmark { + + private MockServer mockServer; + private SdkAsyncHttpClient sdkHttpClient; + private ProtocolRestJsonAsyncClient client; + private EventLoopGroup eventLoopGroup; + private HostResolver hostResolver; + + @Setup(Level.Trial) + public void setup() throws Exception { + mockServer = new MockServer(); + mockServer.start(); + + int numThreads = Runtime.getRuntime().availableProcessors(); + eventLoopGroup = new EventLoopGroup(numThreads); + hostResolver = new HostResolver(eventLoopGroup); + + sdkHttpClient = AwsCrtAsyncHttpClient.builder() + .verifyPeer(false) + .eventLoopGroup(eventLoopGroup) + .hostResolver(hostResolver) + .build(); + + client = ProtocolRestJsonAsyncClient.builder() + .endpointOverride(mockServer.getHttpUri()) + .httpClient(sdkHttpClient) + .build(); + + // Making sure the request actually succeeds + client.allTypes().join(); + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception { + mockServer.stop(); + client.close(); + sdkHttpClient.close(); + hostResolver.close(); + eventLoopGroup.close(); + } + + @Override + @Benchmark + @OperationsPerInvocation(CONCURRENT_CALLS) + public void concurrentApiCall(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_CALLS); + for (int i = 0; i < CONCURRENT_CALLS; i++) { + countDownUponCompletion(blackhole, client.allTypes(), countDownLatch); + } + + awaitCountdownLatchUninterruptibly(countDownLatch, 10, TimeUnit.SECONDS); + + } + + @Override + @Benchmark + public void sequentialApiCall(Blackhole blackhole) { + CountDownLatch countDownLatch = new CountDownLatch(1); + countDownUponCompletion(blackhole, client.allTypes(), countDownLatch); + awaitCountdownLatchUninterruptibly(countDownLatch, 1, TimeUnit.SECONDS); + } + + public static void main(String... args) throws Exception { + Options opt = new OptionsBuilder() + .include(AwsCrtClientNonTlsBenchmark.class.getSimpleName()) + .addProfiler(StackProfiler.class) + .build(); + Collection run = new Runner(opt).run(); + } +} diff --git a/test/tests-coverage-reporting/pom.xml b/test/tests-coverage-reporting/pom.xml index fef40a3d8c77..9f1f1f1c161e 100644 --- a/test/tests-coverage-reporting/pom.xml +++ b/test/tests-coverage-reporting/pom.xml @@ -102,6 +102,11 @@ software.amazon.awssdk ${awsjavasdk.version} + + aws-crt-client + software.amazon.awssdk + ${awsjavasdk.version} + url-connection-client software.amazon.awssdk @@ -244,4 +249,4 @@ - \ No newline at end of file +