> f: requestFutures) {
+ f.join();
+ Assert.assertEquals(32, f.get().asByteArray().length);
+ }
+ }
+
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java
new file mode 100644
index 000000000000..f9491676f4d9
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+import static software.amazon.awssdk.utils.CollectionUtils.isNullOrEmpty;
+import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import software.amazon.awssdk.annotations.SdkPublicApi;
+import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
+import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
+import software.amazon.awssdk.crt.http.HttpHeader;
+import software.amazon.awssdk.crt.http.HttpRequest;
+import software.amazon.awssdk.crt.io.ClientBootstrap;
+import software.amazon.awssdk.crt.io.EventLoopGroup;
+import software.amazon.awssdk.crt.io.HostResolver;
+import software.amazon.awssdk.crt.io.SocketOptions;
+import software.amazon.awssdk.crt.io.TlsCipherPreference;
+import software.amazon.awssdk.crt.io.TlsContext;
+import software.amazon.awssdk.crt.io.TlsContextOptions;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.SdkHttpRequest;
+import software.amazon.awssdk.http.async.AsyncExecuteRequest;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.crt.internal.AwsCrtAsyncHttpStreamAdapter;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.IoUtils;
+import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.Validate;
+import software.amazon.awssdk.utils.http.SdkHttpUtils;
+
+/**
+ * An implementation of {@link SdkHttpClient} that uses the AWS Common Runtime (CRT) Http Client to communicate with
+ * Http Web Services. This client is asynchronous and uses non-blocking IO.
+ *
+ * This can be created via {@link #builder()}
+ */
+@SdkPublicApi
+public class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
+ private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpClient.class);
+ private static final String HOST_HEADER = "Host";
+ private static final String CONTENT_LENGTH = "Content-Length";
+ private static final String CONNECTION = "Connection";
+ private static final String KEEP_ALIVE = "keep-alive";
+ private static final String AWS_COMMON_RUNTIME = "AwsCommonRuntime";
+ private static final int DEFAULT_STREAM_WINDOW_SIZE = 16 * 1024 * 1024; // 16 MB
+
+ private final Map connectionPools = new ConcurrentHashMap<>();
+ private final LinkedList ownedSubResources = new LinkedList<>();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final ClientBootstrap bootstrap;
+ private final SocketOptions socketOptions;
+ private final TlsContextOptions tlsContextOptions;
+ private final TlsContext tlsContext;
+ private final int windowSize;
+ private final int maxConnectionsPerEndpoint;
+
+
+ public AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
+ int maxConns = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
+
+ Validate.isPositive(maxConns, "maxConns");
+ Validate.notNull(builder.cipherPreference, "cipherPreference");
+ Validate.isPositive(builder.windowSize, "windowSize");
+ Validate.notNull(builder.eventLoopGroup, "eventLoopGroup");
+ Validate.notNull(builder.hostResolver, "hostResolver");
+
+ /**
+ * Must call own() in same order that CrtResources are created in, so that they will be closed in reverse order.
+ *
+ * Do NOT use Dependency Injection for Native CrtResources. It's possible to crash the JVM Process if Native
+ * Resources are closed in the wrong order (Eg closing the Bootstrap/Threadpool when there are still open
+ * connections). By creating and owning our own Native CrtResources we can guarantee that things are shutdown
+ * in the correct order.
+ */
+
+ this.bootstrap = own(new ClientBootstrap(builder.eventLoopGroup, builder.hostResolver));
+ this.socketOptions = own(new SocketOptions());
+
+ /**
+ * Sonar raises a false-positive that the TlsContextOptions created here will not be closed. Using a "NOSONAR"
+ * comment so that Sonar will ignore that false-positive.
+ */
+ this.tlsContextOptions = own(TlsContextOptions.createDefaultClient() // NOSONAR
+ .withCipherPreference(builder.cipherPreference)
+ .withVerifyPeer(builder.verifyPeer));
+
+ this.tlsContext = own(new TlsContext(this.tlsContextOptions));
+ this.windowSize = builder.windowSize;
+ this.maxConnectionsPerEndpoint = maxConns;
+ }
+
+ /**
+ * Marks a Native CrtResource as owned by the current Java Object.
+ * This will guarantee that any owned CrtResources are closed in reverse order when this Java Object is closed.
+ *
+ * @param subresource The Resource to own.
+ * @param The CrtResource Type
+ * @return The CrtResource passed in
+ */
+ private T own(T subresource) {
+ ownedSubResources.push(subresource);
+ return subresource;
+ }
+
+ private static URI toUri(SdkHttpRequest sdkRequest) {
+ Validate.notNull(sdkRequest, "SdkHttpRequest must not be null");
+ return invokeSafely(() -> new URI(sdkRequest.protocol(), null, sdkRequest.host(), sdkRequest.port(),
+ null, null, null));
+ }
+
+ public static Builder builder() {
+ return new DefaultBuilder();
+ }
+
+ @Override
+ public String clientName() {
+ return AWS_COMMON_RUNTIME;
+ }
+
+ private HttpClientConnectionManager createConnectionPool(URI uri) {
+ Validate.notNull(uri, "URI must not be null");
+ log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
+
+ HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions()
+ .withClientBootstrap(bootstrap)
+ .withSocketOptions(socketOptions)
+ .withTlsContext(tlsContext)
+ .withUri(uri)
+ .withWindowSize(windowSize)
+ .withMaxConnections(maxConnectionsPerEndpoint);
+
+ return HttpClientConnectionManager.create(options);
+ }
+
+ private HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
+ Validate.notNull(uri, "URI must not be null");
+ HttpClientConnectionManager connPool = connectionPools.get(uri);
+
+ if (connPool == null) {
+ HttpClientConnectionManager newConnPool = createConnectionPool(uri);
+ HttpClientConnectionManager alreadyExistingConnPool = connectionPools.putIfAbsent(uri, newConnPool);
+
+ if (alreadyExistingConnPool == null) {
+ connPool = newConnPool;
+ } else {
+ // Multiple threads trying to open connections to the same URI at once, close the newer one
+ newConnPool.close();
+ connPool = alreadyExistingConnPool;
+ }
+ }
+
+ return connPool;
+ }
+
+ private List createHttpHeaderList(URI uri, AsyncExecuteRequest asyncRequest) {
+ SdkHttpRequest sdkRequest = asyncRequest.request();
+ List crtHeaderList = new ArrayList<>(sdkRequest.headers().size() + 2);
+
+ // Set Host Header if needed
+ if (isNullOrEmpty(sdkRequest.headers().get(HOST_HEADER))) {
+ crtHeaderList.add(new HttpHeader(HOST_HEADER, uri.getHost()));
+ }
+
+ // Add Connection Keep Alive Header to reuse this Http Connection as long as possible
+ if (isNullOrEmpty(sdkRequest.headers().get(CONNECTION))) {
+ crtHeaderList.add(new HttpHeader(CONNECTION, KEEP_ALIVE));
+ }
+
+ // Set Content-Length if needed
+ Optional contentLength = asyncRequest.requestContentPublisher().contentLength();
+ if (isNullOrEmpty(sdkRequest.headers().get(CONTENT_LENGTH)) && contentLength.isPresent()) {
+ crtHeaderList.add(new HttpHeader(CONTENT_LENGTH, Long.toString(contentLength.get())));
+ }
+
+ // Add the rest of the Headers
+ for (Map.Entry> headerList: sdkRequest.headers().entrySet()) {
+ for (String val: headerList.getValue()) {
+ HttpHeader h = new HttpHeader(headerList.getKey(), val);
+ crtHeaderList.add(h);
+ }
+ }
+
+ return crtHeaderList;
+ }
+
+ private HttpHeader[] asArray(List crtHeaderList) {
+ return crtHeaderList.toArray(new HttpHeader[crtHeaderList.size()]);
+ }
+
+ private HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest, AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter) {
+ SdkHttpRequest sdkRequest = asyncRequest.request();
+ Validate.notNull(uri, "URI must not be null");
+ Validate.notNull(sdkRequest, "SdkHttpRequest must not be null");
+
+ String method = sdkRequest.method().name();
+ String encodedPath = sdkRequest.encodedPath();
+ String encodedQueryString = SdkHttpUtils.encodeAndFlattenQueryParameters(sdkRequest.rawQueryParameters())
+ .map(value -> "?" + value)
+ .orElse("");
+
+ HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(uri, asyncRequest));
+
+ return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, crtToSdkAdapter);
+ }
+
+ @Override
+ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) {
+ if (isClosed.get()) {
+ throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
+ }
+ Validate.notNull(asyncRequest, "AsyncExecuteRequest must not be null");
+ Validate.notNull(asyncRequest.request(), "SdkHttpRequest must not be null");
+ Validate.notNull(asyncRequest.requestContentPublisher(), "RequestContentPublisher must not be null");
+ Validate.notNull(asyncRequest.responseHandler(), "ResponseHandler must not be null");
+
+ URI uri = toUri(asyncRequest.request());
+ HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(uri);
+ CompletableFuture requestFuture = new CompletableFuture<>();
+
+ // When a Connection is ready from the Connection Pool, schedule the Request on the connection
+ crtConnPool.acquireConnection()
+ .whenComplete((crtConn, throwable) -> {
+ // If we didn't get a connection for some reason, fail the request
+ if (throwable != null) {
+ requestFuture.completeExceptionally(throwable);
+ return;
+ }
+
+ AwsCrtAsyncHttpStreamAdapter crtToSdkAdapter =
+ new AwsCrtAsyncHttpStreamAdapter(crtConn, requestFuture, asyncRequest, windowSize);
+ HttpRequest crtRequest = toCrtRequest(uri, asyncRequest, crtToSdkAdapter);
+
+ // Submit the Request on this Connection
+ invokeSafely(() -> crtConn.makeRequest(crtRequest, crtToSdkAdapter).activate());
+ });
+
+ return requestFuture;
+ }
+
+ @Override
+ public void close() {
+ isClosed.set(true);
+ for (HttpClientConnectionManager connPool : connectionPools.values()) {
+ IoUtils.closeQuietly(connPool, log.logger());
+ }
+
+ while (ownedSubResources.size() > 0) {
+ CrtResource r = ownedSubResources.pop();
+ IoUtils.closeQuietly(r, log.logger());
+ }
+ }
+
+ /**
+ * Builder that allows configuration of the AWS CRT HTTP implementation.
+ */
+ public interface Builder extends SdkAsyncHttpClient.Builder {
+
+ /**
+ * The AWS CRT TlsCipherPreference to use for this Client
+ * @param tlsCipherPreference The AWS Common Runtime TlsCipherPreference
+ * @return The builder of the method chaining.
+ */
+ Builder tlsCipherPreference(TlsCipherPreference tlsCipherPreference);
+
+ /**
+ * Whether or not to Verify the Peer's TLS Certificate Chain.
+ * @param verifyPeer true if the Certificate Chain should be validated, false if validation should be skipped.
+ * @return The builder of the method chaining.
+ */
+ Builder verifyPeer(boolean verifyPeer);
+
+ /**
+ * The AWS CRT WindowSize to use for this HttpClient. This represents the number of unread bytes that can be
+ * buffered in the ResponseBodyPublisher before we stop reading from the underlying TCP socket and wait for
+ * the Subscriber to read more data.
+ *
+ * @param windowSize The AWS Common Runtime WindowSize
+ * @return The builder of the method chaining.
+ */
+ Builder windowSize(int windowSize);
+
+ /**
+ * The AWS CRT EventLoopGroup to use for this Client.
+ * @param eventLoopGroup The AWS CRT EventLoopGroup to use for this client.
+ * @return The builder of the method chaining.
+ */
+ Builder eventLoopGroup(EventLoopGroup eventLoopGroup);
+
+ /**
+ * The AWS CRT HostResolver to use for this Client.
+ * @param hostResolver The AWS CRT HostResolver to use for this client.
+ * @return The builder of the method chaining.
+ */
+ Builder hostResolver(HostResolver hostResolver);
+ }
+
+ /**
+ * Factory that allows more advanced configuration of the AWS CRT HTTP implementation. Use {@link #builder()} to
+ * configure and construct an immutable instance of the factory.
+ */
+ private static final class DefaultBuilder implements Builder {
+ private final AttributeMap.Builder standardOptions = AttributeMap.builder();
+ private TlsCipherPreference cipherPreference = TlsCipherPreference.TLS_CIPHER_SYSTEM_DEFAULT;
+ private int windowSize = DEFAULT_STREAM_WINDOW_SIZE;
+ private boolean verifyPeer = true;
+ private EventLoopGroup eventLoopGroup;
+ private HostResolver hostResolver;
+
+ private DefaultBuilder() {
+ }
+
+ @Override
+ public SdkAsyncHttpClient build() {
+ return new AwsCrtAsyncHttpClient(this, standardOptions.build()
+ .merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
+ }
+
+ @Override
+ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
+ return new AwsCrtAsyncHttpClient(this, standardOptions.build()
+ .merge(serviceDefaults)
+ .merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
+ }
+
+ @Override
+ public Builder tlsCipherPreference(TlsCipherPreference tlsCipherPreference) {
+ Validate.notNull(tlsCipherPreference, "cipherPreference");
+ Validate.isTrue(TlsContextOptions.isCipherPreferenceSupported(tlsCipherPreference),
+ "TlsCipherPreference not supported on current Platform");
+ this.cipherPreference = tlsCipherPreference;
+ return this;
+ }
+
+ @Override
+ public Builder verifyPeer(boolean verifyPeer) {
+ this.verifyPeer = verifyPeer;
+ return this;
+ }
+
+ @Override
+ public Builder windowSize(int windowSize) {
+ Validate.isPositive(windowSize, "windowSize");
+ this.windowSize = windowSize;
+ return this;
+ }
+
+ @Override
+ public Builder eventLoopGroup(EventLoopGroup eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ return this;
+ }
+
+ @Override
+ public Builder hostResolver(HostResolver hostResolver) {
+ this.hostResolver = hostResolver;
+ return this;
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java
new file mode 100644
index 000000000000..730a85413ad6
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.CRT;
+import software.amazon.awssdk.crt.http.HttpClientConnection;
+import software.amazon.awssdk.crt.http.HttpException;
+import software.amazon.awssdk.crt.http.HttpHeader;
+import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
+import software.amazon.awssdk.crt.http.HttpStream;
+import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.http.async.AsyncExecuteRequest;
+import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.Validate;
+
+/**
+ * Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
+ */
+@SdkInternalApi
+public class AwsCrtAsyncHttpStreamAdapter implements HttpStreamResponseHandler, HttpRequestBodyStream {
+ private static final Logger log = Logger.loggerFor(AwsCrtAsyncHttpStreamAdapter.class);
+
+ private final HttpClientConnection connection;
+ private final CompletableFuture responseComplete;
+ private final AsyncExecuteRequest sdkRequest;
+ private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
+ private final int windowSize;
+ private final AwsCrtRequestBodySubscriber requestBodySubscriber;
+ private AwsCrtResponseBodyPublisher respBodyPublisher = null;
+
+ public AwsCrtAsyncHttpStreamAdapter(HttpClientConnection connection, CompletableFuture responseComplete,
+ AsyncExecuteRequest sdkRequest, int windowSize) {
+ Validate.notNull(connection, "HttpConnection is null");
+ Validate.notNull(responseComplete, "reqComplete Future is null");
+ Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
+ Validate.isPositive(windowSize, "windowSize is <= 0");
+
+ this.connection = connection;
+ this.responseComplete = responseComplete;
+ this.sdkRequest = sdkRequest;
+ this.windowSize = windowSize;
+ this.requestBodySubscriber = new AwsCrtRequestBodySubscriber(windowSize);
+
+ sdkRequest.requestContentPublisher().subscribe(requestBodySubscriber);
+ }
+
+ private void initRespBodyPublisherIfNeeded(HttpStream stream) {
+ if (respBodyPublisher == null) {
+ respBodyPublisher = new AwsCrtResponseBodyPublisher(connection, stream, responseComplete, windowSize);
+ }
+ }
+
+ @Override
+ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
+ initRespBodyPublisherIfNeeded(stream);
+
+ respBuilder.statusCode(responseStatusCode);
+
+ for (HttpHeader h : nextHeaders) {
+ respBuilder.appendHeader(h.getName(), h.getValue());
+ }
+ }
+
+ @Override
+ public void onResponseHeadersDone(HttpStream stream, int headerType) {
+ initRespBodyPublisherIfNeeded(stream);
+
+ respBuilder.statusCode(stream.getResponseStatusCode());
+ sdkRequest.responseHandler().onHeaders(respBuilder.build());
+ sdkRequest.responseHandler().onStream(respBodyPublisher);
+ }
+
+ @Override
+ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
+ initRespBodyPublisherIfNeeded(stream);
+
+ if (respBodyPublisher == null) {
+ log.error(() -> "Publisher is null, onResponseHeadersDone() was never called");
+ throw new IllegalStateException("Publisher is null, onResponseHeadersDone() was never called");
+ }
+
+ respBodyPublisher.queueBuffer(bodyBytesIn);
+ respBodyPublisher.publishToSubscribers();
+
+ return 0;
+ }
+
+ @Override
+ public void onResponseComplete(HttpStream stream, int errorCode) {
+ initRespBodyPublisherIfNeeded(stream);
+
+ if (errorCode == CRT.AWS_CRT_SUCCESS) {
+ log.debug(() -> "Response Completed Successfully");
+ respBodyPublisher.setQueueComplete();
+ respBodyPublisher.publishToSubscribers();
+ } else {
+ HttpException error = new HttpException(errorCode);
+ log.error(() -> "Response Encountered an Error.", error);
+
+ // Invoke Error Callback on SdkAsyncHttpResponseHandler
+ sdkRequest.responseHandler().onError(error);
+
+ // Invoke Error Callback on any Subscriber's of the Response Body
+ respBodyPublisher.setError(error);
+ respBodyPublisher.publishToSubscribers();
+ }
+ }
+
+ @Override
+ public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
+ return requestBodySubscriber.transferRequestBody(bodyBytesOut);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtRequestBodySubscriber.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtRequestBodySubscriber.java
new file mode 100644
index 000000000000..b9790dcb1f1d
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtRequestBodySubscriber.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal;
+
+import static software.amazon.awssdk.crt.utils.ByteBufferUtils.transferData;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.Validate;
+
+/**
+ * Implements the Subscriber API to be be callable from AwsCrtAsyncHttpStreamAdapter.sendRequestBody()
+ */
+@SdkInternalApi
+public class AwsCrtRequestBodySubscriber implements Subscriber {
+ private static final Logger log = Logger.loggerFor(AwsCrtRequestBodySubscriber.class);
+
+ private final int windowSize;
+ private final Queue queuedBuffers = new ConcurrentLinkedQueue<>();
+ private final AtomicLong queuedByteCount = new AtomicLong(0);
+ private final AtomicBoolean isComplete = new AtomicBoolean(false);
+ private final AtomicReference error = new AtomicReference<>(null);
+
+ private AtomicReference subscriptionRef = new AtomicReference<>(null);
+
+ /**
+ *
+ * @param windowSize The number bytes to be queued before we stop proactively queuing data
+ */
+ public AwsCrtRequestBodySubscriber(int windowSize) {
+ Validate.isPositive(windowSize, "windowSize is <= 0");
+ this.windowSize = windowSize;
+ }
+
+ protected void requestDataIfNecessary() {
+ Subscription subscription = subscriptionRef.get();
+ if (subscription == null) {
+ log.error(() -> "Subscription is null");
+ return;
+ }
+ if (queuedByteCount.get() < windowSize) {
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ Validate.notNull(s, "Subscription should not be null");
+
+ boolean wasFirstSubscription = subscriptionRef.compareAndSet(null, s);
+
+ if (!wasFirstSubscription) {
+ log.error(() -> "Only one Subscription supported!");
+ s.cancel();
+ return;
+ }
+
+ requestDataIfNecessary();
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ Validate.notNull(byteBuffer, "ByteBuffer should not be null");
+ queuedBuffers.add(byteBuffer);
+ queuedByteCount.addAndGet(byteBuffer.remaining());
+ requestDataIfNecessary();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error(() -> "onError() received an error: " + t.getMessage());
+ error.compareAndSet(null, t);
+ }
+
+ @Override
+ public void onComplete() {
+ log.debug(() -> "AwsCrtRequestBodySubscriber Completed");
+ isComplete.set(true);
+ }
+
+ /**
+ * Transfers any queued data from the Request Body subscriptionRef to the output buffer
+ * @param out The output ByteBuffer
+ * @return true if Request Body is completely transferred, false otherwise
+ */
+ public synchronized boolean transferRequestBody(ByteBuffer out) {
+ if (error.get() != null) {
+ throw new RuntimeException(error.get());
+ }
+
+ while (out.remaining() > 0 && queuedBuffers.size() > 0) {
+ ByteBuffer nextBuffer = queuedBuffers.peek();
+ int amtTransferred = transferData(nextBuffer, out);
+ queuedByteCount.addAndGet(-amtTransferred);
+
+ if (nextBuffer.remaining() == 0) {
+ queuedBuffers.remove();
+ }
+ }
+
+ boolean endOfStream = isComplete.get() && (queuedBuffers.size() == 0);
+
+ if (!endOfStream) {
+ requestDataIfNecessary();
+ } else {
+ log.debug(() -> "End Of RequestBody reached");
+ }
+
+ return endOfStream;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java
new file mode 100644
index 000000000000..ebc7888634b6
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java
@@ -0,0 +1,285 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongUnaryOperator;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.crt.http.HttpClientConnection;
+import software.amazon.awssdk.crt.http.HttpStream;
+import software.amazon.awssdk.utils.Logger;
+import software.amazon.awssdk.utils.Validate;
+
+/**
+ * Adapts an AWS Common Runtime Response Body stream from CrtHttpStreamHandler to a Publisher
+ */
+@SdkInternalApi
+public class AwsCrtResponseBodyPublisher implements Publisher {
+ private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisher.class);
+ private static final LongUnaryOperator DECREMENT_IF_GREATER_THAN_ZERO = x -> ((x > 0) ? (x - 1) : (x));
+
+ private final HttpClientConnection connection;
+ private final HttpStream stream;
+ private final CompletableFuture responseComplete;
+ private final AtomicLong outstandingRequests = new AtomicLong(0);
+ private final int windowSize;
+ private final AtomicBoolean isCancelled = new AtomicBoolean(false);
+ private final AtomicBoolean areNativeResourcesReleased = new AtomicBoolean(false);
+ private final AtomicBoolean isSubscriptionComplete = new AtomicBoolean(false);
+ private final AtomicBoolean queueComplete = new AtomicBoolean(false);
+ private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0);
+ private final AtomicInteger queuedBytes = new AtomicInteger(0);
+ private final AtomicReference> subscriberRef = new AtomicReference<>(null);
+ private final Queue queuedBuffers = new ConcurrentLinkedQueue<>();
+ private final AtomicReference error = new AtomicReference<>(null);
+
+ /**
+ * Adapts a streaming AWS CRT Http Response Body to a Publisher
+ * @param stream The AWS CRT Http Stream for this Response
+ * @param windowSize The max allowed bytes to be queued. The sum of the sizes of all queued ByteBuffers should
+ * never exceed this value.
+ */
+ public AwsCrtResponseBodyPublisher(HttpClientConnection connection, HttpStream stream,
+ CompletableFuture responseComplete, int windowSize) {
+ Validate.notNull(connection, "HttpConnection must not be null");
+ Validate.notNull(stream, "Stream must not be null");
+ Validate.notNull(responseComplete, "Stream must not be null");
+ Validate.isPositive(windowSize, "windowSize must be > 0");
+ this.connection = connection;
+ this.stream = stream;
+ this.responseComplete = responseComplete;
+ this.windowSize = windowSize;
+ }
+
+ /**
+ * Method for the users consuming the Http Response Body to register a subscriber.
+ * @param subscriber The Subscriber to register.
+ */
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ Validate.notNull(subscriber, "Subscriber must not be null");
+
+ boolean wasFirstSubscriber = subscriberRef.compareAndSet(null, subscriber);
+
+ if (!wasFirstSubscriber) {
+ log.error(() -> "Only one subscriber allowed");
+ subscriber.onError(new IllegalStateException("Only one subscriber allowed"));
+ return;
+ }
+
+ subscriber.onSubscribe(new AwsCrtResponseBodySubscription(this));
+ }
+
+ /**
+ * Adds a Buffer to the Queue to be published to any Subscribers
+ * @param buffer The Buffer to be queued.
+ */
+ public void queueBuffer(byte[] buffer) {
+ Validate.notNull(buffer, "ByteBuffer must not be null");
+
+ if (isCancelled.get()) {
+ // Immediately open HttpStream's IO window so it doesn't see any IO Back-pressure.
+ // AFAIK there's no way to abort an in-progress HttpStream, only free it's memory by calling close()
+ stream.incrementWindow(buffer.length);
+ return;
+ }
+
+ queuedBuffers.add(buffer);
+ int totalBytesQueued = queuedBytes.addAndGet(buffer.length);
+
+ if (totalBytesQueued > windowSize) {
+ throw new IllegalStateException("Queued more than Window Size: queued=" + totalBytesQueued
+ + ", window=" + windowSize);
+ }
+ }
+
+ /**
+ * Function called by Response Body Subscribers to request more Response Body buffers.
+ * @param n The number of buffers requested.
+ */
+ protected void request(long n) {
+ Validate.inclusiveBetween(1, Long.MAX_VALUE, n, "request");
+
+ // Check for overflow of outstanding Requests, and clamp to LONG_MAX.
+ long outstandingReqs;
+ if (n > (Long.MAX_VALUE - outstandingRequests.get())) {
+ outstandingRequests.set(Long.MAX_VALUE);
+ outstandingReqs = Long.MAX_VALUE;
+ } else {
+ outstandingReqs = outstandingRequests.addAndGet(n);
+ }
+
+ /*
+ * Since we buffer, in the case where the subscriber came in after the publication has already begun,
+ * go ahead and flush what we have.
+ */
+ publishToSubscribers();
+
+ log.trace(() -> "Subscriber Requested more Buffers. Outstanding Requests: " + outstandingReqs);
+ }
+
+ public void setError(Throwable t) {
+ log.error(() -> "Error processing Response Body", t);
+ error.compareAndSet(null, t);
+ }
+
+ protected void setCancelled() {
+ isCancelled.set(true);
+ /**
+ * subscriberRef must set to null due to ReactiveStream Spec stating references to Subscribers must be deleted
+ * when onCancel() is called.
+ */
+ subscriberRef.set(null);
+ }
+
+ private synchronized void releaseNativeResources() {
+ boolean alreadyReleased = areNativeResourcesReleased.getAndSet(true);
+
+ if (!alreadyReleased) {
+ stream.close();
+ connection.close();
+ }
+ }
+
+ /**
+ * Called when the final Buffer has been queued and no more data is expected.
+ */
+ public void setQueueComplete() {
+ log.trace(() -> "Response Body Publisher queue marked as completed.");
+ queueComplete.set(true);
+ // We're done with the Native Resources, release them so they can be used by another request.
+ releaseNativeResources();
+ }
+
+ /**
+ * Completes the Subscription by calling either the .onError() or .onComplete() callbacks exactly once.
+ */
+ protected void completeSubscriptionExactlyOnce() {
+ boolean alreadyComplete = isSubscriptionComplete.getAndSet(true);
+
+ if (alreadyComplete) {
+ return;
+ }
+
+ // Subscriber may have cancelled their subscription, in which case this may be null.
+ Optional subscriber = Optional.ofNullable(subscriberRef.getAndSet(null));
+
+ Throwable throwable = error.get();
+
+ // We're done with the Native Resources, release them so they can be used by another request.
+ releaseNativeResources();
+
+ // Complete the Futures
+ if (throwable != null) {
+ log.error(() -> "Error before ResponseBodyPublisher could complete: " + throwable.getMessage());
+ subscriber.ifPresent(s -> s.onError(throwable));
+ responseComplete.completeExceptionally(throwable);
+ } else {
+ log.debug(() -> "ResponseBodyPublisher Completed Successfully");
+ subscriber.ifPresent(s -> s.onComplete());
+ responseComplete.complete(null);
+ }
+ }
+
+ /**
+ * Publishes any queued data to any Subscribers if there is data queued and there is an outstanding Subscriber
+ * request for more data. Will also call onError() or onComplete() callbacks if needed.
+ *
+ * This method MUST be synchronized since it can be called simultaneously from both the Native EventLoop Thread and
+ * the User Thread. If this method wasn't synchronized, it'd be possible for each thread to dequeue a buffer by
+ * calling queuedBuffers.poll(), but then have the 2nd thread call subscriber.onNext(buffer) first, resulting in the
+ * subscriber seeing out-of-order data. To avoid this race condition, this method must be synchronized.
+ */
+ protected synchronized void publishToSubscribers() {
+ if (error.get() != null) {
+ completeSubscriptionExactlyOnce();
+ return;
+ }
+
+ if (isSubscriptionComplete.get() || isCancelled.get()) {
+ log.warn(() -> "Subscription already completed or cancelled, can't publish updates to Subscribers.");
+ return;
+ }
+
+ if (mutualRecursionDepth.get() > 0) {
+ /**
+ * If our depth is > 0, then we already made a call to publishToSubscribers() further up the stack that
+ * will continue publishing to subscribers, and this call should return without completing work to avoid
+ * infinite recursive loop between: "subscription.request() -> subscriber.onNext() -> subscription.request()"
+ */
+ return;
+ }
+
+ int totalAmountTransferred = 0;
+
+ while (outstandingRequests.get() > 0 && queuedBuffers.size() > 0) {
+ byte[] buffer = queuedBuffers.poll();
+ outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO);
+ int amount = buffer.length;
+ publishWithoutMutualRecursion(subscriberRef.get(), ByteBuffer.wrap(buffer));
+ totalAmountTransferred += amount;
+ }
+
+ if (totalAmountTransferred > 0) {
+ queuedBytes.addAndGet(-totalAmountTransferred);
+
+ // We may have released the Native HttpConnection and HttpStream if they completed before the Subscriber
+ // has finished reading the data.
+ if (!areNativeResourcesReleased.get()) {
+ // Open HttpStream's IO window so HttpStream can keep track of IO back-pressure
+ stream.incrementWindow(totalAmountTransferred);
+ }
+ }
+
+ // Check if Complete, consider no subscriber as a completion.
+ if (queueComplete.get() && queuedBuffers.size() == 0) {
+ completeSubscriptionExactlyOnce();
+ }
+ }
+
+ /**
+ * This method is used to avoid a StackOverflow due to the potential infinite loop between
+ * "subscription.request() -> subscriber.onNext() -> subscription.request()" calls. We only call subscriber.onNext()
+ * if the recursion depth is zero, otherwise we return up to the stack frame with depth zero and continue publishing
+ * from there.
+ * @param subscriber The Subscriber to publish to.
+ * @param buffer The buffer to publish to the subscriber.
+ */
+ private synchronized void publishWithoutMutualRecursion(Subscriber super ByteBuffer> subscriber, ByteBuffer buffer) {
+ try {
+ /**
+ * Need to keep track of recursion depth between .onNext() -> .request() calls
+ */
+ int depth = mutualRecursionDepth.getAndIncrement();
+ if (depth == 0) {
+ subscriber.onNext(buffer);
+ }
+ } finally {
+ mutualRecursionDepth.decrementAndGet();
+ }
+ }
+
+}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodySubscription.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodySubscription.java
new file mode 100644
index 000000000000..fe573a901af3
--- /dev/null
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodySubscription.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt.internal;
+
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * Helper Class that passes through calls from a Subscription to a AwsCrtResponseBodyPublisher
+ */
+@SdkInternalApi
+public class AwsCrtResponseBodySubscription implements Subscription {
+ private static final Logger log = Logger.loggerFor(AwsCrtResponseBodySubscription.class);
+ private final AwsCrtResponseBodyPublisher publisher;
+
+ public AwsCrtResponseBodySubscription(AwsCrtResponseBodyPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ // Reactive Stream Spec requires us to call onError() callback instead of throwing Exception here.
+ publisher.setError(new IllegalArgumentException("Request is for <= 0 elements: " + n));
+ publisher.publishToSubscribers();
+ return;
+ }
+
+ publisher.request(n);
+ publisher.publishToSubscribers();
+ }
+
+ @Override
+ public void cancel() {
+ publisher.setCancelled();
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java
new file mode 100644
index 000000000000..ee6fa1ef4b59
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientSpiVerificationTest.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.binaryEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static java.util.Collections.emptyMap;
+import static org.apache.commons.codec.digest.DigestUtils.sha256Hex;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.github.tomakehurst.wiremock.http.Fault;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.io.EventLoopGroup;
+import software.amazon.awssdk.crt.io.HostResolver;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.http.SdkHttpRequest;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.http.async.AsyncExecuteRequest;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
+import software.amazon.awssdk.utils.Logger;
+
+public class AwsCrtHttpClientSpiVerificationTest {
+ private static final Logger log = Logger.loggerFor(AwsCrtHttpClientSpiVerificationTest.class);
+ private static final int TEST_BODY_LEN = 1024;
+
+ @Rule
+ public WireMockRule mockServer = new WireMockRule(wireMockConfig()
+ .dynamicPort()
+ .dynamicHttpsPort());
+
+ private EventLoopGroup eventLoopGroup;
+ private HostResolver hostResolver;
+ private SdkAsyncHttpClient client;
+
+ @Before
+ public void setup() throws Exception {
+ CrtResource.waitForNoResources();
+
+ int numThreads = Runtime.getRuntime().availableProcessors();
+ eventLoopGroup = new EventLoopGroup(numThreads);
+ hostResolver = new HostResolver(eventLoopGroup);
+
+ client = AwsCrtAsyncHttpClient.builder()
+ .eventLoopGroup(eventLoopGroup)
+ .hostResolver(hostResolver)
+ .build();
+ }
+
+ @After
+ public void tearDown() {
+ client.close();
+ hostResolver.close();
+ eventLoopGroup.close();
+ CrtResource.waitForNoResources();
+ }
+
+ private byte[] generateRandomBody(int size) {
+ byte[] randomData = new byte[size];
+ new Random().nextBytes(randomData);
+ return randomData;
+ }
+
+ @Test
+ public void signalsErrorViaOnErrorAndFuture() throws InterruptedException, ExecutionException, TimeoutException {
+ stubFor(any(urlEqualTo("/")).willReturn(aResponse().withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
+
+ CompletableFuture errorSignaled = new CompletableFuture<>();
+
+ SdkAsyncHttpResponseHandler handler = new TestResponseHandler() {
+ @Override
+ public void onError(Throwable error) {
+ errorSignaled.complete(true);
+ }
+ };
+
+ SdkHttpRequest request = createRequest(URI.create("http://localhost:" + mockServer.port()));
+
+ CompletableFuture executeFuture = client.execute(AsyncExecuteRequest.builder()
+ .request(request)
+ .responseHandler(handler)
+ .requestContentPublisher(new EmptyPublisher())
+ .build());
+
+ assertThat(errorSignaled.get(1, TimeUnit.SECONDS)).isTrue();
+ assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(Exception.class);
+
+ }
+
+ @Test
+ public void callsOnStreamForEmptyResponseContent() throws Exception {
+ stubFor(any(urlEqualTo("/")).willReturn(aResponse().withStatus(204).withHeader("foo", "bar")));
+
+ CompletableFuture streamReceived = new CompletableFuture<>();
+ final AtomicReference response = new AtomicReference<>(null);
+
+ SdkAsyncHttpResponseHandler handler = new TestResponseHandler() {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ response.compareAndSet(null, headers);
+ }
+ @Override
+ public void onStream(Publisher stream) {
+ super.onStream(stream);
+ streamReceived.complete(true);
+ }
+ };
+
+ SdkHttpRequest request = createRequest(URI.create("http://localhost:" + mockServer.port()));
+
+ CompletableFuture future = client.execute(AsyncExecuteRequest.builder()
+ .request(request)
+ .responseHandler(handler)
+ .requestContentPublisher(new EmptyPublisher())
+ .build());
+
+ future.get(60, TimeUnit.SECONDS);
+ assertThat(streamReceived.get(1, TimeUnit.SECONDS)).isTrue();
+ assertThat(response.get() != null).isTrue();
+ assertThat(response.get().statusCode() == 204).isTrue();
+ assertThat(response.get().headers().get("foo").isEmpty()).isFalse();
+ }
+
+ @Test
+ public void testGetRequest() throws Exception {
+ String path = "/testGetRequest";
+ byte[] body = generateRandomBody(TEST_BODY_LEN);
+ String expectedBodyHash = sha256Hex(body).toUpperCase();
+ stubFor(any(urlEqualTo(path)).willReturn(aResponse().withStatus(200)
+ .withHeader("Content-Length", Integer.toString(TEST_BODY_LEN))
+ .withHeader("foo", "bar")
+ .withBody(body)));
+
+ CompletableFuture streamReceived = new CompletableFuture<>();
+ final AtomicReference response = new AtomicReference<>(null);
+ Sha256BodySubscriber bodySha256Subscriber = new Sha256BodySubscriber();
+ final AtomicReference error = new AtomicReference<>(null);
+
+ SdkAsyncHttpResponseHandler handler = new SdkAsyncHttpResponseHandler() {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ response.compareAndSet(null, headers);
+ }
+ @Override
+ public void onStream(Publisher stream) {
+ stream.subscribe(bodySha256Subscriber);
+ streamReceived.complete(true);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ error.compareAndSet(null, t);
+ }
+ };
+
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ SdkHttpRequest request = createRequest(uri, path, null, SdkHttpMethod.GET, emptyMap());
+
+ CompletableFuture future = client.execute(AsyncExecuteRequest.builder()
+ .request(request)
+ .responseHandler(handler)
+ .requestContentPublisher(new EmptyPublisher())
+ .build());
+
+ future.get(60, TimeUnit.SECONDS);
+ assertThat(error.get()).isNull();
+ assertThat(streamReceived.get(1, TimeUnit.SECONDS)).isTrue();
+ assertThat(bodySha256Subscriber.getFuture().get(60, TimeUnit.SECONDS)).isEqualTo(expectedBodyHash);
+ assertThat(response.get().statusCode()).isEqualTo(200);
+ assertThat(response.get().headers().get("foo").isEmpty()).isFalse();
+ }
+
+
+ private void makePutRequest(String path, byte[] reqBody, int expectedStatus) throws Exception {
+ CompletableFuture streamReceived = new CompletableFuture<>();
+ final AtomicReference response = new AtomicReference<>(null);
+ final AtomicReference error = new AtomicReference<>(null);
+
+ Subscriber subscriber = new Subscriber() {
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ };
+
+ SdkAsyncHttpResponseHandler handler = new SdkAsyncHttpResponseHandler() {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ response.compareAndSet(null, headers);
+ }
+ @Override
+ public void onStream(Publisher stream) {
+ stream.subscribe(subscriber);
+ streamReceived.complete(true);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ error.compareAndSet(null, t);
+ }
+ };
+
+ URI uri = URI.create("http://localhost:" + mockServer.port());
+ SdkHttpRequest request = createRequest(uri, path, reqBody, SdkHttpMethod.PUT, emptyMap());
+
+ CompletableFuture future = client.execute(AsyncExecuteRequest.builder()
+ .request(request)
+ .responseHandler(handler)
+ .requestContentPublisher(new SdkTestHttpContentPublisher(reqBody))
+ .build());
+ future.get(60, TimeUnit.SECONDS);
+ assertThat(error.get()).isNull();
+ assertThat(streamReceived.get(60, TimeUnit.SECONDS)).isTrue();
+ assertThat(response.get().statusCode()).isEqualTo(expectedStatus);
+ }
+
+
+ @Test
+ public void testPutRequest() throws Exception {
+ String pathExpect200 = "/testPutRequest/return_200_on_exact_match";
+ byte[] expectedBody = generateRandomBody(TEST_BODY_LEN);
+ stubFor(any(urlEqualTo(pathExpect200)).withRequestBody(binaryEqualTo(expectedBody)).willReturn(aResponse().withStatus(200)));
+ makePutRequest(pathExpect200, expectedBody, 200);
+
+ String pathExpect404 = "/testPutRequest/return_404_always";
+ byte[] randomBody = generateRandomBody(TEST_BODY_LEN);
+ stubFor(any(urlEqualTo(pathExpect404)).willReturn(aResponse().withStatus(404)));
+ makePutRequest(pathExpect404, randomBody, 404);
+ }
+
+ private SdkHttpFullRequest createRequest(URI endpoint) {
+ return createRequest(endpoint, "/", null, SdkHttpMethod.GET, emptyMap());
+ }
+
+ private SdkHttpFullRequest createRequest(URI endpoint,
+ String resourcePath,
+ byte[] body,
+ SdkHttpMethod method,
+ Map params) {
+
+ String contentLength = (body == null) ? null : String.valueOf(body.length);
+ return SdkHttpFullRequest.builder()
+ .uri(endpoint)
+ .method(method)
+ .encodedPath(resourcePath)
+ .applyMutation(b -> params.forEach(b::putRawQueryParameter))
+ .applyMutation(b -> {
+ b.putHeader("Host", endpoint.getHost());
+ if (contentLength != null) {
+ b.putHeader("Content-Length", contentLength);
+ }
+ }).build();
+ }
+
+ private static class TestResponseHandler implements SdkAsyncHttpResponseHandler {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ }
+
+ @Override
+ public void onStream(Publisher stream) {
+ stream.subscribe(new DrainingSubscriber<>());
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ }
+ }
+
+ private static class DrainingSubscriber implements Subscriber {
+ private Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ this.subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(T t) {
+ this.subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtRequestBodySubscriberReactiveStreamCompatTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtRequestBodySubscriberReactiveStreamCompatTest.java
new file mode 100644
index 000000000000..d2b07542c85c
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtRequestBodySubscriberReactiveStreamCompatTest.java
@@ -0,0 +1,66 @@
+package software.amazon.awssdk.http.crt;
+
+import java.nio.ByteBuffer;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.reactivestreams.tck.SubscriberWhiteboxVerification;
+import org.reactivestreams.tck.TestEnvironment;
+import software.amazon.awssdk.http.crt.internal.AwsCrtRequestBodySubscriber;
+
+public class AwsCrtRequestBodySubscriberReactiveStreamCompatTest extends SubscriberWhiteboxVerification {
+ private static final int DEFAULT_STREAM_WINDOW_SIZE = 16 * 1024 * 1024; // 16 MB Total Buffer size
+
+ public AwsCrtRequestBodySubscriberReactiveStreamCompatTest() {
+ super(new TestEnvironment());
+ }
+
+ @Override
+ public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) {
+ AwsCrtRequestBodySubscriber actualSubscriber = new AwsCrtRequestBodySubscriber(DEFAULT_STREAM_WINDOW_SIZE);
+
+ // Pass Through calls to AwsCrtRequestBodySubscriber, but also register calls to the whitebox probe
+ Subscriber passthroughSubscriber = new Subscriber() {
+ @Override
+ public void onSubscribe(Subscription s) {
+ actualSubscriber.onSubscribe(s);
+ probe.registerOnSubscribe(new SubscriberPuppet() {
+
+ @Override
+ public void triggerRequest(long elements) {
+ s.request(elements);
+ }
+
+ @Override
+ public void signalCancel() {
+ s.cancel();
+ }
+ });
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ actualSubscriber.onNext(byteBuffer);
+ probe.registerOnNext(byteBuffer);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ actualSubscriber.onError(t);
+ probe.registerOnError(t);
+ }
+
+ @Override
+ public void onComplete() {
+ actualSubscriber.onComplete();
+ probe.registerOnComplete();
+ }
+ };
+
+ return passthroughSubscriber;
+ }
+
+ @Override
+ public ByteBuffer createElement(int element) {
+ return ByteBuffer.wrap(Integer.toString(element).getBytes());
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java
new file mode 100644
index 000000000000..143f1e7b591b
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtResponseBodyPublisherReactiveStreamCompatTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.tck.PublisherVerification;
+import org.reactivestreams.tck.TestEnvironment;
+import software.amazon.awssdk.crt.http.HttpClientConnection;
+import software.amazon.awssdk.crt.http.HttpStream;
+import software.amazon.awssdk.http.crt.internal.AwsCrtResponseBodyPublisher;
+import software.amazon.awssdk.utils.Logger;
+
+public class AwsCrtResponseBodyPublisherReactiveStreamCompatTest extends PublisherVerification {
+ private static final Logger log = Logger.loggerFor(AwsCrtResponseBodyPublisherReactiveStreamCompatTest.class);
+
+ public AwsCrtResponseBodyPublisherReactiveStreamCompatTest() {
+ super(new TestEnvironment());
+ }
+
+ @Override
+ public Publisher createPublisher(long elements) {
+ HttpClientConnection connection = mock(HttpClientConnection.class);
+ HttpStream stream = mock(HttpStream.class);
+ AwsCrtResponseBodyPublisher bodyPublisher = new AwsCrtResponseBodyPublisher(connection, stream, new CompletableFuture<>(), Integer.MAX_VALUE);
+
+ for (long i = 0; i < elements; i++) {
+ bodyPublisher.queueBuffer(UUID.randomUUID().toString().getBytes());
+ }
+
+ bodyPublisher.setQueueComplete();
+ return bodyPublisher;
+ }
+
+ // Some tests try to create INT_MAX elements, which causes OutOfMemory Exceptions. Lower the max allowed number of
+ // queued buffers to 1024.
+ @Override
+ public long maxElementsFromPublisher() {
+ return 1024;
+ }
+
+ @Override
+ public Publisher createFailedPublisher() {
+ return null;
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/EmptyPublisher.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/EmptyPublisher.java
new file mode 100644
index 000000000000..1e85fc43cda6
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/EmptyPublisher.java
@@ -0,0 +1,45 @@
+package software.amazon.awssdk.http.crt;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
+
+public class EmptyPublisher implements SdkHttpContentPublisher {
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ subscriber.onSubscribe(new EmptySubscription(subscriber));
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.of(0L);
+ }
+
+ private static class EmptySubscription implements Subscription {
+ private final Subscriber subscriber;
+ private volatile boolean done;
+
+ EmptySubscription(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void request(long l) {
+ if (!done) {
+ done = true;
+ if (l <= 0) {
+ this.subscriber.onError(new IllegalArgumentException("Demand must be positive"));
+ } else {
+ this.subscriber.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ done = true;
+ }
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/SdkTestHttpContentPublisher.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/SdkTestHttpContentPublisher.java
new file mode 100644
index 000000000000..3ad5f08ac0c0
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/SdkTestHttpContentPublisher.java
@@ -0,0 +1,56 @@
+package software.amazon.awssdk.http.crt;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
+
+public class SdkTestHttpContentPublisher implements SdkHttpContentPublisher {
+ private final byte[] body;
+ private final AtomicReference> subscriber = new AtomicReference<>(null);
+ private final AtomicBoolean complete = new AtomicBoolean(false);
+
+ public SdkTestHttpContentPublisher(byte[] body) {
+ this.body = body;
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ boolean wasFirstSubscriber = subscriber.compareAndSet(null, s);
+
+ SdkTestHttpContentPublisher publisher = this;
+
+ if (wasFirstSubscriber) {
+ s.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ publisher.request(n);
+ }
+
+ @Override
+ public void cancel() {
+ // Do nothing
+ }
+ });
+ } else {
+ s.onError(new RuntimeException("Only allow one subscriber"));
+ }
+ }
+
+ protected void request(long n) {
+ // Send the whole body if they request >0 ByteBuffers
+ if (n > 0 && !complete.get()) {
+ complete.set(true);
+ subscriber.get().onNext(ByteBuffer.wrap(body));
+ subscriber.get().onComplete();
+ }
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.of((long)body.length);
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/Sha256BodySubscriber.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/Sha256BodySubscriber.java
new file mode 100644
index 000000000000..508deffcb199
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/Sha256BodySubscriber.java
@@ -0,0 +1,44 @@
+package software.amazon.awssdk.http.crt;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.CompletableFuture;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public class Sha256BodySubscriber implements Subscriber {
+ private MessageDigest digest;
+ private CompletableFuture future;
+
+ public Sha256BodySubscriber() throws NoSuchAlgorithmException {
+ digest = MessageDigest.getInstance("SHA-256");
+ future = new CompletableFuture<>();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ digest.update(byteBuffer);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ future.completeExceptionally(t);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(encodeHexString(digest.digest()).toUpperCase());
+ }
+
+ public CompletableFuture getFuture() {
+ return future;
+ }
+}
diff --git a/http-clients/netty-nio-client/pom.xml b/http-clients/netty-nio-client/pom.xml
index 795d25b1655e..c792d75d8d5a 100644
--- a/http-clients/netty-nio-client/pom.xml
+++ b/http-clients/netty-nio-client/pom.xml
@@ -20,7 +20,7 @@
http-clients
software.amazon.awssdk
- 2.11.6
+ 2.11.7-SNAPSHOT
4.0.0
diff --git a/http-clients/pom.xml b/http-clients/pom.xml
index 49db92c581a8..4cb0c52247e9 100644
--- a/http-clients/pom.xml
+++ b/http-clients/pom.xml
@@ -21,7 +21,7 @@
aws-sdk-java-pom
software.amazon.awssdk
- 2.11.6
+ 2.11.7-SNAPSHOT
4.0.0
@@ -31,6 +31,7 @@
apache-client
+ aws-crt-client
netty-nio-client
url-connection-client
diff --git a/http-clients/url-connection-client/pom.xml b/http-clients/url-connection-client/pom.xml
index de06a876ba3f..ed004521dfd4 100644
--- a/http-clients/url-connection-client/pom.xml
+++ b/http-clients/url-connection-client/pom.xml
@@ -20,7 +20,7 @@
http-clients
software.amazon.awssdk
- 2.11.6
+ 2.11.7-SNAPSHOT
4.0.0
diff --git a/pom.xml b/pom.xml
index a8f76ad02026..5c5805b9d48c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
4.0.0
software.amazon.awssdk
aws-sdk-java-pom
- 2.11.6
+ 2.11.7-SNAPSHOT
pom
AWS Java SDK :: Parent
The Amazon Web Services SDK for Java provides Java APIs
diff --git a/release-scripts/pom.xml b/release-scripts/pom.xml
index 2d948e6912d3..594905f6f85e 100644
--- a/release-scripts/pom.xml
+++ b/release-scripts/pom.xml
@@ -22,7 +22,7 @@
software.amazon.awssdk
aws-sdk-java-pom
- 2.11.6
+ 2.11.7-SNAPSHOT
../pom.xml
release-scripts
diff --git a/services-custom/dynamodb-enhanced/pom.xml b/services-custom/dynamodb-enhanced/pom.xml
index f7e736ca198d..0d44568ba3d1 100644
--- a/services-custom/dynamodb-enhanced/pom.xml
+++ b/services-custom/dynamodb-enhanced/pom.xml
@@ -21,7 +21,7 @@
software.amazon.awssdk
services-custom
- 2.11.6
+ 2.11.7-SNAPSHOT
dynamodb-enhanced
${awsjavasdk.version}-PREVIEW
diff --git a/services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/mapper/BeanTableSchema.java b/services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/mapper/BeanTableSchema.java
index a19ce1ba5412..02f1ac85237e 100644
--- a/services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/mapper/BeanTableSchema.java
+++ b/services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/mapper/BeanTableSchema.java
@@ -23,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
@@ -230,29 +231,56 @@ private static Optional converterProviderAnnotation(
Optional.empty();
}
- @SuppressWarnings("unchecked")
private static StaticAttribute.Builder staticAttributeBuilder(PropertyDescriptor propertyDescriptor,
Class beanClass) {
Type propertyType = propertyDescriptor.getReadMethod().getGenericReturnType();
- EnhancedType> propertyTypeToken = null;
+ EnhancedType> propertyTypeToken = convertTypeToEnhancedType(propertyType);
+ return StaticAttribute.builder(beanClass, propertyTypeToken)
+ .name(attributeNameForProperty(propertyDescriptor))
+ .getter(getterForProperty(propertyDescriptor, beanClass))
+ .setter(setterForProperty(propertyDescriptor, beanClass));
+ }
- if (propertyType instanceof Class) {
- Class> clazz = (Class>) propertyType;
- if (clazz.getAnnotation(DynamoDbBean.class) != null) {
- propertyTypeToken = EnhancedType.documentOf((Class