diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java index eef40278fd0d..091f6c62c29c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java @@ -199,7 +199,7 @@ public CompletableFuture execute(TransformingAsyncResponseHan .first(SigningStage::new) .then(BeforeTransmissionExecutionInterceptorsStage::new) .then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, errorResponseHandler, d)) - .wrappedWith(AsyncRetryableStage::new) + .wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped)) .then(async(() -> new UnwrapResponseContainer<>())) .then(async(() -> new AfterExecutionInterceptorsStage<>())) .wrappedWith(AsyncExecutionFailureExceptionReportingStage::new) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SyncResponseHandlerAdapter.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SyncResponseHandlerAdapter.java index ab6fc4cabfcc..aee3b7344bca 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SyncResponseHandlerAdapter.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SyncResponseHandlerAdapter.java @@ -44,7 +44,7 @@ */ @SdkInternalApi public final class SyncResponseHandlerAdapter implements TransformingAsyncResponseHandler { - private final CompletableFuture streamFuture = new CompletableFuture<>(); + private volatile CompletableFuture streamFuture; private final HttpResponseHandler responseHandler; private final ExecutionAttributes executionAttributes; private final Function crc32Validator; @@ -75,6 +75,7 @@ public void onError(Throwable err) { @Override public CompletableFuture prepare() { + streamFuture = new CompletableFuture<>(); return streamFuture.thenComposeAsync(baos -> { ByteArrayInputStream content = new ByteArrayInputStream(baos.toByteArray()); // Ignore aborts - we already have all of the content. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java index b28a08549006..1eace3f018b1 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.core.internal.Response; import software.amazon.awssdk.core.internal.http.HttpClientDependencies; import software.amazon.awssdk.core.internal.http.RequestExecutionContext; +import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler; import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline; import software.amazon.awssdk.core.internal.retry.RetryHandler; import software.amazon.awssdk.core.internal.util.CapacityManager; @@ -49,14 +50,17 @@ public final class AsyncRetryableStage implements RequestPipeline responseHandler; private final RequestPipeline>> requestPipeline; private final ScheduledExecutorService scheduledExecutor; private final HttpClientDependencies dependencies; private final CapacityManager retryCapacity; private final RetryPolicy retryPolicy; - public AsyncRetryableStage(HttpClientDependencies dependencies, + public AsyncRetryableStage(TransformingAsyncResponseHandler responseHandler, + HttpClientDependencies dependencies, RequestPipeline>> requestPipeline) { + this.responseHandler = responseHandler; this.dependencies = dependencies; this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE); this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY); @@ -128,6 +132,11 @@ private void retryResponseIfNeeded(Response resp, CompletableFuture> doExecute() throws Exception { return requestPipeline.execute(retryHandler.addRetryInfoHeader(request, requestCount), context); } } -} \ No newline at end of file +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java index cc9107d11997..f6d83c0c84a5 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java @@ -90,10 +90,12 @@ private CompletableFuture> executeHttpRequest(SdkHttpFullReque CompletableFuture errorResponseFuture = errorResponseHandler == null ? null : errorResponseHandler.prepare(); + CompletableFuture> responseFuture = new CompletableFuture<>(); + //FIXME(dongie): We need to be careful to only call responseHandler.prepare() exactly once per execute() call //because it calls prepare() under the hood and we guarantee that we call that once per execution. It would be good //to find a way to prevent multiple calls to prepare() within a single execution to only call prepare() once. - ResponseHandler handler = new ResponseHandler(responseHandler.prepare(), errorResponseFuture); + ResponseHandler handler = new ResponseHandler(responseFuture, responseHandler.prepare(), errorResponseFuture); SdkHttpContentPublisher requestProvider = context.requestProvider() == null ? new SimpleHttpContentPublisher(request) @@ -112,7 +114,6 @@ private CompletableFuture> executeHttpRequest(SdkHttpFullReque CompletableFuture> transformFuture = handler.prepare(); - CompletableFuture> responseFuture = new CompletableFuture<>(); TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context); context.apiCallAttemptTimeoutTracker(timeoutTracker); @@ -173,13 +174,16 @@ private TimeoutTracker setupAttemptTimer(CompletableFuture> ex */ private class ResponseHandler implements TransformingAsyncResponseHandler> { + private final CompletableFuture> responseFuture; private final CompletableFuture headersFuture = new CompletableFuture<>(); private final CompletableFuture transformFuture; private final CompletableFuture errorTransformFuture; private volatile SdkHttpFullResponse response; - ResponseHandler(CompletableFuture transformFuture, + ResponseHandler(CompletableFuture> responseFuture, + CompletableFuture transformFuture, CompletableFuture errorTransformFuture) { + this.responseFuture = responseFuture; this.transformFuture = transformFuture; this.errorTransformFuture = errorTransformFuture; } @@ -199,18 +203,12 @@ public void onHeaders(SdkHttpResponse response) { @Override public void onError(Throwable error) { - // If we already have the headers we've chosen one of the two - // handlers so notify the correct handler. Otherwise, just complete - // the future exceptionally - if (response != null) { - if (response.isSuccessful()) { - responseHandler.onError(error); - } else { - errorResponseHandler.onError(error); - } - } else { - headersFuture.completeExceptionally(error); - } + // Note: We don't notify the wrapped handlers' onError here because + // we need context about retries; we only want to notify onError if + // we know that the request will be retried. Let the + // AsyncRetryableStage handle it. + responseFuture.completeExceptionally(error); + headersFuture.completeExceptionally(error); } @Override @@ -228,7 +226,11 @@ public CompletableFuture> prepare() { if (headers.isSuccessful()) { return transformFuture.thenApply(r -> Response.fromSuccess(r, response)); } else { - return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response)); + if (errorTransformFuture != null) { + return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response)); + } else { + return CompletableFuture.completedFuture(Response.fromFailure(null, response)); + } } }); } @@ -248,4 +250,4 @@ private static SdkHttpFullResponse toFullResponse(SdkHttpResponse response) { response.statusText().ifPresent(builder::statusText); return builder.build(); } -} \ No newline at end of file +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java index e6bdda1a39b9..dc78f5f5fc61 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java @@ -53,6 +53,7 @@ import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription; import software.amazon.awssdk.utils.FunctionalUtils.UnsafeRunnable; import software.amazon.awssdk.utils.async.DelegatingSubscription; @@ -239,6 +240,9 @@ public void onError(Throwable t) { try { runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()), () -> subscriber.onError(t)); + SdkAsyncHttpResponseHandler handler = requestContext.handler(); + runAndLogError(String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.", handler), + () -> handler.onError(t)); executeFuture.completeExceptionally(t); } finally { runAndLogError("Could not release channel back to the pool",