Skip to content

Commit dc41c0d

Browse files
authored
Attempt to fix the flakiness of the codegen-generated-classes-test tests. (#2769)
I wasn't able to reproduce the error locally or in codebuild, which manifests as an index-out-of-bounds exception in BaseAsyncCoreMetricsTest>apiCall_allRetryAttemptsFailedOfNetworkError:108 for async sub-classes. This fixes one potential cause of the error: the result future for an operation can theoretically be completed before the metric gathering future (which is the HTTP call completion future). This change prevents completing the operation future until the HTTP call completion future is completed (in addition to the current future that's checked, the response handling future).
1 parent a1d0ce3 commit dc41c0d

File tree

3 files changed

+98
-21
lines changed

3 files changed

+98
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,15 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
193193
TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context);
194194
context.apiCallAttemptTimeoutTracker(timeoutTracker);
195195

196-
// Forward the cancellation
197-
responseFuture.whenComplete((r, t) -> {
198-
if (t != null) {
199-
httpClientFuture.completeExceptionally(t);
200-
}
201-
});
196+
// Forward potential cancellations to the upstream futures that our result future depends on.
197+
CompletableFutureUtils.forwardExceptionTo(responseFuture, httpClientFuture);
198+
CompletableFutureUtils.forwardExceptionTo(responseFuture, responseHandlerFuture);
202199

