Skip to content

Commit d78b84d

Browse files
committed
Migrate branch bug16247
1 parent 393c5fa commit d78b84d

File tree

9 files changed

+256
-18
lines changed

9 files changed

+256
-18
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.rabbitmq.client;
2+
3+
public class AlreadyClosedException extends IllegalStateException {
4+
5+
public AlreadyClosedException()
6+
{
7+
super();
8+
}
9+
10+
public AlreadyClosedException(Throwable e)
11+
{
12+
super(e);
13+
}
14+
15+
public AlreadyClosedException(String s)
16+
{
17+
super(s);
18+
}
19+
}

src/com/rabbitmq/client/Channel.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,4 +433,38 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436+
437+
/**
438+
* Add shutdown listener to the channel
439+
*
440+
* @param listener {@link ShutdownListener} for the channel
441+
*/
442+
void addShutdownListener(ShutdownListener listener);
443+
444+
/**
445+
* Remove shutdown listener for the channel.
446+
*
447+
* @param listener {@link ShutdownListener} to be removed
448+
*/
449+
void removeShutdownListener(ShutdownListener listener);
450+
451+
/**
452+
* Get connection channel shutdown reason.
453+
* Return null if channel is still open.
454+
* @see com.rabbitmq.client.ShutdownCause
455+
* @return shutdown reason if channel is closed
456+
*/
457+
ShutdownSignalException getCloseReason();
458+
459+
/**
460+
* Determine if channel is currently open.
461+
* Will return false if we are currently closing or closed.
462+
* Checking this method should be only for information,
463+
* because of the race conditions - state can change after the call.
464+
* Instead just execute and and try to catch AlreadyClosedException
465+
*
466+
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467+
* @return true when channel is open, false otherwise
468+
*/
469+
boolean isOpen();
436470
}

src/com/rabbitmq/client/Connection.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,39 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
127127
* @throws IOException if an I/O problem is encountered
128128
*/
129129
void close(int closeCode, String closeMessage) throws IOException;
130+
131+
/**
132+
* Add connection shutdown listener.
133+
* If the connection is already closed handler is fired immediately
134+
*
135+
* @param listener {@link ShutdownListener} to the connection
136+
*/
137+
void addShutdownListener(ShutdownListener listener);
138+
139+
/**
140+
* Remove shutdown listener for the connection.
141+
*
142+
* @param listener {@link ShutdownListener} to be removed
143+
*/
144+
void removeShutdownListener(ShutdownListener listener);
145+
146+
/**
147+
* Retrieve connection close reason.
148+
*
149+
* @see com.rabbitmq.client.ShutdownCause
150+
* @return information about the cause of closing the connection, or null if connection is still open
151+
*/
152+
ShutdownSignalException getCloseReason();
153+
154+
/**
155+
* Determine whether the connection is currently open.
156+
* Will return false if we are currently closing.
157+
* Checking this method should be only for information,
158+
* because of the race conditions - state can change after the call.
159+
* Instead just execute and and try to catch AlreadyClosedException
160+
*
161+
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
162+
* @return true when connection is open, false otherwise
163+
*/
164+
boolean isOpen();
130165
}

