Skip to content

Close asynchronously if called in NIO loop thread #1084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
private final ScheduledExecutorService heartbeatExecutor;
private final ExecutorService shutdownExecutor;
private Thread mainLoopThread;
private final AtomicBoolean ioLoopThreadSet = new AtomicBoolean(false);
private volatile Thread ioLoopThread;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private String id;

Expand Down Expand Up @@ -504,6 +506,7 @@ public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
ioLoopThread(mainLoopThread);
mainLoopThread.start();
}

Expand Down Expand Up @@ -1104,7 +1107,7 @@ public void close(int closeCode,
boolean abort)
throws IOException
{
boolean sync = !(Thread.currentThread() == mainLoopThread);
boolean sync = !(Thread.currentThread() == ioLoopThread);

try {
AMQP.Connection.Close reason =
Expand Down Expand Up @@ -1195,6 +1198,12 @@ public void setId(String id) {
this.id = id;
}

public void ioLoopThread(Thread thread) {
if (this.ioLoopThreadSet.compareAndSet(false, true)) {
this.ioLoopThread = thread;
}
}

public int getChannelRpcTimeout() {
return channelRpcTimeout;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void run() {

if (frame != null) {
try {
state.getConnection().ioLoopThread(Thread.currentThread());
boolean noProblem = state.getConnection().handleReadFrame(frame);
if (noProblem && (!state.getConnection().isRunning() || state.getConnection().hasBrokerInitiatedShutdown())) {
// looks like the frame was Close-Ok or Close
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client.test;

import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
import static com.rabbitmq.client.test.TestUtils.waitAtMost;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class BlockedConnectionTest extends BrokerTestCase {

@ParameterizedTest
@ValueSource(booleans = {true, false})
void errorInBlockListenerShouldCloseConnection(boolean nio) throws Exception {
ConnectionFactory cf = TestUtils.connectionFactory();
if (nio) {
cf.useNio();
} else {
cf.useBlockingIo();
}
Connection c = cf.newConnection();
CountDownLatch shutdownLatch = new CountDownLatch(1);
c.addShutdownListener(cause -> shutdownLatch.countDown());
CountDownLatch blockedLatch = new CountDownLatch(1);
c.addBlockedListener(
reason -> {
blockedLatch.countDown();
throw new RuntimeException("error in blocked listener!");
},
() -> {});
try {
block();
Channel ch = c.createChannel();
ch.basicPublish("", "", null, "dummy".getBytes());
assertThat(blockedLatch).is(completed());
} finally {
unblock();
}
assertThat(shutdownLatch).is(completed());
waitAtMost(() -> !c.isOpen());
}

}
3 changes: 2 additions & 1 deletion src/test/java/com/rabbitmq/client/test/ClientTestSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@
OAuth2ClientCredentialsGrantCredentialsProviderTest.class,
RefreshCredentialsTest.class,
AMQConnectionRefreshCredentialsTest.class,
ValueWriterTest.class
ValueWriterTest.class,
BlockedConnectionTest.class
})
public class ClientTestSuite {

Expand Down