Skip to content

Fixed issue with leased connection leaks when threads executing HTTP … #4066

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-ApacheHTTPClient-bf0095f.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Apache HTTP Client",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is not just applicable to Apache HTTP client right? Should we use AWS SDK for Java v2 or whatever the right category for core?

"contributor": "",
"description": "Fixed issue with leased connection leaks when threads executing HTTP connections with Apache HttpClient were interrupted while the connection was in progress."
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AfterTransmissionExecutionInterceptorsStage
@Override
public Pair<SdkHttpFullRequest, SdkHttpFullResponse> execute(Pair<SdkHttpFullRequest, SdkHttpFullResponse> input,
RequestExecutionContext context) throws Exception {
InterruptMonitor.checkInterrupted();
InterruptMonitor.checkInterrupted(input.right());
// Update interceptor context
InterceptorContext interceptorContext =
context.executionContext().interceptorContext().copy(b -> b.httpResponse(input.right())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ private HttpExecuteResponse executeHttpRequest(SdkHttpFullRequest request, Reque
context.apiCallAttemptTimeoutTracker().abortable(requestCallable);

Pair<HttpExecuteResponse, Duration> measuredExecute = MetricUtils.measureDurationUnsafe(requestCallable);

attemptMetricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, measuredExecute.right());

return measuredExecute.left();
}

Expand Down
6 changes: 6 additions & 0 deletions test/protocol-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.protocol.tests.connection;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static org.assertj.core.api.Assertions.assertThat;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import java.net.URI;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient;
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClientBuilder;
import software.amazon.awssdk.services.protocolrestjson.model.AllTypesResponse;

/**
* Tests to verify Interruption of Threads while Http Connection is in progress to make sure Resources are released.
*/
class SyncClientConnectionInterruptionTest {
public static final String SAMPLE_BODY = "{\"StringMember"
+ "\":\"resultString\"}";
private final WireMockServer mockServer = new WireMockServer(new WireMockConfiguration()
.bindAddress("localhost")
.dynamicPort());
@BeforeEach
public void setup() {
mockServer.start();
stubPostRequest(".*", aResponse(), "{}");
}

@Test
void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection() throws Exception {
Integer LONG_DELAY = 1500;

String urlRegex = "/2016-03-11/allTypes";
stubPostRequest(urlRegex, aResponse().withFixedDelay(LONG_DELAY), SAMPLE_BODY);
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(1).build();
ProtocolRestJsonClient client = getClient(httpClient, Duration.ofMillis(2L * LONG_DELAY)).build();

ExecutorService executorService = Executors.newFixedThreadPool(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the executor across all tests? This may speed up tests a bit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Future<?> toBeInterruptedFuture = executorService.submit(() -> client.allTypes());
unInterruptedSleep(LONG_DELAY - LONG_DELAY / 5);
toBeInterruptedFuture.cancel(true);
// Make sure thread start the Http connections
unInterruptedSleep(50);
AllTypesResponse allTypesResponse = client.allTypes();
assertThat(allTypesResponse.stringMember()).isEqualTo("resultString");
executorService.shutdownNow();
}

@Test
void connectionPoolsGetsReusedWhenInterruptedWith_Multiple_MaxConnection() throws Exception {
Copy link
Contributor

@zoewangg zoewangg Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need this test? It seems we have verified the behavior (connection gets released back to the pool) in connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection. My concern with tests covering the same area is that it could increase the build time and the likelihood of test flakiness

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing this test case

Integer LONG_DELAY = 1000;
Integer VERY_VERY_LONG_DELAY = LONG_DELAY * 5;
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(LONG_DELAY), SAMPLE_BODY);
stubPostRequest("/2016-03-11/JsonValuesOperation", aResponse().withFixedDelay(VERY_VERY_LONG_DELAY), SAMPLE_BODY);
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(3).build();
Duration timeOutDuration = Duration.ofMillis(2L * LONG_DELAY);
ProtocolRestJsonClient client = getClient(httpClient, timeOutDuration).build();

ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<?> toBeInterruptedFuture0 = executorService.submit(() -> client.allTypes());
Future<?> toBeInterruptedFuture1 = executorService.submit(() -> client.allTypes());
Future<?> toBeInterruptedFuture2 = executorService.submit(() -> client.allTypes());
unInterruptedSleep(50);
executorService.submit(() -> client.jsonValuesOperation());
unInterruptedSleep(LONG_DELAY / 2);
toBeInterruptedFuture0.cancel(true);
toBeInterruptedFuture1.cancel(true);
toBeInterruptedFuture2.cancel(true);
unInterruptedSleep(LONG_DELAY / 2);
// Make sure thread start the Http connections
AllTypesResponse allTypesResponse = client.allTypes();
assertThat(allTypesResponse.stringMember()).isEqualTo("resultString");
executorService.shutdownNow();
}

@Test
void interruptionWhenWaitingForLease_AbortsImmediately() throws InterruptedException {
Integer LONG_DELAY = 5000;
ExceptionInThreadRun exceptionInThreadRun = new ExceptionInThreadRun();
AtomicLong leaseWaitingTime = new AtomicLong(LONG_DELAY);
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(LONG_DELAY), SAMPLE_BODY);
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(1).build();
ProtocolRestJsonClient client = getClient(httpClient, Duration.ofMillis(2L * LONG_DELAY)).build();
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(() -> client.allTypes());
unInterruptedSleep(100);
Thread leaseWaitingThread = new Thread(() -> {

try {
client.allTypes(l -> l.overrideConfiguration(
b -> b
.apiCallAttemptTimeout(Duration.ofSeconds(10))
.addMetricPublisher(new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
System.out.println(metricCollection);
Optional<MetricRecord<?>> apiCallDuration =
metricCollection.stream().filter(o -> "ApiCallDuration" .equals(o.metric().name())).findAny();
leaseWaitingTime.set(Duration.parse(apiCallDuration.get().value().toString()).toMillis());
}

@Override
public void close() {
}
})
));

} catch (Exception exception) {
exceptionInThreadRun.setException(exception);

}
});

