Skip to content

Don't notify onError in MakeAsyncHttpRequestStage #779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public <OutputT> CompletableFuture<OutputT> execute(TransformingAsyncResponseHan
.first(SigningStage::new)
.then(BeforeTransmissionExecutionInterceptorsStage::new)
.then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, errorResponseHandler, d))
.wrappedWith(AsyncRetryableStage::new)
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped))
.then(async(() -> new UnwrapResponseContainer<>()))
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@SdkInternalApi
public final class SyncResponseHandlerAdapter<T> implements TransformingAsyncResponseHandler<T> {
private final CompletableFuture<ByteArrayOutputStream> streamFuture = new CompletableFuture<>();
private volatile CompletableFuture<ByteArrayOutputStream> streamFuture;
private final HttpResponseHandler<T> responseHandler;
private final ExecutionAttributes executionAttributes;
private final Function<SdkHttpFullResponse, SdkHttpFullResponse> crc32Validator;
Expand Down Expand Up @@ -75,6 +75,7 @@ public void onError(Throwable err) {

@Override
public CompletableFuture<T> prepare() {
streamFuture = new CompletableFuture<>();
return streamFuture.thenComposeAsync(baos -> {
ByteArrayInputStream content = new ByteArrayInputStream(baos.toByteArray());
// Ignore aborts - we already have all of the content.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.core.internal.Response;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.retry.RetryHandler;
import software.amazon.awssdk.core.internal.util.CapacityManager;
Expand All @@ -49,14 +50,17 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt

private static final Logger log = LoggerFactory.getLogger(AsyncRetryableStage.class);

private final TransformingAsyncResponseHandler<OutputT> responseHandler;
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline;
private final ScheduledExecutorService scheduledExecutor;
private final HttpClientDependencies dependencies;
private final CapacityManager retryCapacity;
private final RetryPolicy retryPolicy;

public AsyncRetryableStage(HttpClientDependencies dependencies,
public AsyncRetryableStage(TransformingAsyncResponseHandler<OutputT> responseHandler,
HttpClientDependencies dependencies,
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline) {
this.responseHandler = responseHandler;
this.dependencies = dependencies;
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
Expand Down Expand Up @@ -128,6 +132,11 @@ private void retryResponseIfNeeded(Response<OutputT> resp, CompletableFuture<Res
}

if (shouldRetry(resp.httpResponse(), resp.exception())) {
// We only notify onError if we are retrying the request.
// Otherwise we rely on the generated code in the in the
// client class to forward exception to the handler's
// exceptionOcurred method.
responseHandler.onError(err);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment about the last error being delivered in the client? That's a bit tricky and not obvious from the code.

retryHandler.setLastRetriedException(err);
executeRetry(future);
} else {
Expand All @@ -143,6 +152,11 @@ private void retryErrorIfNeeded(SdkException err, CompletableFuture<Response<Out
}

if (shouldRetry(null, err)) {
// We only notify onError if we are retrying the request.
// Otherwise we rely on the generated code in the in the client
// class to forward exception to the handler's exceptionOcurred
// method.
responseHandler.onError(err);
retryHandler.setLastRetriedException(err);
executeRetry(future);
} else {
Expand Down Expand Up @@ -178,4 +192,4 @@ private CompletableFuture<Response<OutputT>> doExecute() throws Exception {
return requestPipeline.execute(retryHandler.addRetryInfoHeader(request, requestCount), context);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
CompletableFuture<? extends SdkException> errorResponseFuture =
errorResponseHandler == null ? null : errorResponseHandler.prepare();

CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();

//FIXME(dongie): We need to be careful to only call responseHandler.prepare() exactly once per execute() call
//because it calls prepare() under the hood and we guarantee that we call that once per execution. It would be good
//to find a way to prevent multiple calls to prepare() within a single execution to only call prepare() once.
ResponseHandler handler = new ResponseHandler(responseHandler.prepare(), errorResponseFuture);
ResponseHandler handler = new ResponseHandler(responseFuture, responseHandler.prepare(), errorResponseFuture);

SdkHttpContentPublisher requestProvider = context.requestProvider() == null
? new SimpleHttpContentPublisher(request)
Expand All @@ -112,7 +114,6 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque

CompletableFuture<Response<OutputT>> transformFuture = handler.prepare();

CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();
TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context);
context.apiCallAttemptTimeoutTracker(timeoutTracker);

Expand Down Expand Up @@ -173,13 +174,16 @@ private TimeoutTracker setupAttemptTimer(CompletableFuture<Response<OutputT>> ex
*/

private class ResponseHandler implements TransformingAsyncResponseHandler<Response<OutputT>> {
private final CompletableFuture<Response<OutputT>> responseFuture;
private final CompletableFuture<SdkHttpResponse> headersFuture = new CompletableFuture<>();
private final CompletableFuture<OutputT> transformFuture;
private final CompletableFuture<? extends SdkException> errorTransformFuture;
private volatile SdkHttpFullResponse response;

ResponseHandler(CompletableFuture<OutputT> transformFuture,
ResponseHandler(CompletableFuture<Response<OutputT>> responseFuture,
CompletableFuture<OutputT> transformFuture,
CompletableFuture<? extends SdkException> errorTransformFuture) {
this.responseFuture = responseFuture;
this.transformFuture = transformFuture;
this.errorTransformFuture = errorTransformFuture;
}
Expand All @@ -199,18 +203,12 @@ public void onHeaders(SdkHttpResponse response) {

@Override
public void onError(Throwable error) {
// If we already have the headers we've chosen one of the two
// handlers so notify the correct handler. Otherwise, just complete
// the future exceptionally
if (response != null) {
if (response.isSuccessful()) {
responseHandler.onError(error);
} else {
errorResponseHandler.onError(error);
}
} else {
headersFuture.completeExceptionally(error);
}
// Note: We don't notify the wrapped handlers' onError here because
// we need context about retries; we only want to notify onError if
// we know that the request will be retried. Let the
// AsyncRetryableStage handle it.
responseFuture.completeExceptionally(error);
headersFuture.completeExceptionally(error);
}

@Override
Expand All @@ -228,7 +226,11 @@ public CompletableFuture<Response<OutputT>> prepare() {
if (headers.isSuccessful()) {
return transformFuture.thenApply(r -> Response.fromSuccess(r, response));
} else {
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
if (errorTransformFuture != null) {
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
} else {
return CompletableFuture.completedFuture(Response.fromFailure(null, response));
}
}
});
}
Expand All @@ -248,4 +250,4 @@ private static SdkHttpFullResponse toFullResponse(SdkHttpResponse response) {
response.statusText().ifPresent(builder::statusText);
return builder.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription;
import software.amazon.awssdk.utils.FunctionalUtils.UnsafeRunnable;
import software.amazon.awssdk.utils.async.DelegatingSubscription;
Expand Down Expand Up @@ -239,6 +240,9 @@ public void onError(Throwable t) {
try {
runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()),
() -> subscriber.onError(t));
SdkAsyncHttpResponseHandler handler = requestContext.handler();
runAndLogError(String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.", handler),
() -> handler.onError(t));
executeFuture.completeExceptionally(t);
} finally {
runAndLogError("Could not release channel back to the pool",
Expand Down