Skip to content

Commit 2388e64

Browse files
authored
Merge branch 'master' into httpclient-proxy-config
2 parents 3f34e5c + dc41c0d commit 2388e64

File tree

3 files changed

+98
-21
lines changed

3 files changed

+98
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,15 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
193193
TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context);
194194
context.apiCallAttemptTimeoutTracker(timeoutTracker);
195195

196-
// Forward the cancellation
197-
responseFuture.whenComplete((r, t) -> {
198-
if (t != null) {
199-
httpClientFuture.completeExceptionally(t);
200-
}
201-
});
196+
// Forward potential cancellations to the upstream futures that our result future depends on.
197+
CompletableFutureUtils.forwardExceptionTo(responseFuture, httpClientFuture);
198+
CompletableFutureUtils.forwardExceptionTo(responseFuture, responseHandlerFuture);
202199

203-
// Offload the completion of the future returned from this stage onto
204-
// the future completion executor
205-
responseHandlerFuture.whenCompleteAsync((r, t) -> {
200+
// When the response handler and HTTP client are done processing the request, use the future completion executor
201+
// to complete the future returned by this function.
202+
CompletableFuture.allOf(responseHandlerFuture, httpClientFuture).whenCompleteAsync((r, t) -> {
206203
if (t == null) {
207-
responseFuture.complete(r);
204+
responseFuture.complete(responseHandlerFuture.join());
208205
} else {
209206
responseFuture.completeExceptionally(t);
210207
}
@@ -218,7 +215,6 @@ private CompletableFuture<Void> doExecuteHttpRequest(RequestExecutionContext con
218215
long callStart = System.nanoTime();
219216
CompletableFuture<Void> httpClientFuture = sdkAsyncHttpClient.execute(executeRequest);
220217

221-
// Offload the metrics reporting from this stage onto the future completion executor
222218
CompletableFuture<Void> result = httpClientFuture.whenComplete((r, t) -> {
223219
long duration = System.nanoTime() - callStart;
224220
metricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, Duration.ofNanos(duration));

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,24 @@
4040
import org.mockito.ArgumentCaptor;
4141
import org.mockito.Mock;
4242
import org.mockito.runners.MockitoJUnitRunner;
43+
import software.amazon.awssdk.core.Response;
44+
import software.amazon.awssdk.core.async.EmptyPublisher;
4345
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
4446
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
4547
import software.amazon.awssdk.core.http.ExecutionContext;
4648
import software.amazon.awssdk.core.http.NoopTestRequest;
4749
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
4850
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
4951
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
52+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
5053
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
5154
import software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils;
5255
import software.amazon.awssdk.http.SdkHttpFullRequest;
5356
import software.amazon.awssdk.http.SdkHttpMethod;
5457
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5558
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5659
import software.amazon.awssdk.metrics.MetricCollector;
60+
import software.amazon.awssdk.metrics.NoOpMetricCollector;
5761
import utils.ValidSdkObjects;
5862

5963
@RunWith(MockitoJUnitRunner.class)
@@ -65,7 +69,7 @@ public class MakeAsyncHttpRequestStageTest {
6569
@Mock
6670
private ScheduledExecutorService timeoutExecutor;
6771

68-
private CompletableFuture<Void> clientExecuteFuture = CompletableFuture.completedFuture(null);
72+
private CompletableFuture<Void> clientExecuteFuture = new CompletableFuture<>();
6973

7074
@Mock
7175
private ScheduledFuture future;
@@ -107,8 +111,86 @@ public void apiCallAttemptTimeoutNotEnabled_shouldNotInvokeExecutor() throws Exc
107111
verify(timeoutExecutor, never()).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
108112
}
109113

114+
@Test
115+
public void success_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
116+
TransformingAsyncResponseHandler<Response<Object>> handler =
117+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
118+
119+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
120+
CompletableFuture<SdkHttpFullRequest> requestFuture =
121+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
122+
123+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
124+
125+
assertThat(result.isDone()).isFalse();
126+
handler.onStream(new EmptyPublisher<>());
127+
assertThat(result.isDone()).isFalse();
128+
clientExecuteFuture.complete(null);
129+
assertThat(result.isDone()).isTrue();
130+
assertThat(result.isCompletedExceptionally()).isFalse();
131+
}
132+
133+
@Test
134+
public void success_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
135+
TransformingAsyncResponseHandler<Response<Object>> handler =
136+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
137+
138+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
139+
CompletableFuture<SdkHttpFullRequest> requestFuture =
140+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
141+
142+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
143+
144+
assertThat(result.isDone()).isFalse();
145+
clientExecuteFuture.complete(null);
146+
assertThat(result.isDone()).isFalse();
147+
handler.onStream(new EmptyPublisher<>());
148+
assertThat(result.isDone()).isTrue();
149+
assertThat(result.isCompletedExceptionally()).isFalse();
150+
}
151+
152+
@Test
153+
public void failure_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
154+
TransformingAsyncResponseHandler<Response<Object>> handler =
155+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
156+
157+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
158+
CompletableFuture<SdkHttpFullRequest> requestFuture =
159+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
160+
161+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
162+
163+
assertThat(result.isDone()).isFalse();
164+
handler.onError(new Throwable());
165+
assertThat(result.isDone()).isFalse();
166+
clientExecuteFuture.complete(null);
167+
assertThat(result.isDone()).isTrue();
168+
assertThat(result.isCompletedExceptionally()).isTrue();
169+
}
170+
171+
@Test
172+
public void failure_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
173+
TransformingAsyncResponseHandler<Response<Object>> handler =
174+
AsyncResponseHandlerTestUtils.noOpResponseHandler();
175+
176+
stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
177+
CompletableFuture<SdkHttpFullRequest> requestFuture =
178+
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());
179+
180+
CompletableFuture<?> result = stage.execute(requestFuture, requestContext());
181+
182+
assertThat(result.isDone()).isFalse();
183+
clientExecuteFuture.completeExceptionally(new Throwable());
184+
assertThat(result.isDone()).isFalse();
185+
handler.onStream(new EmptyPublisher<>());
186+
assertThat(result.isDone()).isTrue();
187+
assertThat(result.isCompletedExceptionally()).isTrue();
188+
}
189+
110190
@Test
111191
public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest() {
192+
clientExecuteFuture.complete(null);
193+
112194
stage = new MakeAsyncHttpRequestStage<>(
113195
combinedAsyncResponseHandler(AsyncResponseHandlerTestUtils.noOpResponseHandler(),
114196
AsyncResponseHandlerTestUtils.noOpResponseHandler()),
@@ -168,9 +250,11 @@ private HttpClientDependencies clientDependencies(Duration timeout) {
168250

169251
private RequestExecutionContext requestContext() {
170252
ExecutionContext executionContext = ClientExecutionAndRequestTimerTestUtils.executionContext(ValidSdkObjects.sdkHttpFullRequest().build());
171-
return RequestExecutionContext.builder()
172-
.executionContext(executionContext)
173-
.originalRequest(NoopTestRequest.builder().build())
174-
.build();
253+
RequestExecutionContext requestContext = RequestExecutionContext.builder()
254+
.executionContext(executionContext)
255+
.originalRequest(NoopTestRequest.builder().build())
256+
.build();
257+
requestContext.attemptMetricCollector(NoOpMetricCollector.create());
258+
return requestContext;
175259
}
176260
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import software.amazon.awssdk.annotations.SdkInternalApi;
5757
import software.amazon.awssdk.http.HttpStatusFamily;
5858
import software.amazon.awssdk.http.Protocol;
59-
import software.amazon.awssdk.http.SdkCancellationException;
6059
import software.amazon.awssdk.http.SdkHttpFullResponse;
6160
import software.amazon.awssdk.http.SdkHttpResponse;
6261
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
@@ -239,10 +238,8 @@ private void onCancel() {
239238
return;
240239
}
241240
try {
242-
SdkCancellationException e = new SdkCancellationException(
243-
"Subscriber cancelled before all events were published");
244-
log.warn("Subscriber cancelled before all events were published");
245-
executeFuture.completeExceptionally(e);
241+
log.warn("Subscriber cancelled before all events were published.");
242+
executeFuture.complete(null);
246243
} finally {
247244
runAndLogError("Could not release channel back to the pool",
248245
() -> closeAndRelease(channelContext));

0 commit comments

Comments
 (0)