leaseWaitingThread.start();
unInterruptedSleep(100);
leaseWaitingThread.interrupt();
leaseWaitingThread.join();
assertThat(leaseWaitingTime.get()).isNotEqualTo(LONG_DELAY.longValue());
assertThat(leaseWaitingTime.get()).isLessThan(LONG_DELAY.longValue());
assertThat(exceptionInThreadRun.getException()).isInstanceOf(AbortedException.class);
client.close();
}

private static Stream<Arguments> httpClientImplementation() {
return Stream.of(Arguments.of(ApacheHttpClient.create()),
Arguments.of(UrlConnectionHttpClient.create()));
}

/**
* Service Latency is set to high value say X.
* Api timeout value id set to 1/3 of X.
* And we interrupt the thread at 90% of X.
* In this case since the ApiTimeOut first happened we should get ApiTimeOut Exception and not the interrupt.
*/
@ParameterizedTest
@MethodSource("httpClientImplementation")
void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException(SdkHttpClient httpClient) throws InterruptedException {
Integer SERVER_RESPONSE_DELAY = 3000;
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(SERVER_RESPONSE_DELAY), SAMPLE_BODY);
ExceptionInThreadRun exception = new ExceptionInThreadRun();
ProtocolRestJsonClient client =
getClient(httpClient, Duration.ofMillis(10)).overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none())).build();
unInterruptedSleep(100);
// We need to creat a separate thread to interrupt it externally.
Thread leaseWaitingThread = new Thread(() -> {
try {
client.allTypes(l -> l.overrideConfiguration(b -> b.apiCallAttemptTimeout(Duration.ofMillis(SERVER_RESPONSE_DELAY / 3))));
} catch (Exception e) {
exception.setException(e);
}
});
leaseWaitingThread.start();
unInterruptedSleep(SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10);
leaseWaitingThread.interrupt();
leaseWaitingThread.join();
assertThat(exception.getException()).isInstanceOf(ApiCallAttemptTimeoutException.class);
client.close();
}

@ParameterizedTest
@MethodSource("httpClientImplementation")
void sdkClientInterrupted_while_connectionIsInProgress(SdkHttpClient httpClient) throws InterruptedException {
Integer SERVER_RESPONSE_DELAY = 3000;
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(SERVER_RESPONSE_DELAY), SAMPLE_BODY);
ExceptionInThreadRun exception = new ExceptionInThreadRun();
ProtocolRestJsonClient client =
getClient(httpClient, Duration.ofMillis(10)).overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none())).build();
unInterruptedSleep(100);
// We need to creat a separate thread to interrupt it externally.
Thread leaseWaitingThread = new Thread(() -> {
try {
client.allTypes(l -> l.overrideConfiguration(b -> b.apiCallAttemptTimeout(Duration.ofMillis(SERVER_RESPONSE_DELAY * 3))));
} catch (Exception e) {
exception.setException(e);
}
});
leaseWaitingThread.start();
unInterruptedSleep(SERVER_RESPONSE_DELAY - SERVER_RESPONSE_DELAY / 10);
leaseWaitingThread.interrupt();
leaseWaitingThread.join();
assertThat(exception.getException()).isInstanceOf(AbortedException.class);
client.close();
}

private class ExceptionInThreadRun {
private Exception exception;
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
}

static void unInterruptedSleep(long millis){
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new IllegalStateException("This test sleep is not be interrupted");
}
}

private void stubPostRequest(String urlRegex, ResponseDefinitionBuilder LONG_DELAY, String body) {
mockServer.stubFor(post(urlMatching(urlRegex))
.willReturn(LONG_DELAY
.withStatus(200)
.withBody(body)));
}
private ProtocolRestJsonClientBuilder getClient(SdkHttpClient httpClient, Duration timeOutDuration) {
return ProtocolRestJsonClient.builder()
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("akid", "skid")))
.endpointOverride(URI.create("http://localhost:" + mockServer.port()))
.httpClient(httpClient)
.overrideConfiguration(o -> o.apiCallTimeout(timeOutDuration));

}
}