Skip to content

Commit 15abae1

Browse files
authored
Fixed issue with leased connection leaks when threads executing HTTP … (#4066)
* Fixed issue with leased connection leaks when threads executing HTTP connections with Apache HttpClient were interrupted while the connection was in progress. * Added logic in MakeHttpRequestStage to check and abort request if interrupted * Add test cases for UrlConnectionHttpClient * Moved the fix to AfterTransmissionExecutionInterceptorsStage to just close the stream instaed of aborting the reqyest in MakeHttpRequestStage * Removing test cases related to UrlConnectionHttp since adding depenency in protocol-test for urlConnectionClient cause failues since it uses default Client all the places * Updated after Zoe's comments
1 parent 69dcfc4 commit 15abae1

File tree

3 files changed

+212
-1
lines changed

3 files changed

+212
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed issue with leased connection leaks when threads executing HTTP connections with Apache HttpClient were interrupted while the connection was in progress."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AfterTransmissionExecutionInterceptorsStage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class AfterTransmissionExecutionInterceptorsStage
3131
@Override
3232
public Pair<SdkHttpFullRequest, SdkHttpFullResponse> execute(Pair<SdkHttpFullRequest, SdkHttpFullResponse> input,
3333
RequestExecutionContext context) throws Exception {
34-
InterruptMonitor.checkInterrupted();
34+
InterruptMonitor.checkInterrupted(input.right());
3535
// Update interceptor context
3636
InterceptorContext interceptorContext =
3737
context.executionContext().interceptorContext().copy(b -> b.httpResponse(input.right())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.protocol.tests.connection;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.post;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.github.tomakehurst.wiremock.WireMockServer;
24+
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
25+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
26+
import java.net.URI;
27+
import java.time.Duration;
28+
import java.util.List;
29+
import java.util.Optional;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
import org.junit.jupiter.api.AfterAll;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
39+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
40+
import software.amazon.awssdk.core.exception.AbortedException;
41+
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
42+
import software.amazon.awssdk.core.retry.RetryPolicy;
43+
import software.amazon.awssdk.http.SdkHttpClient;
44+
import software.amazon.awssdk.http.apache.ApacheHttpClient;
45+
import software.amazon.awssdk.metrics.MetricCollection;
46+
import software.amazon.awssdk.metrics.MetricPublisher;
47+
import software.amazon.awssdk.metrics.MetricRecord;
48+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient;
49+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClientBuilder;
50+
import software.amazon.awssdk.services.protocolrestjson.model.AllTypesResponse;
51+
52+
/**
53+
* Tests to verify Interruption of Threads while Http Connection is in progress to make sure Resources are released.
54+
*/
55+
class SyncClientConnectionInterruptionTest {
56+
public static final String SAMPLE_BODY = "{\"StringMember"
57+
+ "\":\"resultString\"}";
58+
private final WireMockServer mockServer = new WireMockServer(new WireMockConfiguration()
59+
.bindAddress("localhost").dynamicPort());
60+
61+
private static final ExecutorService executorService = Executors.newCachedThreadPool();
62+
63+
@BeforeEach
64+
public void setup() {
65+
mockServer.start();
66+
stubPostRequest(".*", aResponse(), "{}");
67+
}
68+
69+
@AfterAll
70+
public static void cleanUp(){
71+
executorService.shutdownNow();
72+
}
73+
74+
@Test
75+
void connectionPoolsGetsReusedWhenInterruptedWith_1_MaxConnection() throws Exception {
76+
Integer responseDelay = 1500;
77+
78+
String urlRegex = "/2016-03-11/allTypes";
79+
stubPostRequest(urlRegex, aResponse().withFixedDelay(responseDelay), SAMPLE_BODY);
80+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(1).build();
81+
ProtocolRestJsonClient client = getClient(httpClient, Duration.ofMillis(2L * responseDelay)).build();
82+
83+
Future<?> toBeInterruptedFuture = executorService.submit(() -> client.allTypes());
84+
unInterruptedSleep(responseDelay - responseDelay / 5);
85+
toBeInterruptedFuture.cancel(true);
86+
// Make sure thread start the Http connections
87+
unInterruptedSleep(50);
88+
AllTypesResponse allTypesResponse = client.allTypes();
89+
assertThat(allTypesResponse.stringMember()).isEqualTo("resultString");
90+
executorService.shutdownNow();
91+
}
92+
93+
@Test
94+
void interruptionWhenWaitingForLease_AbortsImmediately() throws InterruptedException {
95+
Integer responseDelay = 50000;
96+
ExceptionInThreadRun exceptionInThreadRun = new ExceptionInThreadRun();
97+
AtomicLong leaseWaitingTime = new AtomicLong(responseDelay);
98+
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(responseDelay), SAMPLE_BODY);
99+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(1).build();
100+
ProtocolRestJsonClient client = getClient(httpClient, Duration.ofMillis(2L * responseDelay)).build();
101+
executorService.submit(() -> client.allTypes());
102+
// 1 Sec sleep to make sure Thread 1 is picked for executing Http connection
103+
unInterruptedSleep(1000);
104+
Thread leaseWaitingThread = new Thread(() -> {
105+
106+
try {
107+
client.allTypes(l -> l.overrideConfiguration(
108+
b -> b
109+
.addMetricPublisher(new MetricPublisher() {
110+
@Override
111+
public void publish(MetricCollection metricCollection) {
112+
Optional<MetricRecord<?>> apiCallDuration =
113+
metricCollection.stream().filter(o -> "ApiCallDuration".equals(o.metric().name())).findAny();
114+
leaseWaitingTime.set(Duration.parse(apiCallDuration.get().value().toString()).toMillis());
115+
}
116+
117+
@Override
118+
public void close() {
119+
}
120+
})
121+
));
122+
123+
} catch (Exception exception) {
124+
exceptionInThreadRun.setException(exception);
125+
126+
}
127+
});
128+
129+
leaseWaitingThread.start();
130+
// 1 sec sleep to make sure Http connection execution is initialized for Thread 2 , in this case it will wait for lease
131+
// and immediately terminate on interrupt
132+
unInterruptedSleep(1000);
133+
leaseWaitingThread.interrupt();
134+
leaseWaitingThread.join();
135+
assertThat(leaseWaitingTime.get()).isNotEqualTo(responseDelay.longValue());
136+
assertThat(leaseWaitingTime.get()).isLessThan(responseDelay.longValue());
137+
assertThat(exceptionInThreadRun.getException()).isInstanceOf(AbortedException.class);
138+
client.close();
139+
}
140+
141+
/**
142+
* Service Latency is set to high value say X.
143+
* Api timeout value id set to 1/3 of X.
144+
* And we interrupt the thread at 90% of X.
145+
* In this case since the ApiTimeOut first happened we should get ApiTimeOut Exception and not the interrupt.
146+
*/
147+
@Test
148+
void interruptionDueToApiTimeOut_followed_byInterruptCausesOnlyTimeOutException() throws InterruptedException {
149+
SdkHttpClient httpClient = ApacheHttpClient.create();
150+
Integer responseDelay = 3000;
151+
stubPostRequest("/2016-03-11/allTypes", aResponse().withFixedDelay(responseDelay), SAMPLE_BODY);
152+
ExceptionInThreadRun exception = new ExceptionInThreadRun();
153+
ProtocolRestJsonClient client =
154+
getClient(httpClient, Duration.ofMillis(10)).overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none())).build();
155+
unInterruptedSleep(100);
156+
// We need to creat a separate thread to interrupt it externally.
157+
Thread leaseWaitingThread = new Thread(() -> {
158+
try {
159+
client.allTypes(l -> l.overrideConfiguration(b -> b.apiCallAttemptTimeout(Duration.ofMillis(responseDelay / 3))));
160+
} catch (Exception e) {
161+
exception.setException(e);
162+
}
163+
});
164+
leaseWaitingThread.start();
165+
unInterruptedSleep(responseDelay - responseDelay / 10);
166+
leaseWaitingThread.interrupt();
167+
leaseWaitingThread.join();
168+
assertThat(exception.getException()).isInstanceOf(ApiCallAttemptTimeoutException.class);
169+
client.close();
170+
}
171+
172+
private class ExceptionInThreadRun {
173+
private Exception exception;
174+
public Exception getException() {
175+
return exception;
176+
}
177+
public void setException(Exception exception) {
178+
this.exception = exception;
179+
}
180+
}
181+
182+
static void unInterruptedSleep(long millis){
183+
try {
184+
Thread.sleep(millis);
185+
} catch (InterruptedException e) {
186+
throw new IllegalStateException("This test sleep is not be interrupted");
187+
}
188+
}
189+
190+
private void stubPostRequest(String urlRegex, ResponseDefinitionBuilder LONG_DELAY, String body) {
191+
mockServer.stubFor(post(urlMatching(urlRegex))
192+
.willReturn(LONG_DELAY
193+
.withStatus(200)
194+
.withBody(body)));
195+
}
196+
private ProtocolRestJsonClientBuilder getClient(SdkHttpClient httpClient, Duration timeOutDuration) {
197+
return ProtocolRestJsonClient.builder()
198+
.credentialsProvider(
199+
StaticCredentialsProvider.create(AwsBasicCredentials.create("akid", "skid")))
200+
.endpointOverride(URI.create("http://localhost:" + mockServer.port()))
201+
.httpClient(httpClient)
202+
.overrideConfiguration(o -> o.apiCallTimeout(timeOutDuration));
203+
204+
}
205+
}

0 commit comments

Comments
 (0)