Skip to content

Commit 754d525

Browse files
authored
Fix leaked futures when executor rejects Runnable (#4009)
This commit fixes an issue where CompletableFutures are leaked/never completed when the submission to the FUTURE_COMPLETE_EXECUTOR is rejected. By default, the SDK uses 2 * number of cores (with a maximum of 64), and uses bounded queue of size 1000. In cases where the throughput to the client exceeds the executor's ability to keep up, it would reject executions, and previously, this would lead to leaked futures.
1 parent 68329a7 commit 754d525

File tree

3 files changed

+64
-8
lines changed

3 files changed

+64
-8
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "This update fixes an issue where CompletableFutures are leaked/never completed when the submission to the FUTURE_COMPLETE_EXECUTOR is rejected.\n\nBy default, the SDK uses `2 * number of cores` (with a maximum of 64), and uses bounded queue of size 1000. In cases where the throughput to the client exceeds the executor's ability to keep up, it would reject executions. Before this change this would lead to leaked futures."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,26 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
156156
}
157157
});
158158

159-
// Offload the completion of the future returned from this stage onto
160-
// the future completion executor
161-
responseHandlerFuture.whenCompleteAsync((r, t) -> {
162-
if (t == null) {
163-
responseFuture.complete(r);
164-
} else {
165-
responseFuture.completeExceptionally(t);
159+
// Attempt to offload the completion of the future returned from this
160+
// stage onto the future completion executor
161+
CompletableFuture<Response<OutputT>> asyncComplete =
162+
responseHandlerFuture.whenCompleteAsync((r, t) -> completeResponseFuture(responseFuture, r, t),
163+
futureCompletionExecutor);
164+
165+
// It's possible the async execution above fails. If so, log a warning,
166+
// and just complete it synchronously.
167+
asyncComplete.whenComplete((ignored, asyncCompleteError) -> {
168+
if (asyncCompleteError != null) {
169+
log.debug(() -> String.format("Could not complete the service call future on the provided "
170+
+ "FUTURE_COMPLETION_EXECUTOR. The future will be completed synchronously by thread"
171+
+ " %s. This may be an indication that the executor is being overwhelmed by too"
172+
+ " many requests, and it may negatively affect performance. Consider changing "
173+
+ "the configuration of the executor to accommodate the load through the client.",
174+
Thread.currentThread().getName()),
175+
asyncCompleteError);
176+
responseHandlerFuture.whenComplete((r, t) -> completeResponseFuture(responseFuture, r, t));
166177
}
167-
}, futureCompletionExecutor);
178+
});
168179

169180
return responseFuture;
170181
}
@@ -219,6 +230,14 @@ private TimeoutTracker setupAttemptTimer(CompletableFuture<Response<OutputT>> ex
219230
timeoutMillis);
220231
}
221232

233+
private void completeResponseFuture(CompletableFuture<Response<OutputT>> responseFuture, Response<OutputT> r, Throwable t) {
234+
if (t == null) {
235+
responseFuture.complete(r);
236+
} else {
237+
responseFuture.completeExceptionally(t);
238+
}
239+
}
240+
222241
/**
223242
* When an operation has a streaming input, the customer must supply an {@link AsyncRequestBody} to
224243
* provide the request content in a non-blocking manner. This adapts that interface to the

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.ArgumentMatchers.any;
2020
import static org.mockito.ArgumentMatchers.anyLong;
2121
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.Mockito.doThrow;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.never;
2425
import static org.mockito.Mockito.times;
@@ -31,6 +32,8 @@
3132

3233
import java.time.Duration;
3334
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.RejectedExecutionException;
3437
import java.util.concurrent.ScheduledExecutorService;
3538
import java.util.concurrent.ScheduledFuture;
3639
import java.util.concurrent.TimeUnit;
@@ -47,6 +50,7 @@
4750
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
4851
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
4952
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
53+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
5054
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
5155
import software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils;
5256
import software.amazon.awssdk.http.SdkHttpFullRequest;
@@ -152,6 +156,33 @@ public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest
152156
}
153157
}
154158

159+
@Test
160+
public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
161+
ExecutorService mockExecutor = mock(ExecutorService.class);
162+
doThrow(new RejectedExecutionException("Busy")).when(mockExecutor).execute(any(Runnable.class));
163+
164+
SdkClientConfiguration config =
165+
SdkClientConfiguration.builder()
166+
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, mockExecutor)
167+
.option(ASYNC_HTTP_CLIENT, sdkAsyncHttpClient)
168+
.build();
169+
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();
170+
171+
TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
172+
when(mockHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
173+
174+
stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);
175+
176+
CompletableFuture<SdkHttpFullRequest> requestFuture = CompletableFuture.completedFuture(
177+
ValidSdkObjects.sdkHttpFullRequest().build());
178+
179+
CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());
180+
181+
long testThreadId = Thread.currentThread().getId();
182+
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)).join();
183+
verify(mockExecutor).execute(any(Runnable.class));
184+
}
185+
155186
private HttpClientDependencies clientDependencies(Duration timeout) {
156187
SdkClientConfiguration configuration = SdkClientConfiguration.builder()
157188
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)

0 commit comments

Comments
 (0)