src/com/rabbitmq/client/RpcClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
190190
ShutdownSignalException wrapper =
191191
new ShutdownSignalException(sig.isHardError(),
192192
sig.isInitiatedByApplication(),
193-
sig.getReason());
193+
sig.getReason(),
194+
sig.getReference());
194195
wrapper.initCause(sig);
195196
throw wrapper;
196197
} else {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.EventListener;
4+
5+
public interface ShutdownListener extends EventListener {
6+
7+
public void service(ShutdownSignalException cause);
8+
9+
}

src/com/rabbitmq/client/ShutdownSignalException.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626

2727
/**
2828
* Encapsulates a shutdown condition for a connection to an AMQP broker.
29+
* Depending on HardError when calling
30+
* {@link com.rabbitmq.client.ShutdownSignalException#getReference()} we will
31+
* either get a reference to the Connection or Channel instance that fired
32+
* this exception.
2933
*/
3034

3135
public class ShutdownSignalException extends Exception {
@@ -42,30 +46,41 @@ public class ShutdownSignalException extends Exception {
4246
/** Possible explanation */
4347
private final Object _reason;
4448

49+
/** Either Channel or Connection instance, depending on _hardError */
50+
private final Object _ref;
51+
4552
/**
4653
* Construct a ShutdownSignalException from the arguments.
4754
* @param hardError the relevant hard error
4855
* @param initiatedByApplication if the shutdown was client-initiated
4956
* @param reason Object describing the origin of the exception
57+
* @param ref Reference to Connection or Channel that fired the signal
5058
*/
5159
public ShutdownSignalException(boolean hardError,
5260
boolean initiatedByApplication,
53-
Object reason)
61+
Object reason, Object ref)
5462
{
5563
this._hardError = hardError;
5664
this._initiatedByApplication = initiatedByApplication;
5765
this._reason = reason;
66+
// Depending on hardError what we got is either Connection or Channel reference
67+
this._ref = ref;
5868
}
5969

6070
/** @return true if this signals a connection error, or false if a channel error */
6171
public boolean isHardError() { return _hardError; }
72+
6273
/** @return true if this exception was caused by explicit application
6374
* action; false if it originated with the broker or as a result
6475
* of detectable non-deliberate application failure
6576
*/
6677
public boolean isInitiatedByApplication() { return _initiatedByApplication; }
78+
6779
/** @return the reason object, if any */
6880
public Object getReason() { return _reason; }
81+
82+
/** @return Reference to Connection or Channel object that fired the signal **/
83+
public Object getReference() { return _ref; }
6984

7085
public String toString() {
7186
return super.toString() + " (" +

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727

2828
import java.io.IOException;
2929

30+
import com.rabbitmq.client.AlreadyClosedException;
3031
import com.rabbitmq.client.Command;
3132
import com.rabbitmq.client.Connection;
33+
import com.rabbitmq.client.ShutdownListener;
3234
import com.rabbitmq.client.ShutdownSignalException;
3335
import com.rabbitmq.utility.BlockingValueOrException;
3436
import com.rabbitmq.utility.SingleShotLinearTimer;
@@ -53,9 +55,9 @@ public abstract class AMQChannel {
5355

5456
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5557
public RpcContinuation _activeRpc = null;
56-
57-
/** Indicates whether this channel is in a state to handle further activity. */
58-
public volatile boolean _isOpen = true;
58+
59+
/** Reason for closing the channel, null if still open */
60+
public volatile ShutdownSignalException _cause;
5961

6062
/**
6163
* Construct a channel on the given connection, with the given channel number.
@@ -160,21 +162,36 @@ public synchronized void transmitAndEnqueue(Method m, RpcContinuation k)
160162
transmit(m);
161163
}
162164

163-
public synchronized RpcContinuation nextOutstandingRpc() {
165+
public synchronized RpcContinuation nextOutstandingRpc()
166+
{
164167
RpcContinuation result = _activeRpc;
165168
_activeRpc = null;
166169
return result;
167170
}
168171

169-
public boolean isOpen() {
170-
return _isOpen;
172+
/**
173+
* Public API - Indicates whether this channel is in an open state
174+
* @return true if channel is open, false otherwise
175+
*/
176+
public boolean isOpen()
177+
{
178+
return _cause == null;
179+
}
180+
181+
/**
182+
* Public API - Get the reason for closing the channel
183+
* @return object having information about the shutdown, or null if still open
184+
*/
185+
public ShutdownSignalException getCloseReason()
186+
{
187+
return _cause;
171188
}
172189

173190
public void ensureIsOpen()
174-
throws IllegalStateException
191+
throws AlreadyClosedException
175192
{
176193
if (!isOpen()) {
177-
throw new IllegalStateException("Attempt to use closed channel");
194+
throw new AlreadyClosedException("Attempt to use closed channel");
178195
}
179196
}
180197

@@ -261,7 +278,7 @@ public void run() {
261278
public void processShutdownSignal(ShutdownSignalException signal) {
262279
synchronized (this) {
263280
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
264-
_isOpen = false;
281+
_cause = signal;
265282
}
266283
RpcContinuation k = nextOutstandingRpc();
267284
if (k != null) {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,21 @@
2828
import java.io.EOFException;
2929
import java.io.IOException;
3030
import java.net.SocketException;
31+
import java.util.Collections;
32+
import java.util.LinkedList;
33+
import java.util.List;
3134
import java.util.Map;
3235

3336
import com.rabbitmq.client.AMQP;
3437
import com.rabbitmq.client.Address;
38+
import com.rabbitmq.client.AlreadyClosedException;
3539
import com.rabbitmq.client.Channel;
3640
import com.rabbitmq.client.Command;
3741
import com.rabbitmq.client.Connection;
3842
import com.rabbitmq.client.ConnectionParameters;
3943
import com.rabbitmq.client.MissedHeartbeatException;
4044
import com.rabbitmq.client.RedirectException;
45+
import com.rabbitmq.client.ShutdownListener;
4146
import com.rabbitmq.client.ShutdownSignalException;
4247
import com.rabbitmq.utility.Utility;
4348

@@ -100,7 +105,11 @@ public class AMQConnection implements Connection {
100105

101106
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
102107
public final ExceptionHandler _exceptionHandler;
103-
108+
109+
/** List of all shutdown listeners associated with the connection */
110+
public List<ShutdownListener> listeners
111+
= Collections.synchronizedList(new LinkedList<ShutdownListener>());
112+
104113
/**
105114
* Protected API - respond, in the driver thread, to a ShutdownSignal.
106115
* @param channelNumber the number of the channel to disconnect
@@ -109,15 +118,19 @@ public final void disconnectChannel(int channelNumber) {
109118
_channelManager.disconnectChannel(channelNumber);
110119
}
111120

121+
/**
122+
* Public API - Determine whether the connection is open
123+
* @return true if haven't yet received shutdown signal, false otherwise
124+
*/
112125
public boolean isOpen() {
113126
return _shutdownCause == null;
114127
}
115128

116129
public void ensureIsOpen()
117-
throws IllegalStateException
130+
throws AlreadyClosedException
118131
{
119132
if (!isOpen()) {
120-
throw new IllegalStateException("Attempt to use closed connection");
133+
throw new AlreadyClosedException("Attempt to use closed connection");
121134
}
122135
}
123136

@@ -576,7 +589,7 @@ public void shutdown(Object reason,
576589
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
577590
_shutdownCause = new ShutdownSignalException(true,
578591
initiatedByApplication,
579-
reason);
592+
reason, this);
580593
}
581594
if (cause != null) {
582595
_shutdownCause.initCause(cause);
@@ -616,6 +629,48 @@ public void close(int closeCode,
616629
} finally {
617630
_running = false;
618631
}
632+
633+
synchronized(listeners) {
634+
for (ShutdownListener l: listeners)
635+
l.service(getCloseReason());
636+
}
637+
}
638+
639+
/**
640+
* Public API - Add shutdown listener fired when closing the connection
641+
* @see com.rabbitmq.client.Connection#addShutdownListener()
642+
*/
643+
public void addShutdownListener(ShutdownListener listener)
644+
{
645+
646+
boolean closed = false;
647+
synchronized(listeners) {
648+
closed = !isOpen();
649+
listeners.add(listener);
650+
}
651+
if (closed)
652+
listener.service(_shutdownCause);
653+
}
654+
655+
/**
656+
* Public API - Remove shutdown listener for this connection
657+
* Removing only the first found object
658+
* @see com.rabbitmq.client.Connection#removeShutdownListener()
659+
*/
660+
public void removeShutdownListener(ShutdownListener listener)
661+
{
662+
synchronized(listeners) {
663+
listeners.remove(listener);
664+
}
665+
}
666+
667+
/**
668+
* Public API - Get reason for shutdown, or null if open
669+
* @see com.rabbitmq.client.Connection#getShutdownReason()
670+
*/
671+
public ShutdownSignalException getCloseReason()
672+
{
673+
return _shutdownCause;
619674
}
620675

621676
@Override public String toString() {

0 commit comments

Comments
 (0)