Skip to content

Commit d23991e

Browse files
committed
Fire user hooks before starting recovery of the underlying connection.
Add RecoverableConnection to allow easier use of API.
1 parent bba45a8 commit d23991e

File tree

5 files changed

+195
-15
lines changed

5 files changed

+195
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client;
18+
/**
19+
* Convenience interface which combines a Connection and Recoverable together.
20+
*
21+
* @see Connection
22+
* @see Recoverable
23+
*
24+
*/
25+
public interface RecoverableConnection extends Connection, Recoverable { }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.rabbitmq.client;
2+
3+
public class RecoverableShutdownSignalException extends ShutdownSignalException {
4+
5+
/** Default for non-checking. */
6+
private static final long serialVersionUID = 1L;
7+
8+
private final boolean _recoveryToBeAttempted;
9+
10+
public RecoverableShutdownSignalException(boolean hardError, boolean initiatedByApplication, Method reason,
11+
Object ref) {
12+
this(hardError, initiatedByApplication, reason, ref, "", null, false);
13+
}
14+
15+
/**
16+
* Construct a RecoverableShutdownSignalException from the arguments.
17+
* @param hardError the relevant hard error
18+
* @param initiatedByApplication if the shutdown was client-initiated
19+
* @param reason AMQP method describing the exception reason
20+
* @param ref Reference to Connection or Channel that fired the signal
21+
* @param messagePrefix prefix to add to exception message
22+
*/
23+
public RecoverableShutdownSignalException(boolean hardError,
24+
boolean initiatedByApplication,
25+
Method reason, Object ref, String messagePrefix, Throwable cause)
26+
{
27+
this(hardError, initiatedByApplication, reason, ref, messagePrefix, cause, false);
28+
}
29+
30+
public RecoverableShutdownSignalException(ShutdownSignalException sse, boolean recoveryToBeAttempted) {
31+
this(sse.isHardError(), sse.isInitiatedByApplication(), sse.getReason(), sse.getReference(), sse.getMessagePrefix(), sse.getCause(), recoveryToBeAttempted);
32+
}
33+
34+
public RecoverableShutdownSignalException(boolean hardError,
35+
boolean initiatedByApplication,
36+
Method reason, Object ref, String messagePrefix, Throwable cause, boolean recoveryToBeAttempted)
37+
{
38+
super(hardError, initiatedByApplication, reason, ref, messagePrefix, cause);
39+
this._recoveryToBeAttempted = recoveryToBeAttempted;
40+
}
41+
42+
public boolean isRecoveryToBeAttempted() {
43+
return _recoveryToBeAttempted;
44+
}
45+
}

src/main/java/com/rabbitmq/client/ShutdownSignalException.java

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class ShutdownSignalException extends RuntimeException implements Sensibl
4545

4646
/** Either Channel or Connection instance, depending on _hardError */
4747
private final Object _ref;
48+
49+
private final String _messagePrefix;
4850

4951
/**
5052
* Construct a ShutdownSignalException from the arguments.
@@ -78,6 +80,7 @@ public ShutdownSignalException(boolean hardError,
7880
this._reason = reason;
7981
// Depending on hardError what we got is either Connection or Channel reference
8082
this._ref = ref;
83+
this._messagePrefix = messagePrefix;
8184
}
8285

8386
private static String composeMessage(boolean hardError, boolean initiatedByApplication,
@@ -112,6 +115,9 @@ private static String composeMessage(boolean hardError, boolean initiatedByAppli
112115
/** @return Reference to Connection or Channel object that fired the signal **/
113116
public Object getReference() { return _ref; }
114117

