Skip to content

Commit 59a9618

Browse files
author
Hubert Plociniczak
committed
Merge default into bug16247
2 parents d78b84d + c60e2e9 commit 59a9618

11 files changed

+164
-256
lines changed

.hgignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
syntax: glob
2+
*~
3+
4+
syntax: regexp
5+
^build/
Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
11
package com.rabbitmq.client;
22

3-
public class AlreadyClosedException extends IllegalStateException {
4-
5-
public AlreadyClosedException()
3+
/*
4+
* Thrown when application tries to perform an action on connection/channel
5+
* which was already closed
6+
*/
7+
public class AlreadyClosedException extends ShutdownSignalException {
8+
public AlreadyClosedException(String s, Object ref)
69
{
7-
super();
8-
}
9-
10-
public AlreadyClosedException(Throwable e)
11-
{
12-
super(e);
13-
}
14-
15-
public AlreadyClosedException(String s)
16-
{
17-
super(s);
10+
super(true, true, s, ref);
1811
}
1912
}

src/com/rabbitmq/client/Channel.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
*
5757
*/
5858

59-
public interface Channel {
59+
public interface Channel extends ShutdownNotifier{
6060
/**
6161
* Retrieve this channel's channel number.
6262
* @return the channel number
@@ -433,38 +433,4 @@ Queue.DeclareOk queueDeclare(int ticket, String queue, boolean passive, boolean
433433
* @throws java.io.IOException if an error is encountered
434434
*/
435435
Tx.RollbackOk txRollback() throws IOException;
436-
437-
/**
438-
* Add shutdown listener to the channel
439-
*
440-
* @param listener {@link ShutdownListener} for the channel
441-
*/
442-
void addShutdownListener(ShutdownListener listener);
443-
444-
/**
445-
* Remove shutdown listener for the channel.
446-
*
447-
* @param listener {@link ShutdownListener} to be removed
448-
*/
449-
void removeShutdownListener(ShutdownListener listener);
450-
451-
/**
452-
* Get connection channel shutdown reason.
453-
* Return null if channel is still open.
454-
* @see com.rabbitmq.client.ShutdownCause
455-
* @return shutdown reason if channel is closed
456-
*/
457-
ShutdownSignalException getCloseReason();
458-
459-
/**
460-
* Determine if channel is currently open.
461-
* Will return false if we are currently closing or closed.
462-
* Checking this method should be only for information,
463-
* because of the race conditions - state can change after the call.
464-
* Instead just execute and and try to catch AlreadyClosedException
465-
*
466-
* @see com.rabbitmq.client.impl.AMQChannel#isOpen()
467-
* @return true when channel is open, false otherwise
468-
*/
469-
boolean isOpen();
470436
}

src/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* Current implementations are thread-safe for code at the client API level,
5050
* and in fact thread-safe internally except for code within RPC calls.
5151
*/
52-
public interface Connection { // rename to AMQPConnection later, this is a temporary name
52+
public interface Connection extends ShutdownNotifier { // rename to AMQPConnection later, this is a temporary name
5353
/**
5454
* Retrieve the host.
5555
* @return the hostname of the peer we're connected to.
@@ -127,39 +127,4 @@ public interface Connection { // rename to AMQPConnection later, this is a tempo
127127
* @throws IOException if an I/O problem is encountered
128128
*/
129129
void close(int closeCode, String closeMessage) throws IOException;
130-
131-
/**
132-
* Add connection shutdown listener.
133-
* If the connection is already closed handler is fired immediately
134-
*
135-
* @param listener {@link ShutdownListener} to the connection
136-
*/
137-
void addShutdownListener(ShutdownListener listener);
138-
139-
/**
140-
* Remove shutdown listener for the connection.
141-
*
142-
* @param listener {@link ShutdownListener} to be removed
143-
*/
144-
void removeShutdownListener(ShutdownListener listener);
145-
146-
/**
147-
* Retrieve connection close reason.
148-
*
149-
* @see com.rabbitmq.client.ShutdownCause
150-
* @return information about the cause of closing the connection, or null if connection is still open
151-
*/
152-
ShutdownSignalException getCloseReason();
153-
154-
/**
155-
* Determine whether the connection is currently open.
156-
* Will return false if we are currently closing.
157-
* Checking this method should be only for information,
158-
* because of the race conditions - state can change after the call.
159-
* Instead just execute and and try to catch AlreadyClosedException
160-
*
161-
* @see com.rabbitmq.client.impl.AMQConnection#isOpen()
162-
* @return true when connection is open, false otherwise
163-
*/
164-
boolean isOpen();
165130
}

src/com/rabbitmq/client/ShutdownListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
public interface ShutdownListener extends EventListener {
66

7-
public void service(ShutdownSignalException cause);
7+
public void shutdownCompleted(ShutdownSignalException cause);
88

99
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Interface for components that are shutdown capable and
5+
* that allow listeners to be added for shutdown signals
6+
*
7+
* @see ShutdownListener
8+
* @see ShutdownSignalException
9+
*/
10+
public interface ShutdownNotifier {
11+
/**
12+
* Add shutdown listener.
13+
* If the component is already closed, handler is fired immediately
14+
*
15+
* @param listener {@link ShutdownListener} to the component
16+
*/
17+
public void addShutdownListener(ShutdownListener listener);
18+
19+
/**
20+
* Remove shutdown listener for the component.
21+
*
22+
* @param listener {@link ShutdownListener} to be removed
23+
*/
24+
public void removeShutdownListener(ShutdownListener listener);
25+
26+
/**
27+
* Get the shutdown reason object
28+
* @return ShutdownSignalException if component is closed, null otherwise
29+
*/
30+
public ShutdownSignalException getCloseReason();
31+
32+
/**
33+
* Protected API - notify the listeners attached to the component
34+
* @see com.rabbitmq.client.ShutdownListener
35+
*/
36+
public void notifyListeners();
37+
38+
/**
39+
* Determine whether the component is currently open.
40+
* Will return false if we are currently closing.
41+
* Checking this method should be only for information,
42+
* because of the race conditions - state can change after the call.
43+
* Instead just execute and try to catch ShutdownSignalException
44+
* and IOException
45+
*
46+
* @return true when component is open, false otherwise
47+
*/
48+
boolean isOpen();
49+
}

src/com/rabbitmq/client/ShutdownSignalException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* this exception.
3333
*/
3434

35-
public class ShutdownSignalException extends Exception {
35+
public class ShutdownSignalException extends RuntimeException {
3636
/** True if the connection is shut down, or false if this signal refers to a channel */
3737
private final boolean _hardError;
3838

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.rabbitmq.client.AlreadyClosedException;
3131
import com.rabbitmq.client.Command;
3232
import com.rabbitmq.client.Connection;
33-
import com.rabbitmq.client.ShutdownListener;
3433
import com.rabbitmq.client.ShutdownSignalException;
3534
import com.rabbitmq.utility.BlockingValueOrException;
3635
import com.rabbitmq.utility.SingleShotLinearTimer;
@@ -43,7 +42,7 @@
4342
* @see ChannelN
4443
* @see Connection
4544
*/
46-
public abstract class AMQChannel {
45+
public abstract class AMQChannel extends ShutdownNotifierComponent {
4746
/** The connection this channel is associated with. */
4847
public final AMQConnection _connection;
4948

@@ -55,9 +54,6 @@ public abstract class AMQChannel {
5554

5655
/** The current outstanding RPC request, if any. (Could become a queue in future.) */
5756
public RpcContinuation _activeRpc = null;
58-
59-
/** Reason for closing the channel, null if still open */
60-
public volatile ShutdownSignalException _cause;
6157

6258
/**
6359
* Construct a channel on the given connection, with the given channel number.
@@ -119,6 +115,10 @@ public AMQCommand exnWrappingRpc(Method m)
119115
{
120116
try {
121117
return rpc(m);
118+
} catch (AlreadyClosedException ace) {
119+
// Do not wrap it since it means that connection/channel
120+
// was closed in some action in the past
121+
throw ace;
122122
} catch (ShutdownSignalException ex) {
123123
throw wrap(ex);
124124
}
@@ -169,29 +169,11 @@ public synchronized RpcContinuation nextOutstandingRpc()
169169
return result;
170170
}
171171

172-
/**
173-
* Public API - Indicates whether this channel is in an open state
174-
* @return true if channel is open, false otherwise
175-
*/
176-
public boolean isOpen()
177-
{
178-
return _cause == null;
179-
}
180-
181-
/**
182-
* Public API - Get the reason for closing the channel
183-
* @return object having information about the shutdown, or null if still open
184-
*/
185-
public ShutdownSignalException getCloseReason()
186-
{
187-
return _cause;
188-
}
189-
190172
public void ensureIsOpen()
191173
throws AlreadyClosedException
192174
{
193175
if (!isOpen()) {
194-
throw new AlreadyClosedException("Attempt to use closed channel");
176+
throw new AlreadyClosedException("Attempt to use closed channel", this);
195177
}
196178
}
197179

@@ -278,7 +260,7 @@ public void run() {
278260
public void processShutdownSignal(ShutdownSignalException signal) {
279261
synchronized (this) {
280262
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
281-
_cause = signal;
263+
_shutdownCause = signal;
282264
}
283265
RpcContinuation k = nextOutstandingRpc();
284266
if (k != null) {

0 commit comments

Comments
 (0)