203-
// Offload the completion of the future returned from this stage onto
204-
// the future completion executor
205-
responseHandlerFuture.whenCompleteAsync((r, t) -> {
200+
// When the response handler and HTTP client are done processing the request, use the future completion executor
201+
// to complete the future returned by this function.
202+
CompletableFuture.allOf(responseHandlerFuture, httpClientFuture).whenCompleteAsync((r, t) -> {
206203
if (t == null) {
207-
responseFuture.complete(r);
204+
responseFuture.complete(responseHandlerFuture.join());
208205
} else {
209206
responseFuture.completeExceptionally(t);
210207
}
@@ -218,7 +215,6 @@ private CompletableFuture<Void> doExecuteHttpRequest(RequestExecutionContext con
218215
long callStart = System.nanoTime();
219216
CompletableFuture<Void> httpClientFuture = sdkAsyncHttpClient.execute(executeRequest);
220217

221-
// Offload the metrics reporting from this stage onto the future completion executor
222218
CompletableFuture<Void> result = httpClientFuture.whenComplete((r, t) -> {
223219
long duration = System.nanoTime() - callStart;
224220
metricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, Duration.ofNanos(duration));

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,24 @@
4040
import org.mockito.ArgumentCaptor;
4141
import org.mockito.Mock;
4242
import org.mockito.runners.MockitoJUnitRunner;
43+
import software.amazon.awssdk.core.Response;
44+
import software.amazon.awssdk.core.async.EmptyPublisher;
4345
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
4446
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
4547
import software.amazon.awssdk.core.http.ExecutionContext;
4648
import software.amazon.awssdk.core.http.NoopTestRequest;
4749
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
4850
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
4951
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
52+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
5053
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
5154
import software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils;
5255
import software.amazon.awssdk.http.SdkHttpFullRequest;
5356
import software.amazon.awssdk.http.SdkHttpMethod;
5457
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5558
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5659
import software.amazon.awssdk.metrics.MetricCollector;
60+
import software.amazon.awssdk.metrics.NoOpMetricCollector;
5761
import utils.ValidSdkObjects;
5862

5963
@RunWith(MockitoJUnitRunner.class)
@@ -65,7 +69,7 @@ public class MakeAsyncHttpRequestStageTest {
6569
@Mock
6670
private ScheduledExecutorService timeoutExecutor;
6771

68-
private CompletableFuture<Void> clientExecuteFuture = CompletableFuture.completedFuture(null);
72+
private CompletableFuture<Void> clientExecuteFuture = new CompletableFuture<>();
6973

7074
@Mock
7175
private ScheduledFuture future;
@@ -107,8 +111,86 @@ public void apiCallAttemptTimeoutNotEnabled_shouldNotInvokeExecutor() throws Exc
107111
verify(timeoutExecutor, never()).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
108112
}
109113

114+
@Test
115+
public void success_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
116+
TransformingAsyncResponseHandler<Response<Object>> handler =
117+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
118+
119+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
120+
CompletableFuture<SdkHttpFullRequest> requestFuture =
121+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
122+
123+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
124+
125+
assertThat(result.isDone()).isFalse();
126+
handler.onStream(new EmptyPublisher<>());
127+
assertThat(result.isDone()).isFalse();
128+
clientExecuteFuture.complete(null);
129+
assertThat(result.isDone()).isTrue();
130+
assertThat(result.isCompletedExceptionally()).isFalse();
131+
}
132+
133+
@Test
134+
public void success_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
135+
TransformingAsyncResponseHandler<Response<Object>> handler =
136+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
137+
138+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
139+
CompletableFuture<SdkHttpFullRequest> requestFuture =
140+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
141+
142+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
143+
144+
assertThat(result.isDone()).isFalse();
145+
clientExecuteFuture.complete(null);
146+
assertThat(result.isDone()).isFalse();
147+
handler.onStream(new EmptyPublisher<>());
148+
assertThat(result.isDone()).isTrue();
149+
assertThat(result.isCompletedExceptionally()).isFalse();
150+
}
151+
152+
@Test
153+
public void failure_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
154+
TransformingAsyncResponseHandler<Response<Object>> handler =
155+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
156+
157+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
158+
CompletableFuture<SdkHttpFullRequest> requestFuture =
159+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
160+
161+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
162+
163+
assertThat(result.isDone()).isFalse();
164+
handler.onError(new Throwable());
165+
assertThat(result.isDone()).isFalse();
166+
clientExecuteFuture.complete(null);
167+
assertThat(result.isDone()).isTrue();
168+
assertThat(result.isCompletedExceptionally()).isTrue();
169+
}
170+
171+
@Test
172+
public void failure_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
173+
TransformingAsyncResponseHandler<Response<Object>> handler =
174+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
175+
176+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
177+
CompletableFuture<SdkHttpFullRequest> requestFuture =
178+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
179+
180+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
181+
182+
assertThat(result.isDone()).isFalse();
183+
clientExecuteFuture.completeExceptionally(new Throwable());
184+
assertThat(result.isDone()).isFalse();
185+
handler.onStream(new EmptyPublisher<>());
186+
assertThat(result.isDone()).isTrue();
187+
assertThat(result.isCompletedExceptionally()).isTrue();
188+
}
189+
110190
@Test
111191
public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest() {
192+
clientExecuteFuture.complete(null);
193+
112194
stage = new MakeAsyncHttpRequestStage<>(
113195
combinedAsyncResponseHandler(AsyncResponseHandlerTestUtils.noOpResponseHandler(),
114196
AsyncResponseHandlerTestUtils.noOpResponseHandler()),
@@ -168,9 +250,11 @@ private HttpClientDependencies clientDependencies(Duration timeout) {
168250

169251
private RequestExecutionContext requestContext() {
170252
ExecutionContext executionContext = ClientExecutionAndRequestTimerTestUtils.executionContext(ValidSdkObjects.sdkHttpFullRequest().build());
171-
return RequestExecutionContext.builder()
172-
.executionContext(executionContext)
173-
.originalRequest(NoopTestRequest.builder().build())
174-
.build();
253+
RequestExecutionContext requestContext = RequestExecutionContext.builder()
254+
.executionContext(executionContext)
255+
.originalRequest(NoopTestRequest.builder().build())
256+
.build();
257+
requestContext.attemptMetricCollector(NoOpMetricCollector.create());
258+
return requestContext;
175259
}
176260
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import software.amazon.awssdk.annotations.SdkInternalApi;
5757
import software.amazon.awssdk.http.HttpStatusFamily;
5858
import software.amazon.awssdk.http.Protocol;
59-
import software.amazon.awssdk.http.SdkCancellationException;
6059
import software.amazon.awssdk.http.SdkHttpFullResponse;
6160
import software.amazon.awssdk.http.SdkHttpResponse;
6261
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
@@ -239,10 +238,8 @@ private void onCancel() {
239238
return;
240239
}
241240
try {
242-
SdkCancellationException e = new SdkCancellationException(
243-
"Subscriber cancelled before all events were published");
244-
log.warn("Subscriber cancelled before all events were published");
245-
executeFuture.completeExceptionally(e);
241+
log.warn("Subscriber cancelled before all events were published.");
242+
executeFuture.complete(null);
246243
} finally {
247244
runAndLogError("Could not release channel back to the pool",
248245
() -> closeAndRelease(channelContext));

0 commit comments

Comments
 (0)