Skip to content

Commit 7b6c66e

Browse files
committed
Fixed issue with leased connection leaks when threads executing HTTP connections with Apache HttpClient were interrupted while the connection was in progress.
1 parent d678720 commit 7b6c66e

File tree

3 files changed

+288
-0
lines changed

3 files changed

+288
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Apache HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed issue with leased connection leaks when threads executing HTTP connections with Apache HttpClient were interrupted while the connection was in progress."
6+
}

http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;
2525

2626
import java.io.IOException;
27+
import java.io.InterruptedIOException;
2728
import java.net.InetAddress;
2829
import java.security.KeyManagementException;
2930
import java.security.NoSuchAlgorithmException;
@@ -229,6 +230,10 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
229230
@Override
230231
public HttpExecuteResponse call() throws IOException {
231232
HttpExecuteResponse executeResponse = execute(apacheRequest, metricCollector);
233+
if (Thread.interrupted()) {
234+
abort();
235+
throw new InterruptedIOException(Thread.currentThread().getName() + " was interrupted");
236+
}
232237
collectPoolMetric(metricCollector);
233238
return executeResponse;
234239
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.apache;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.get;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
21+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
22+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
23+
24+
import com.github.tomakehurst.wiremock.WireMockServer;
25+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
26+
import java.io.IOException;
27+
import java.net.URI;
28+
import java.time.Duration;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import org.apache.http.conn.ConnectionPoolTimeoutException;
35+
import org.apache.http.impl.execchain.RequestAbortedException;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import software.amazon.awssdk.http.HttpExecuteRequest;
39+
import software.amazon.awssdk.http.HttpExecuteResponse;
40+
import software.amazon.awssdk.http.SdkHttpClient;
41+
import software.amazon.awssdk.http.SdkHttpMethod;
42+
import software.amazon.awssdk.http.SdkHttpRequest;
43+
import software.amazon.awssdk.utils.IoUtils;
44+
45+
public class ApacheHttpClientThreadInterruptTest {
46+
47+
private final WireMockServer mockServer = new WireMockServer(new WireMockConfiguration()
48+
.dynamicPort()
49+
.dynamicHttpsPort());
50+
51+
@BeforeEach
52+
public void setup() {
53+
mockServer.start();
54+
mockServer.stubFor(get(urlMatching(".*")).willReturn(aResponse()
55+
.withStatus(200).withBody("hello")));
56+
57+
}
58+
59+
@Test
60+
void connectionPoolsGetsReusedWhenInterrupted() throws Exception {
61+
62+
Integer LONG_DELAY = 5000;
63+
mockServer.stubFor(get(urlMatching("/test/longDelay")).willReturn(aResponse().withFixedDelay(LONG_DELAY)
64+
.withStatus(200).withBody("delay")));
65+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(3)
66+
.connectionTimeout(Duration.ofMillis(1000))
67+
.build();
68+
String urlIndex = "/test/longDelay";
69+
SdkHttpMethod httpMethod = SdkHttpMethod.GET;
70+
Runnable asyncHttpCall = () -> {
71+
try {
72+
HttpExecuteResponse httpExecuteResponse =
73+
httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
74+
IoUtils.toUtf8String(httpExecuteResponse.responseBody().get());
75+
} catch (IOException e) {
76+
throw new RuntimeException(e);
77+
}
78+
};
79+
80+
ExecutorService executorService = Executors.newCachedThreadPool();
81+
executorService.submit(asyncHttpCall);
82+
executorService.submit(asyncHttpCall);
83+
Future<?> toBeInterruptedFuture = executorService.submit(asyncHttpCall);
84+
// Make sure thread start the Http connections
85+
Thread.sleep(100);
86+
toBeInterruptedFuture.cancel(true);
87+
HttpExecuteResponse httpExecuteResponse = httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
88+
String actualResult = IoUtils.toUtf8String(httpExecuteResponse.responseBody().get());
89+
assertThat(actualResult).isEqualTo("delay");
90+
executorService.shutdownNow();
91+
}
92+
93+
@Test
94+
void timeOutOfTheHttpConnectionWhenLeasingIsCompletelyUsed() throws Exception {
95+
96+
97+
Integer LONG_DELAY = 5000;
98+
mockServer.stubFor(get(urlMatching("/test/longDelay")).willReturn(aResponse().withFixedDelay(LONG_DELAY)
99+
.withStatus(200).withBody("hello")));
100+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(3)
101+
.connectionTimeout(Duration.ofMillis(1000))
102+
.build();
103+
int maxConnection = 3;
104+
String urlIndex = "/test/longDelay";
105+
SdkHttpMethod httpMethod = SdkHttpMethod.GET;
106+
Runnable asyncHttpCall = () -> {
107+
try {
108+
httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
109+
} catch (IOException e) {
110+
throw new RuntimeException(e);
111+
}
112+
};
113+
114+
ExecutorService executorService = Executors.newCachedThreadPool();
115+
executorService.submit(asyncHttpCall);
116+
executorService.submit(asyncHttpCall);
117+
executorService.submit(asyncHttpCall);
118+
119+
// Make sure thread start the Http connections
120+
Thread.sleep(100);
121+
122+
assertThatExceptionOfType(ConnectionPoolTimeoutException.class)
123+
.isThrownBy(() -> httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call())
124+
.withMessage("Timeout waiting for connection from pool");
125+
}
126+
127+
128+
@Test
129+
void interruptWhileWaitingOnLease() throws Exception {
130+
AtomicBoolean isRequestAborted = new AtomicBoolean(false);
131+
Integer LONG_DELAY = 3000;
132+
mockServer.stubFor(get(urlMatching("/test/longDelay")).willReturn(aResponse().withFixedDelay(LONG_DELAY)
133+
.withStatus(200).withBody("delay")));
134+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(3)
135+
.connectionTimeout(Duration.ofMillis(1000))
136+
.build();
137+
String urlIndex = "/test/longDelay";
138+
SdkHttpMethod httpMethod = SdkHttpMethod.GET;
139+
// Three thread started that will consume the leasing concurrency.
140+
new Thread(() -> callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod)).start();
141+
new Thread(() -> callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod)).start();
142+
new Thread(() -> callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod)).start();
143+
// Sleep to ensure Http connections are leased and connection is established
144+
Thread.sleep(200);
145+
CompletableFuture.supplyAsync(() -> {
146+
Thread thread = new Thread(() -> {
147+
try {
148+
httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
149+
} catch (IOException e) {
150+
e.printStackTrace();
151+
if (e instanceof RequestAbortedException) {
152+
isRequestAborted.set(true);
153+
}
154+
}
155+
});
156+
thread.start();
157+
158+
return thread;
159+
}).thenAcceptAsync(thread ->
160+
{
161+
// Sleep to make sure The Http get call is in leasing state
162+
sleep(100L);
163+
thread.interrupt();
164+
})
165+
.join();
166+
167+
Thread.sleep(200);
168+
assertThat(isRequestAborted.get()).isTrue();
169+
}
170+
171+
172+
/**
173+
* 1. Start HTTP request in a separate thread say T1
174+
* 2. Start 2 more thread , make sure they read the Response body such that it is closed at the end.
175+
* 3. Start 4th Thread in main
176+
* 4. Interrupt the first thread after its execution time
177+
* 5, The 4th Thread should be given the pool access and complete without error
178+
*/
179+
@Test
180+
void interruptingAThreadWhichAlreadyFinishedHttpCall() throws Exception {
181+
AtomicBoolean isRequestAborted = new AtomicBoolean(false);
182+
Integer LONG_DELAY = 100;
183+
mockServer.stubFor(get(urlMatching("/test/longDelay")).willReturn(aResponse().withFixedDelay(LONG_DELAY)
184+
.withStatus(200).withBody("delay")));
185+
SdkHttpClient httpClient = ApacheHttpClient.builder().maxConnections(3)
186+
.connectionTimeout(Duration.ofMillis(3000))
187+
.build();
188+
String urlIndex = "/test/longDelay";
189+
SdkHttpMethod httpMethod = SdkHttpMethod.GET;
190+
191+
CompletableFuture<Void> interuptedAfteCompletion = CompletableFuture.supplyAsync(() -> {
192+
Thread thread = new Thread(() -> {
193+
try {
194+
httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
195+
} catch (IOException e) {
196+
e.printStackTrace();
197+
if (e instanceof RequestAbortedException) {
198+
isRequestAborted.set(true);
199+
}
200+
}
201+
});
202+
thread.start();
203+
204+
return thread;
205+
}).thenAcceptAsync(thread -> {
206+
// Sleep to make sure The Http get call is in leasing state
207+
sleep(1000L);
208+
thread.interrupt();
209+
});
210+
Thread.sleep(10);
211+
// Three thread started that will consume the leasing concurrency.
212+
new Thread(() -> {
213+
HttpExecuteResponse httpExecuteResponse = callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod);
214+
try {
215+
IoUtils.toUtf8String(httpExecuteResponse.responseBody().get());
216+
} catch (IOException e) {
217+
throw new RuntimeException(e);
218+
}
219+
}
220+
).start();
221+
new Thread(() -> {
222+
HttpExecuteResponse httpExecuteResponse = callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod);
223+
try {
224+
IoUtils.toUtf8String(httpExecuteResponse.responseBody().get());
225+
} catch (IOException e) {
226+
throw new RuntimeException(e);
227+
}
228+
}
229+
).start();
230+
231+
232+
Thread.sleep(10);
233+
HttpExecuteResponse httpExecuteResponse = callHttpApiWithUrlIndex(httpClient, urlIndex, httpMethod);
234+
String actualResult = IoUtils.toUtf8String(httpExecuteResponse.responseBody().get());
235+
assertThat(actualResult).isEqualTo("delay");
236+
Thread.sleep(200);
237+
interuptedAfteCompletion.join();
238+
assertThat(isRequestAborted.get()).isFalse();
239+
}
240+
241+
/**
242+
* Aborting the request twice should not cause Exception
243+
*/
244+
void interuptRequestTwice()
245+
{
246+
247+
248+
}
249+
250+
private HttpExecuteResponse callHttpApiWithUrlIndex(SdkHttpClient httpClient, String urlIndex, SdkHttpMethod httpMethod) {
251+
try {
252+
253+
return httpClient.prepareRequest(getSdkHttpRequest(urlIndex, httpMethod)).call();
254+
} catch (IOException e) {
255+
throw new RuntimeException(e);
256+
}
257+
}
258+
259+
private HttpExecuteRequest getSdkHttpRequest(String urlIndex, SdkHttpMethod httpMethod) {
260+
SdkHttpRequest sdkRequest = SdkHttpRequest.builder()
261+
.uri(URI.create("http://localhost:" + mockServer.port() + urlIndex))
262+
.method(httpMethod)
263+
.build();
264+
265+
return HttpExecuteRequest.builder().request(sdkRequest).build();
266+
}
267+
268+
269+
private void sleep(Long sleepInMillis) {
270+
try {
271+
Thread.sleep(sleepInMillis);
272+
} catch (InterruptedException e) {
273+
throw new RuntimeException(e);
274+
}
275+
}
276+
277+
}

0 commit comments

Comments
 (0)