118+
/** @return The original message prefix used to create this exception **/
119+
protected String getMessagePrefix() { return _messagePrefix; }
120+
115121
public ShutdownSignalException sensibleClone() {
116122
try {
117123
return (ShutdownSignalException)super.clone();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client.impl;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import com.rabbitmq.client.ShutdownListener;
24+
import com.rabbitmq.client.ShutdownNotifier;
25+
import com.rabbitmq.client.ShutdownSignalException;
26+
27+
/**
28+
* A class that manages {@link ShutdownListener}s and remembers the reason for a shutdown. Both
29+
* {@link com.rabbitmq.client.Channel Channel}s and {@link com.rabbitmq.client.Connection
30+
* Connection}s have shutdown listeners.
31+
*
32+
* The ForwardingShutdownNotifier will only notify listeners when notifyListeners is called,
33+
* and may be called multiple times.
34+
*
35+
* This notifier will always report that it is open.
36+
*/
37+
public class ForwardingShutdownNotifier implements ShutdownNotifier {
38+
39+
/** Monitor for shutdown listeners and shutdownCause */
40+
private final Object monitor = new Object();
41+
42+
/** List of all shutdown listeners associated with the component */
43+
private final List<ShutdownListener> shutdownListeners = new ArrayList<ShutdownListener>();
44+
45+
private volatile ShutdownSignalException lastShutdownCause = null;
46+
47+
public void addShutdownListener(ShutdownListener listener)
48+
{
49+
synchronized(this.monitor) {
50+
this.shutdownListeners.add(listener);
51+
}
52+
}
53+
54+
/**
55+
* Returns the last close reason notified to listeners.
56+
*/
57+
public ShutdownSignalException getCloseReason() {
58+
synchronized(this.monitor) {
59+
return this.lastShutdownCause;
60+
}
61+
}
62+
63+
/**
64+
*
65+
* Do not call this directly, use notifyListeners(cause)
66+
*/
67+
public void notifyListeners()
68+
{
69+
70+
}
71+
72+
public void removeShutdownListener(ShutdownListener listener)
73+
{
74+
synchronized(this.monitor) {
75+
this.shutdownListeners.remove(listener);
76+
}
77+
}
78+
79+
/** Always reports open */
80+
public boolean isOpen() {
81+
return true;
82+
}
83+
84+
/**
85+
* Notifies the registered listeners of this cause.
86+
* @param sse
87+
*/
88+
public void notifyListeners(final ShutdownSignalException sse) {
89+
ShutdownListener[] sdls = null;
90+
synchronized(this.monitor) {
91+
sdls = this.shutdownListeners
92+
.toArray(new ShutdownListener[this.shutdownListeners.size()]);
93+
this.lastShutdownCause = sse;
94+
}
95+
for (ShutdownListener l: sdls) {
96+
try {
97+
l.shutdownCompleted(sse);
98+
} catch (Exception e) {
99+
// FIXME: proper logging
100+
}
101+
}
102+
103+
}
104+
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@
77
import com.rabbitmq.client.Connection;
88
import com.rabbitmq.client.MissedHeartbeatException;
99
import com.rabbitmq.client.Recoverable;
10+
import com.rabbitmq.client.RecoverableConnection;
11+
import com.rabbitmq.client.RecoverableShutdownSignalException;
1012
import com.rabbitmq.client.RecoveryListener;
1113
import com.rabbitmq.client.ShutdownListener;
1214
import com.rabbitmq.client.ShutdownSignalException;
1315
import com.rabbitmq.client.TopologyRecoveryException;
1416
import com.rabbitmq.client.impl.ConnectionParams;
1517
import com.rabbitmq.client.ExceptionHandler;
18+
import com.rabbitmq.client.impl.ForwardingShutdownNotifier;
1619
import com.rabbitmq.client.impl.FrameHandlerFactory;
1720
import com.rabbitmq.client.impl.NetworkConnection;
1821

@@ -56,7 +59,7 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
5659
private final ConnectionParams params;
5760
private RecoveryAwareAMQConnection delegate;
5861

59-
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
62+
private final ForwardingShutdownNotifier shutdownHooks = new ForwardingShutdownNotifier();
6063
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
6164
private final List<BlockedListener> blockedListeners = new ArrayList<BlockedListener>();
6265

@@ -299,16 +302,16 @@ public void close(int closeCode, String closeMessage) throws IOException {
299302
* @see Connection#addShutdownListener(com.rabbitmq.client.ShutdownListener)
300303
*/
301304
public void addShutdownListener(ShutdownListener listener) {
302-
this.shutdownHooks.add(listener);
303-
delegate.addShutdownListener(listener);
305+
this.shutdownHooks.addShutdownListener(listener);
306+
//delegate.addShutdownListener(listener);
304307
}
305308

306309
/**
307310
* @see com.rabbitmq.client.ShutdownNotifier#removeShutdownListener(com.rabbitmq.client.ShutdownListener)
308311
*/
309312
public void removeShutdownListener(ShutdownListener listener) {
310-
this.shutdownHooks.remove(listener);
311-
delegate.removeShutdownListener(listener);
313+
this.shutdownHooks.removeShutdownListener(listener);
314+
//delegate.removeShutdownListener(listener);
312315
}
313316

314317
/**
@@ -378,21 +381,20 @@ private void addAutomaticRecoveryListener() {
378381
final AutorecoveringConnection c = this;
379382
ShutdownListener automaticRecoveryListener = new ShutdownListener() {
380383
public void shutdownCompleted(ShutdownSignalException cause) {
384+
boolean attemptRecovery = shouldTriggerConnectionRecovery(cause);
385+
//Send a notice to user added hooks.
386+
c.shutdownHooks.notifyListeners(new RecoverableShutdownSignalException(cause, attemptRecovery));
387+
381388
try {
382-
if (shouldTriggerConnectionRecovery(cause)) {
389+
if (attemptRecovery) {
383390
c.beginAutomaticRecovery();
384391
}
385392
} catch (Exception e) {
386393
c.delegate.getExceptionHandler().handleConnectionRecoveryException(c, e);
387394
}
388395
}
389396
};
390-
synchronized (this) {
391-
if(!this.shutdownHooks.contains(automaticRecoveryListener)) {
392-
this.shutdownHooks.add(automaticRecoveryListener);
393-
}
394-
this.delegate.addShutdownListener(automaticRecoveryListener);
395-
}
397+
this.delegate.addShutdownListener(automaticRecoveryListener);
396398
}
397399

398400
protected boolean shouldTriggerConnectionRecovery(ShutdownSignalException cause) {
@@ -456,9 +458,7 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException,
456458
}
457459

458460
private void recoverShutdownListeners() {
459-
for (ShutdownListener sh : this.shutdownHooks) {
460-
this.delegate.addShutdownListener(sh);
461-
}
461+
addAutomaticRecoveryListener();
462462
}
463463

464464
private void recoverBlockedListeners() {

0 commit comments

Comments
 (0)