Skip to content

Prevent deadlock/hanging on JDK socket write #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public MetricsCollector getMetricsCollector() {
}

/**
 * Creates the {@link FrameHandlerFactory} used to build frame handlers for new connections.
 * The shutdown executor of this connection factory is handed over so that it reaches the
 * socket frame handlers the factory creates.
 *
 * @return a new frame handler factory built from this factory's connection timeout,
 *         socket factory, socket configurator, SSL flag and shutdown executor
 * @throws IOException declared by the signature; callers must handle it
 */
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
    // Single return: the former 4-argument call dropped the shutdown executor and
    // left the 5-argument call unreachable.
    return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ final class Copyright {
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {

private static final Logger LOGGER = LoggerFactory.getLogger(AMQConnection.class);
// we want socket write and channel shutdown timeouts to kick in after
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
public static final double CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER = 1.05;

private final ExecutorService consumerWorkServiceExecutor;
private final ScheduledExecutorService heartbeatExecutor;
Expand Down Expand Up @@ -393,6 +396,7 @@ public void start()
/**
 * Creates the {@link ChannelManager} for this connection and wires in the shutdown
 * executor and the channel shutdown timeout.
 * <p>
 * The timeout is {@code requestedHeartbeat * CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER * 1000}
 * milliseconds. It is only applied when the requested heartbeat is positive; otherwise
 * the manager keeps its own default.
 *
 * @param channelMax value passed to the {@link ChannelManager} constructor
 * @param threadFactory thread factory passed to the {@link ChannelManager} constructor
 * @return the configured channel manager
 */
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
    ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
    result.setShutdownExecutor(this.shutdownExecutor);
    // A heartbeat of 0 would produce a 0 ms timeout: the manager's timed wait on each
    // channel shutdown task would then expire immediately and the task would be
    // cancelled. Keep the manager's default in that case.
    if (requestedHeartbeat > 0) {
        result.setChannelShutdownTimeout((int) ((requestedHeartbeat * CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER) * 1000));
    }
    return result;
}

Expand Down
61 changes: 48 additions & 13 deletions src/main/java/com/rabbitmq/client/impl/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@

package com.rabbitmq.client.impl;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.client.NoOpMetricsCollector;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.IntAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.NoOpMetricsCollector;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.utility.IntAllocator;
import java.util.concurrent.*;

/**
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
*/
public class ChannelManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);

