Skip to content

Commit 6654666

Browse files
committed
Stop RpcServer when its thread is interrupted
Fixes #428 (cherry picked from commit 07b86ca)
1 parent e2c69b7 commit 6654666

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

src/main/java/com/rabbitmq/client/RpcServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ protected QueueingConsumer setupConsumer()
9292
* Public API - main server loop. Call this to begin processing
9393
* requests. Request processing will continue until the Channel
9494
* (or its underlying Connection) is shut down, or until
95-
* terminateMainloop() is called.
95+
* terminateMainloop() is called, or until the thread running the loop
96+
* is interrupted.
9697
*
9798
* Note that if the mainloop is blocked waiting for a request, the
9899
* termination flag is not checked until a request is received, so
@@ -110,6 +111,8 @@ public ShutdownSignalException mainloop()
110111
try {
111112
request = _consumer.nextDelivery();
112113
} catch (InterruptedException ie) {
114+
Thread.currentThread().interrupt();
115+
_mainloopRunning = false;
113116
continue;
114117
}
115118
processRequest(request);

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import com.rabbitmq.client.impl.recovery.RecordedQueue;
3535
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3636
import com.rabbitmq.tools.Host;
37+
import org.awaitility.Awaitility;
38+
import org.awaitility.Duration;
3739
import org.junit.After;
3840
import org.junit.Before;
3941
import org.junit.Test;
@@ -47,6 +49,7 @@
4749
import java.util.concurrent.TimeoutException;
4850
import java.util.concurrent.atomic.AtomicInteger;
4951

52+
import static org.awaitility.Awaitility.waitAtMost;
5053
import static org.junit.Assert.assertEquals;
5154
import static org.junit.Assert.assertTrue;
5255
import static org.junit.Assert.fail;
@@ -315,6 +318,28 @@ public void handleRecoveryStarted(Recoverable recoverable) {
315318
}
316319
}
317320

321+
@Test public void interruptingServerThreadShouldStopIt() throws Exception {
322+
rpcServer = new TestRpcServer(serverChannel, queue);
323+
Thread serverThread = new Thread(() -> {
324+
try {
325+
rpcServer.mainloop();
326+
} catch (Exception e) {
327+
// safe to ignore when loops ends/server is canceled
328+
}
329+
});
330+
serverThread.start();
331+
RpcClient client = new RpcClient(new RpcClientParams()
332+
.channel(clientChannel).exchange("").routingKey(queue).timeout(1000));
333+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
334+
assertEquals("*** hello ***", new String(response.getBody()));
335+
336+
serverThread.interrupt();
337+
338+
waitAtMost(Duration.ONE_SECOND).until(() -> !serverThread.isAlive()) ;
339+
340+
client.close();
341+
}
342+
318343
private static class TestRpcServer extends RpcServer {
319344

320345
public TestRpcServer(Channel channel, String queueName) throws IOException {

0 commit comments

Comments
 (0)