Skip to content

Commit 1a23009

Browse files
authored
Switch to handleAsync (#4023)
whenCompleteAsync is incorrect because if the response handler future completes exceptionally, then the returned future will also be completed with that exception which will incorrectly trigger a synchronous completion for the response future.
1 parent 754d525 commit 1a23009

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,12 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
158158

159159
// Attempt to offload the completion of the future returned from this
160160
// stage onto the future completion executor
161-
CompletableFuture<Response<OutputT>> asyncComplete =
162-
responseHandlerFuture.whenCompleteAsync((r, t) -> completeResponseFuture(responseFuture, r, t),
163-
futureCompletionExecutor);
161+
CompletableFuture<Void> asyncComplete =
162+
responseHandlerFuture.handleAsync((r, t) -> {
163+
completeResponseFuture(responseFuture, r, t);
164+
return null;
165+
},
166+
futureCompletionExecutor);
164167

165168
// It's possible the async execution above fails. If so, log a warning,
166169
// and just complete it synchronously.

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
package software.amazon.awssdk.core.internal.http.pipeline.stages;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.anyLong;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.Mockito.doThrow;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.spy;
2527
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729
import static org.mockito.Mockito.when;
@@ -33,6 +35,7 @@
3335
import java.time.Duration;
3436
import java.util.concurrent.CompletableFuture;
3537
import java.util.concurrent.ExecutorService;
38+
import java.util.concurrent.Executors;
3639
import java.util.concurrent.RejectedExecutionException;
3740
import java.util.concurrent.ScheduledExecutorService;
3841
import java.util.concurrent.ScheduledFuture;
@@ -58,6 +61,8 @@
5861
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5962
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
6063
import software.amazon.awssdk.metrics.MetricCollector;
64+
import software.amazon.awssdk.utils.CompletableFutureUtils;
65+
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
6166
import utils.ValidSdkObjects;
6267

6368
@RunWith(MockitoJUnitRunner.class)
@@ -157,7 +162,7 @@ public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest
157162
}
158163

159164
@Test
160-
public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
165+
public void execute_handlerFutureCompletedNormally_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
161166
ExecutorService mockExecutor = mock(ExecutorService.class);
162167
doThrow(new RejectedExecutionException("Busy")).when(mockExecutor).execute(any(Runnable.class));
163168

@@ -169,7 +174,8 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl
169174
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();
170175

171176
TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
172-
when(mockHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
177+
CompletableFuture prepareFuture = new CompletableFuture();
178+
when(mockHandler.prepare()).thenReturn(prepareFuture);
173179

174180
stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);
175181

@@ -179,10 +185,58 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl
179185
CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());
180186

181187
long testThreadId = Thread.currentThread().getId();
182-
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)).join();
188+
CompletableFuture afterWhenComplete =
189+
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId));
190+
191+
prepareFuture.complete(null);
192+
193+
afterWhenComplete.join();
194+
183195
verify(mockExecutor).execute(any(Runnable.class));
184196
}
185197

198+
@Test
199+
public void execute_handlerFutureCompletedExceptionally_doesNotAttemptSynchronousComplete() {
200+
String threadNamePrefix = "async-handle-test";
201+
ExecutorService mockExecutor = Executors.newSingleThreadExecutor(
202+
new ThreadFactoryBuilder().threadNamePrefix(threadNamePrefix).build());
203+
204+
SdkClientConfiguration config =
205+
SdkClientConfiguration.builder()
206+
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, mockExecutor)
207+
.option(ASYNC_HTTP_CLIENT, sdkAsyncHttpClient)
208+
.build();
209+
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();
210+
211+
TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
212+
CompletableFuture prepareFuture = spy(new CompletableFuture());
213+
when(mockHandler.prepare()).thenReturn(prepareFuture);
214+
215+
stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);
216+
217+
CompletableFuture<SdkHttpFullRequest> requestFuture = CompletableFuture.completedFuture(
218+
ValidSdkObjects.sdkHttpFullRequest().build());
219+
220+
CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());
221+
222+
try {
223+
CompletableFuture afterHandle =
224+
executeFuture.handle((r, t) -> assertThat(Thread.currentThread().getName()).startsWith(threadNamePrefix));
225+
226+
prepareFuture.completeExceptionally(new RuntimeException("parse error"));
227+
228+
afterHandle.join();
229+
230+
assertThatThrownBy(executeFuture::join)
231+
.hasCauseInstanceOf(RuntimeException.class)
232+
.hasMessageContaining("parse error");
233+
234+
verify(prepareFuture, times(0)).whenComplete(any());
235+
} finally {
236+
mockExecutor.shutdown();
237+
}
238+
}
239+
186240
private HttpClientDependencies clientDependencies(Duration timeout) {
187241
SdkClientConfiguration configuration = SdkClientConfiguration.builder()
188242
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)

0 commit comments

Comments
 (0)