Skip to content

Commit 07b86ca

Browse files
committed
Stop RpcServer when its thread is interrupted
Fixes #428
1 parent 6c3e72d commit 07b86ca

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

src/main/java/com/rabbitmq/client/RpcServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ protected RpcConsumer setupConsumer()
9696
* Public API - main server loop. Call this to begin processing
9797
* requests. Request processing will continue until the Channel
9898
* (or its underlying Connection) is shut down, or until
99-
* terminateMainloop() is called.
99+
* terminateMainloop() is called, or until the thread running the loop
100+
* is interrupted.
100101
*
101102
* Note that if the mainloop is blocked waiting for a request, the
102103
* termination flag is not checked until a request is received, so
@@ -114,6 +115,8 @@ public ShutdownSignalException mainloop()
114115
try {
115116
request = _consumer.nextDelivery();
116117
} catch (InterruptedException ie) {
118+
Thread.currentThread().interrupt();
119+
_mainloopRunning = false;
117120
continue;
118121
}
119122
processRequest(request);

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.rabbitmq.client.impl.recovery.RecordedQueue;
2525
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2626
import com.rabbitmq.tools.Host;
27+
import org.awaitility.Awaitility;
28+
import org.awaitility.Duration;
2729
import org.junit.After;
2830
import org.junit.Before;
2931
import org.junit.Test;
@@ -37,6 +39,7 @@
3739
import java.util.concurrent.TimeoutException;
3840
import java.util.concurrent.atomic.AtomicInteger;
3941

42+
import static org.awaitility.Awaitility.waitAtMost;
4043
import static org.junit.Assert.assertEquals;
4144
import static org.junit.Assert.assertTrue;
4245
import static org.junit.Assert.fail;
@@ -281,6 +284,28 @@ public void handleRecoveryStarted(Recoverable recoverable) {
281284
}
282285
}
283286

287+
@Test public void interruptingServerThreadShouldStopIt() throws Exception {
288+
rpcServer = new TestRpcServer(serverChannel, queue);
289+
Thread serverThread = new Thread(() -> {
290+
try {
291+
rpcServer.mainloop();
292+
} catch (Exception e) {
293+
// safe to ignore when loops ends/server is canceled
294+
}
295+
});
296+
serverThread.start();
297+
RpcClient client = new RpcClient(new RpcClientParams()
298+
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000));
299+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
300+
assertEquals("*** hello ***", new String(response.getBody()));
301+
302+
serverThread.interrupt();
303+
304+
waitAtMost(Duration.ONE_SECOND).until(() -> !serverThread.isAlive()) ;
305+
306+
client.close();
307+
}
308+
284309
private static class TestRpcServer extends RpcServer {
285310

286311
public TestRpcServer(Channel channel, String queueName) throws IOException {

0 commit comments

Comments
 (0)