Skip to content

Commit 3622883

Browse files
committed
Don't notify onError in MakeAsyncHttpRequestStage
Move the notification back on AsyncRetryable stage so we can decide whether to call onError based on whether we are retrying or not.
1 parent 07b72ee commit 3622883

File tree

5 files changed

+42
-21
lines changed

5 files changed

+42
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public <OutputT> CompletableFuture<OutputT> execute(TransformingAsyncResponseHan
199199
.first(SigningStage::new)
200200
.then(BeforeTransmissionExecutionInterceptorsStage::new)
201201
.then(d -> new MakeAsyncHttpRequestStage<>(responseHandler, errorResponseHandler, d))
202-
.wrappedWith(AsyncRetryableStage::new)
202+
.wrappedWith((deps, wrapped) -> new AsyncRetryableStage<>(responseHandler, deps, wrapped))
203203
.then(async(() -> new UnwrapResponseContainer<>()))
204204
.then(async(() -> new AfterExecutionInterceptorsStage<>()))
205205
.wrappedWith(AsyncExecutionFailureExceptionReportingStage::new)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/SyncResponseHandlerAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*/
4545
@SdkInternalApi
4646
public final class SyncResponseHandlerAdapter<T> implements TransformingAsyncResponseHandler<T> {
47-
private final CompletableFuture<ByteArrayOutputStream> streamFuture = new CompletableFuture<>();
47+
private volatile CompletableFuture<ByteArrayOutputStream> streamFuture;
4848
private final HttpResponseHandler<T> responseHandler;
4949
private final ExecutionAttributes executionAttributes;
5050
private final Function<SdkHttpFullResponse, SdkHttpFullResponse> crc32Validator;
@@ -75,6 +75,7 @@ public void onError(Throwable err) {
7575

7676
@Override
7777
public CompletableFuture<T> prepare() {
78+
streamFuture = new CompletableFuture<>();
7879
return streamFuture.thenComposeAsync(baos -> {
7980
ByteArrayInputStream content = new ByteArrayInputStream(baos.toByteArray());
8081
// Ignore aborts - we already have all of the content.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.core.internal.Response;
3131
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
3232
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
33+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
3334
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
3435
import software.amazon.awssdk.core.internal.retry.RetryHandler;
3536
import software.amazon.awssdk.core.internal.util.CapacityManager;
@@ -49,14 +50,17 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt
4950

5051
private static final Logger log = LoggerFactory.getLogger(AsyncRetryableStage.class);
5152

53+
private final TransformingAsyncResponseHandler<OutputT> responseHandler;
5254
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline;
5355
private final ScheduledExecutorService scheduledExecutor;
5456
private final HttpClientDependencies dependencies;
5557
private final CapacityManager retryCapacity;
5658
private final RetryPolicy retryPolicy;
5759

58-
public AsyncRetryableStage(HttpClientDependencies dependencies,
60+
public AsyncRetryableStage(TransformingAsyncResponseHandler<OutputT> responseHandler,
61+
HttpClientDependencies dependencies,
5962
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline) {
63+
this.responseHandler = responseHandler;
6064
this.dependencies = dependencies;
6165
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
6266
this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
@@ -128,6 +132,11 @@ private void retryResponseIfNeeded(Response<OutputT> resp, CompletableFuture<Res
128132
}
129133

130134
if (shouldRetry(resp.httpResponse(), resp.exception())) {
135+
// We only notify onError if we are retrying the request.
136+
// Otherwise we rely on the generated code in the in the
137+
// client class to forward exception to the handler's
138+
// exceptionOcurred method.
139+
responseHandler.onError(err);
131140
retryHandler.setLastRetriedException(err);
132141
executeRetry(future);
133142
} else {
@@ -143,6 +152,11 @@ private void retryErrorIfNeeded(SdkException err, CompletableFuture<Response<Out
143152
}
144153

145154
if (shouldRetry(null, err)) {
155+
// We only notify onError if we are retrying the request.
156+
// Otherwise we rely on the generated code in the in the client
157+
// class to forward exception to the handler's exceptionOcurred
158+
// method.
159+
responseHandler.onError(err);
146160
retryHandler.setLastRetriedException(err);
147161
executeRetry(future);
148162
} else {
@@ -178,4 +192,4 @@ private CompletableFuture<Response<OutputT>> doExecute() throws Exception {
178192
return requestPipeline.execute(retryHandler.addRetryInfoHeader(request, requestCount), context);
179193
}
180194
}
181-
}
195+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
9090
CompletableFuture<? extends SdkException> errorResponseFuture =
9191
errorResponseHandler == null ? null : errorResponseHandler.prepare();
9292

93+
CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();
94+
9395
//FIXME(dongie): We need to be careful to only call responseHandler.prepare() exactly once per execute() call
9496
//because it calls prepare() under the hood and we guarantee that we call that once per execution. It would be good
9597
//to find a way to prevent multiple calls to prepare() within a single execution to only call prepare() once.
96-
ResponseHandler handler = new ResponseHandler(responseHandler.prepare(), errorResponseFuture);
98+
ResponseHandler handler = new ResponseHandler(responseFuture, responseHandler.prepare(), errorResponseFuture);
9799

98100
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
99101
? new SimpleHttpContentPublisher(request)
@@ -112,7 +114,6 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
112114

113115
CompletableFuture<Response<OutputT>> transformFuture = handler.prepare();
114116

115-
CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();
116117
TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context);
117118
context.apiCallAttemptTimeoutTracker(timeoutTracker);
118119

@@ -173,13 +174,16 @@ private TimeoutTracker setupAttemptTimer(CompletableFuture<Response<OutputT>> ex
173174
*/
174175

175176
private class ResponseHandler implements TransformingAsyncResponseHandler<Response<OutputT>> {
177+
private final CompletableFuture<Response<OutputT>> responseFuture;
176178
private final CompletableFuture<SdkHttpResponse> headersFuture = new CompletableFuture<>();
177179
private final CompletableFuture<OutputT> transformFuture;
178180
private final CompletableFuture<? extends SdkException> errorTransformFuture;
179181
private volatile SdkHttpFullResponse response;
180182

181-
ResponseHandler(CompletableFuture<OutputT> transformFuture,
183+
ResponseHandler(CompletableFuture<Response<OutputT>> responseFuture,
184+
CompletableFuture<OutputT> transformFuture,
182185
CompletableFuture<? extends SdkException> errorTransformFuture) {
186+
this.responseFuture = responseFuture;
183187
this.transformFuture = transformFuture;
184188
this.errorTransformFuture = errorTransformFuture;
185189
}
@@ -199,18 +203,12 @@ public void onHeaders(SdkHttpResponse response) {
199203

200204
@Override
201205
public void onError(Throwable error) {
202-
// If we already have the headers we've chosen one of the two
203-
// handlers so notify the correct handler. Otherwise, just complete
204-
// the future exceptionally
205-
if (response != null) {
206-
if (response.isSuccessful()) {
207-
responseHandler.onError(error);
208-
} else {
209-
errorResponseHandler.onError(error);
210-
}
211-
} else {
212-
headersFuture.completeExceptionally(error);
213-
}
206+
// Note: We don't notify the wrapped handlers' onError here because
207+
// we need context about retries; we only want to notify onError if
208+
// we know that the request will be retried. Let the
209+
// AsyncRetryableStage handle it.
210+
responseFuture.completeExceptionally(error);
211+
headersFuture.completeExceptionally(error);
214212
}
215213

216214
@Override
@@ -228,7 +226,11 @@ public CompletableFuture<Response<OutputT>> prepare() {
228226
if (headers.isSuccessful()) {
229227
return transformFuture.thenApply(r -> Response.fromSuccess(r, response));
230228
} else {
231-
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
229+
if (errorTransformFuture != null) {
230+
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
231+
} else {
232+
return CompletableFuture.completedFuture(Response.fromFailure(null, response));
233+
}
232234
}
233235
});
234236
}
@@ -248,4 +250,4 @@ private static SdkHttpFullResponse toFullResponse(SdkHttpResponse response) {
248250
response.statusText().ifPresent(builder::statusText);
249251
return builder.build();
250252
}
251-
}
253+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import software.amazon.awssdk.http.SdkCancellationException;
5454
import software.amazon.awssdk.http.SdkHttpFullResponse;
5555
import software.amazon.awssdk.http.SdkHttpResponse;
56+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
5657
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription;
5758
import software.amazon.awssdk.utils.FunctionalUtils.UnsafeRunnable;
5859
import software.amazon.awssdk.utils.async.DelegatingSubscription;
@@ -239,6 +240,9 @@ public void onError(Throwable t) {
239240
try {
240241
runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()),
241242
() -> subscriber.onError(t));
243+
SdkAsyncHttpResponseHandler handler = requestContext.handler();
244+
runAndLogError(String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.", handler),
245+
() -> handler.onError(t));
242246
executeFuture.completeExceptionally(t);
243247
} finally {
244248
runAndLogError("Could not release channel back to the pool",

0 commit comments

Comments
 (0)