Skip to content

Commit 466658f

Browse files
committed
HTTPCORE-420: Blocking HttpServer does not close out persistent connection when shut down
git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpcore/branches/4.4.x@1740279 13f79535-47bb-0310-9956-ffa450edef68
1 parent 5a9e3e0 commit 466658f

File tree

2 files changed

+94
-19
lines changed

2 files changed

+94
-19
lines changed

httpcore/src/main/java/org/apache/http/impl/bootstrap/HttpServer.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import java.io.IOException;
3030
import java.net.InetAddress;
3131
import java.net.ServerSocket;
32-
import java.util.List;
33-
import java.util.concurrent.ExecutorService;
34-
import java.util.concurrent.Executors;
32+
import java.util.Set;
33+
import java.util.concurrent.SynchronousQueue;
34+
import java.util.concurrent.ThreadPoolExecutor;
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicReference;
3737

@@ -60,9 +60,9 @@ enum Status { READY, ACTIVE, STOPPING }
6060
private final HttpConnectionFactory<? extends DefaultBHttpServerConnection> connectionFactory;
6161
private final SSLServerSetupHandler sslSetupHandler;
6262
private final ExceptionLogger exceptionLogger;
63-
private final ExecutorService listenerExecutorService;
63+
private final ThreadPoolExecutor listenerExecutorService;
6464
private final ThreadGroup workerThreads;
65-
private final ExecutorService workerExecutorService;
65+
private final WorkerPoolExecutor workerExecutorService;
6666
private final AtomicReference<Status> status;
6767

6868
private volatile ServerSocket serverSocket;
@@ -85,10 +85,14 @@ enum Status { READY, ACTIVE, STOPPING }
8585
this.connectionFactory = connectionFactory;
8686
this.sslSetupHandler = sslSetupHandler;
8787
this.exceptionLogger = exceptionLogger;
88-
this.listenerExecutorService = Executors.newSingleThreadExecutor(
88+
this.listenerExecutorService = new ThreadPoolExecutor(
89+
1, 1, 0L, TimeUnit.MILLISECONDS,
90+
new SynchronousQueue<Runnable>(),
8991
new ThreadFactoryImpl("HTTP-listener-" + this.port));
9092
this.workerThreads = new ThreadGroup("HTTP-workers");
91-
this.workerExecutorService = Executors.newCachedThreadPool(
93+
this.workerExecutorService = new WorkerPoolExecutor(
94+
0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
95+
new SynchronousQueue<Runnable>(),
9296
new ThreadFactoryImpl("HTTP-worker", this.workerThreads));
9397
this.status = new AtomicReference<Status>(Status.READY);
9498
}
@@ -135,6 +139,8 @@ public void start() throws IOException {
135139

136140
public void stop() {
137141
if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPING)) {
142+
this.listenerExecutorService.shutdown();
143+
this.workerExecutorService.shutdown();
138144
final RequestListener local = this.requestListener;
139145
if (local != null) {
140146
try {
@@ -144,8 +150,6 @@ public void stop() {
144150
}
145151
}
146152
this.workerThreads.interrupt();
147-
this.listenerExecutorService.shutdown();
148-
this.workerExecutorService.shutdown();
149153
}
150154
}
151155

@@ -162,16 +166,13 @@ public void shutdown(final long gracePeriod, final TimeUnit timeUnit) {
162166
Thread.currentThread().interrupt();
163167
}
164168
}
165-
final List<Runnable> runnables = this.workerExecutorService.shutdownNow();
166-
for (Runnable runnable: runnables) {
167-
if (runnable instanceof Worker) {
168-
final Worker worker = (Worker) runnable;
169-
final HttpServerConnection conn = worker.getConnection();
170-
try {
171-
conn.shutdown();
172-
} catch (IOException ex) {
173-
this.exceptionLogger.log(ex);
174-
}
169+
final Set<Worker> workers = this.workerExecutorService.getWorkers();
170+
for (Worker worker: workers) {
171+
final HttpServerConnection conn = worker.getConnection();
172+
try {
173+
conn.shutdown();
174+
} catch (IOException ex) {
175+
this.exceptionLogger.log(ex);
175176
}
176177
}
177178
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.http.impl.bootstrap;
28+
29+
import java.util.HashSet;
30+
import java.util.Map;
31+
import java.util.Set;
32+
import java.util.concurrent.BlockingQueue;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.ThreadFactory;
35+
import java.util.concurrent.ThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
37+
38+
/**
39+
* @since 4.4
40+
*/
41+
class WorkerPoolExecutor extends ThreadPoolExecutor {
42+
43+
private final Map<Worker, Boolean> workerSet;
44+
45+
public WorkerPoolExecutor(
46+
final int corePoolSize,
47+
final int maximumPoolSize,
48+
final long keepAliveTime,
49+
final TimeUnit unit,
50+
final BlockingQueue<Runnable> workQueue,
51+
final ThreadFactory threadFactory) {
52+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
53+
this.workerSet = new ConcurrentHashMap<Worker, Boolean>();
54+
}
55+
56+
@Override
57+
protected void beforeExecute(final Thread t, final Runnable r) {
58+
if (r instanceof Worker) {
59+
this.workerSet.put((Worker) r, Boolean.TRUE);
60+
}
61+
}
62+
63+
@Override
64+
protected void afterExecute(final Runnable r, final Throwable t) {
65+
if (r instanceof Worker) {
66+
this.workerSet.remove(r);
67+
}
68+
}
69+
70+
public Set<Worker> getWorkers() {
71+
return new HashSet<Worker>(this.workerSet.keySet());
72+
}
73+
74+
}

0 commit comments

Comments
 (0)