/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
Expand All @@ -50,6 +52,8 @@ public class ChannelManager {
private ExecutorService shutdownExecutor;
private final ThreadFactory threadFactory;

/**
 * Time, in milliseconds, to wait for a channel shutdown task submitted to the shutdown
 * executor before it is cancelled. The default scales the default heartbeat with the
 * same multiplier that {@link AMQConnection} applies to the requested heartbeat.
 */
private int channelShutdownTimeout = (int) ((ConnectionFactory.DEFAULT_HEARTBEAT * AMQConnection.CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER) * 1000);

protected final MetricsCollector metricsCollector;

public int getChannelMax(){
Expand Down Expand Up @@ -97,14 +101,33 @@ public ChannelN getChannel(int channelNumber) {
* Handle shutdown. All the managed {@link com.rabbitmq.client.Channel Channel}s are shutdown.
* @param signal reason for shutdown
*/
public void handleSignal(ShutdownSignalException signal) {
public void handleSignal(final ShutdownSignalException signal) {
Set<ChannelN> channels;
synchronized(this.monitor) {
channels = new HashSet<ChannelN>(_channelMap.values());
}
for (ChannelN channel : channels) {

for (final ChannelN channel : channels) {
releaseChannelNumber(channel);
channel.processShutdownSignal(signal, true, true);
// async shutdown if possible
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
Runnable channelShutdownRunnable = new Runnable() {
@Override
public void run() {
channel.processShutdownSignal(signal, true, true);
}
};
if(this.shutdownExecutor == null) {
channelShutdownRunnable.run();
} else {
Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
try {
channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
channelShutdownTask.cancel(true);
}
}
shutdownSet.add(channel.getShutdownLatch());
channel.notifyListeners();
}
Expand Down Expand Up @@ -225,4 +248,16 @@ public ExecutorService getShutdownExecutor() {
/**
* Set the executor used to run channel shutdown when a shutdown signal is handled.
* When it is {@code null}, each channel is shut down directly on the calling thread.
* @param shutdownExecutor executor for channel shutdown tasks, may be {@code null}
*/
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
this.shutdownExecutor = shutdownExecutor;
}

/**
* Set the shutdown timeout for channels.
* This is the amount of time the manager waits for a channel to
* shut down before giving up and cancelling the shutdown task.
* Only takes effect when the {@code shutdownExecutor} property is set.
* Defaults to 105% of {@link com.rabbitmq.client.ConnectionFactory#DEFAULT_HEARTBEAT},
* converted to milliseconds.
* @param channelShutdownTimeout shutdown timeout in milliseconds
*/
public void setChannelShutdownTimeout(int channelShutdownTimeout) {
this.channelShutdownTimeout = channelShutdownTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;

public class FrameHandlerFactory {
private final int connectionTimeout;
private final SocketFactory factory;
private final SocketConfigurator configurator;
private final ExecutorService shutdownExecutor;
private final boolean ssl;

/**
* Creates a factory without a shutdown executor.
* Delegates to the five-argument constructor, passing {@code null} as the executor.
* @param connectionTimeout connection timeout (unit not shown in this excerpt)
* @param factory socket factory
* @param configurator socket configurator
* @param ssl SSL flag
*/
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl) {
this(connectionTimeout, factory, configurator, ssl, null);
}

/**
 * Creates a factory that also carries a shutdown executor, which is passed on to the
 * {@link SocketFrameHandler}s created from an existing socket.
 *
 * @param connectionTimeout connection timeout (unit not shown in this excerpt)
 * @param factory socket factory
 * @param configurator socket configurator
 * @param ssl SSL flag
 * @param shutdownExecutor executor handed to created frame handlers, may be {@code null}
 */
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
    // socket-related collaborators
    this.factory = factory;
    this.configurator = configurator;

    // connection settings
    this.ssl = ssl;
    this.connectionTimeout = connectionTimeout;

    // optional: null when the four-argument constructor is used
    this.shutdownExecutor = shutdownExecutor;
}

public FrameHandler create(Address addr) throws IOException {
Expand All @@ -55,7 +62,7 @@ public FrameHandler create(Address addr) throws IOException {

/**
 * Wraps an existing socket in a {@link SocketFrameHandler}, passing along this
 * factory's shutdown executor (which may be {@code null}).
 *
 * @param sock the socket to wrap
 * @return a frame handler bound to {@code sock}
 * @throws IOException if the frame handler cannot be constructed
 */
public FrameHandler create(Socket sock) throws IOException
{
    // Single return: the former one-argument call dropped the shutdown executor and
    // left the two-argument call unreachable.
    return new SocketFrameHandler(sock, this.shutdownExecutor);
}

private static void quietTrySocketClose(Socket socket) {
Expand Down
34 changes: 33 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;

import com.rabbitmq.client.AMQP;

Expand All @@ -34,6 +35,11 @@ public class SocketFrameHandler implements FrameHandler {
/** The underlying socket */
private final Socket _socket;

/**
* Optional {@link ExecutorService} for final flush.
*/
private final ExecutorService _shutdownExecutor;

/** Socket's inputstream - data from the broker - synchronized on */
private final DataInputStream _inputStream;

Expand All @@ -47,7 +53,15 @@ public class SocketFrameHandler implements FrameHandler {
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket) throws IOException {
// no shutdown executor: close() then performs the final flush on the calling thread
this(socket, null);
}

/**
* @param socket the socket to use
* @param shutdownExecutor optional executor used for the final flush on close, may be {@code null}
*/
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
_socket = socket;
_shutdownExecutor = shutdownExecutor;

_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
Expand Down Expand Up @@ -152,7 +166,25 @@ public void flush() throws IOException {
/**
 * Closes the underlying socket after a best-effort final flush.
 * <p>
 * With a shutdown executor, the flush runs on that executor and this method waits at
 * most {@code SOCKET_CLOSING_TIMEOUT} seconds for it. Without one, the flush runs on
 * the calling thread. Flush failures and timeouts never prevent the socket from
 * being closed.
 */
@SuppressWarnings("unused")
public void close() {
    try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
    // async flush if possible
    // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
    // (no unconditional synchronous flush here: it could block forever on a stuck
    // socket write before the timed path below is ever reached)
    Callable<Void> flushCallable = new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            flush();
            return null;
        }
    };
    Future<Void> flushTask = null;
    try {
        if (this._shutdownExecutor == null) {
            flushCallable.call();
        } else {
            flushTask = this._shutdownExecutor.submit(flushCallable);
            flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
        }
    } catch (InterruptedException e) {
        // stop waiting, but keep the interrupt visible to the caller
        if (flushTask != null) {
            flushTask.cancel(true);
        }
        Thread.currentThread().interrupt();
    } catch (Exception ignored) {
        // best effort: the flush failed or timed out; do not leave the task pending
        // on the executor, and still close the socket below
        if (flushTask != null) {
            flushTask.cancel(true);
        }
    }
    try { _socket.close(); } catch (Exception _e) {}
}
}