Skip to content

Updated aws-crt-client to use new 'utils' publishers for its response body logic. #3490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletableFuture<Void> execute(CrtRequestContext executionContext) {

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
HttpStreamResponseHandler crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, executionContext);
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());

// Submit the request on the connection
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http.crt.internal.response;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.CRT;
Expand All @@ -26,10 +27,10 @@
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
Expand All @@ -39,97 +40,110 @@ public final class CrtResponseAdapter implements HttpStreamResponseHandler {
private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class);

private final HttpClientConnection connection;
private final CompletableFuture<Void> responseComplete;
private final AsyncExecuteRequest sdkRequest;
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
private final int windowSize;
private CrtResponseBodyPublisher respBodyPublisher;
private final CompletableFuture<Void> completionFuture;
private final SdkAsyncHttpResponseHandler responseHandler;
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();

private CrtResponseAdapter(HttpClientConnection connection,
CompletableFuture<Void> responseComplete,
AsyncExecuteRequest sdkRequest,
int windowSize) {
this.connection = Validate.notNull(connection, "HttpConnection is null");
this.responseComplete = Validate.notNull(responseComplete, "reqComplete Future is null");
this.sdkRequest = Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
this.windowSize = Validate.isPositive(windowSize, "windowSize is <= 0");
}
private final SdkHttpResponse.Builder responseBuilder = SdkHttpResponse.builder();

public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection connection,
CompletableFuture<Void> responseComplete,
CrtRequestContext request) {
return new CrtResponseAdapter(connection, responseComplete, request.sdkRequest(), request.readBufferSize());
private CrtResponseAdapter(HttpClientConnection connection,
CompletableFuture<Void> completionFuture,
SdkAsyncHttpResponseHandler responseHandler) {
this.connection = Validate.paramNotNull(connection, "connection");
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
}

private void initRespBodyPublisherIfNeeded(HttpStream stream) {
if (respBodyPublisher == null) {
respBodyPublisher = new CrtResponseBodyPublisher(connection, stream, responseComplete, windowSize);
}
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
CompletableFuture<Void> requestFuture,
SdkAsyncHttpResponseHandler responseHandler) {
return new CrtResponseAdapter(crtConn, requestFuture, responseHandler);
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
initRespBodyPublisherIfNeeded(stream);

for (HttpHeader h : nextHeaders) {
respBuilder.appendHeader(h.getName(), h.getValue());
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}
}

@Override
public void onResponseHeadersDone(HttpStream stream, int headerType) {
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
initRespBodyPublisherIfNeeded(stream);

respBuilder.statusCode(stream.getResponseStatusCode());
sdkRequest.responseHandler().onHeaders(respBuilder.build());
sdkRequest.responseHandler().onStream(respBodyPublisher);
responseBuilder.statusCode(stream.getResponseStatusCode());
responseHandler.onHeaders(responseBuilder.build());
responseHandler.onStream(responsePublisher);
}
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
initRespBodyPublisherIfNeeded(stream);
CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));

if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
return bodyBytesIn.length;
}

respBodyPublisher.queueBuffer(bodyBytesIn);
respBodyPublisher.publishToSubscribers();
writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(stream, failure);
return;
}

stream.incrementWindow(bodyBytesIn.length);
});

/*
* Intentionally zero. We manually manage the crt stream's window within the body publisher by updating with
* the exact amount we were able to push to the subcriber.
*
* See the call to stream.incrementWindow() in AwsCrtResponseBodyPublisher.
*/
return 0;
}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
initRespBodyPublisherIfNeeded(stream);

if (HttpStatusFamily.of(respBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
connection.shutdown();
}

if (errorCode == CRT.AWS_CRT_SUCCESS) {
log.debug(() -> "Response Completed Successfully");
respBodyPublisher.setQueueComplete();
respBodyPublisher.publishToSubscribers();
onSuccessfulResponseComplete(stream);
} else {
HttpException error = new HttpException(errorCode);
log.error(() -> "Response Encountered an Error.", error);

// Invoke Error Callback on SdkAsyncHttpResponseHandler
try {
sdkRequest.responseHandler().onError(error);
} catch (Exception e) {
log.error(() -> String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError: %s",
sdkRequest.responseHandler(), e));
onFailedResponseComplete(stream, new HttpException(errorCode));
}
}

private void onSuccessfulResponseComplete(HttpStream stream) {
responsePublisher.complete().whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(stream, failure);
return;
}

if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
connection.shutdown();
}

// Invoke Error Callback on any Subscriber's of the Response Body
respBodyPublisher.setError(error);
respBodyPublisher.publishToSubscribers();
connection.close();
stream.close();
completionFuture.complete(null);
});
}

private void onFailedResponseComplete(HttpStream stream, HttpException error) {
log.error(() -> "HTTP response encountered an error.", error);
responsePublisher.error(error);
failResponseHandlerAndFuture(stream, error);
}

private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
callResponseHandlerOnError(error);
completionFuture.completeExceptionally(error);
connection.shutdown();
connection.close();
stream.close();
}

private void callResponseHandlerOnError(Throwable error) {
try {
responseHandler.onError(error);
} catch (RuntimeException e) {
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
}
}
}
Loading