Skip to content

Commit 1bcc2bf

Browse files
committed
Add Recoverable[Connection/Channel] interfaces
Fixes #137
1 parent 94fd11b commit 1bcc2bf

File tree

5 files changed

+49
-21
lines changed

5 files changed

+49
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.rabbitmq.client;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Recoverable;
5+
6+
/**
7+
* Convenient interface when working against auto-recovery channels.
8+
*
9+
* @since 4.0.0
10+
*/
11+
public interface RecoverableChannel extends Recoverable, Channel {
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.rabbitmq.client;
2+
3+
/**
4+
* Convenient interface when working against auto-recovery connections.
5+
*
6+
* @since 4.0.0
7+
*/
8+
public interface RecoverableConnection extends Recoverable, Connection {
9+
10+
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.*;
19+
import com.rabbitmq.client.RecoverableChannel;
1920

2021
import java.io.IOException;
2122
import java.util.List;
@@ -29,7 +30,7 @@
2930
*
3031
* @since 3.3.0
3132
*/
32-
public class AutorecoveringChannel implements Channel, Recoverable {
33+
public class AutorecoveringChannel implements RecoverableChannel {
3334
private volatile RecoveryAwareChannelN delegate;
3435
private volatile AutorecoveringConnection connection;
3536
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* @see com.rabbitmq.client.ConnectionFactory#setTopologyRecoveryEnabled(boolean)
5151
* @since 3.3.0
5252
*/
53-
public class AutorecoveringConnection implements Connection, Recoverable, NetworkConnection {
53+
public class AutorecoveringConnection implements RecoverableConnection, NetworkConnection {
5454
private final RecoveryAwareAMQConnectionFactory cf;
5555
private final Map<Integer, AutorecoveringChannel> channels;
5656
private final ConnectionParams params;

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

+23-19
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
package com.rabbitmq.client.test.functional;
1717

1818
import com.rabbitmq.client.*;
19-
import com.rabbitmq.client.impl.recovery.*;
19+
import com.rabbitmq.client.impl.NetworkConnection;
20+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
21+
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
22+
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
23+
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
2024
import com.rabbitmq.client.test.BrokerTestCase;
2125
import com.rabbitmq.client.test.TestUtils;
2226
import com.rabbitmq.tools.Host;
@@ -49,7 +53,7 @@ public class ConnectionRecovery extends BrokerTestCase {
4953
@Test public void namedConnectionRecovery()
5054
throws IOException, InterruptedException, TimeoutException {
5155
String connectionName = "custom name";
52-
AutorecoveringConnection c = newRecoveringConnection(connectionName);
56+
RecoverableConnection c = newRecoveringConnection(connectionName);
5357
try {
5458
assertTrue(c.isOpen());
5559
assertEquals(connectionName, c.getClientProvidedName());
@@ -70,7 +74,7 @@ public class ConnectionRecovery extends BrokerTestCase {
7074
@Test public void connectionRecoveryWithArrayOfAddresses()
7175
throws IOException, InterruptedException, TimeoutException {
7276
final Address[] addresses = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
73-
AutorecoveringConnection c = newRecoveringConnection(addresses);
77+
RecoverableConnection c = newRecoveringConnection(addresses);
7478
try {
7579
assertTrue(c.isOpen());
7680
closeAndWaitForRecovery(c);
@@ -86,7 +90,7 @@ public class ConnectionRecovery extends BrokerTestCase {
8690

8791
final List<Address> addresses = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
8892

89-
AutorecoveringConnection c = newRecoveringConnection(addresses);
93+
RecoverableConnection c = newRecoveringConnection(addresses);
9094
try {
9195
assertTrue(c.isOpen());
9296
closeAndWaitForRecovery(c);
@@ -98,7 +102,7 @@ public class ConnectionRecovery extends BrokerTestCase {
98102

99103
@Test public void connectionRecoveryWithDisabledTopologyRecovery()
100104
throws IOException, InterruptedException, TimeoutException {
101-
AutorecoveringConnection c = newRecoveringConnection(true);
105+
RecoverableConnection c = newRecoveringConnection(true);
102106
Channel ch = c.createChannel();
103107
String q = "java-client.test.recovery.q2";
104108
ch.queueDeclare(q, false, true, false, null);
@@ -107,7 +111,7 @@ public class ConnectionRecovery extends BrokerTestCase {
107111
try {
108112
CountDownLatch shutdownLatch = prepareForShutdown(c);
109113
CountDownLatch recoveryLatch = prepareForRecovery(c);
110-
Host.closeConnection(c);
114+
Host.closeConnection((NetworkConnection) c);
111115
wait(shutdownLatch);
112116
wait(recoveryLatch);
113117
assertTrue(c.isOpen());
@@ -144,7 +148,7 @@ public void recoveryCanBegin(ShutdownSignalException cause) {
144148
recoveryCanBeginLatch.countDown();
145149
}
146150
});
147-
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
151+
((RecoverableConnection)connection).addRecoveryListener(new RecoveryListener() {
148152
@Override
149153
public void handleRecovery(Recoverable recoverable) {
150154
latch.countDown();
@@ -635,9 +639,9 @@ public void handleRecoveryStarted(Recoverable recoverable) {
635639
startLatch.countDown();
636640
}
637641
};
638-
AutorecoveringChannel ch1 = (AutorecoveringChannel) connection.createChannel();
642+
RecoverableChannel ch1 = (RecoverableChannel) connection.createChannel();
639643
ch1.addRecoveryListener(listener);
640-
AutorecoveringChannel ch2 = (AutorecoveringChannel) connection.createChannel();
644+
RecoverableChannel ch2 = (RecoverableChannel) connection.createChannel();
641645
ch2.addRecoveryListener(listener);
642646

643647
assertTrue(ch1.isOpen());
@@ -677,7 +681,7 @@ public void handleDelivery(String consumerTag,
677681

678682
String q = channel.queueDeclare().getQueue();
679683
channel.basicConsume(q, consumer);
680-
AutorecoveringConnection publishingConnection = newRecoveringConnection(false);
684+
RecoverableConnection publishingConnection = newRecoveringConnection(false);
681685
Channel publishingChannel = publishingConnection.createChannel();
682686
for (int i = 0; i < n; i++) {
683687
publishingChannel.basicPublish("", q, null, "msg".getBytes());
@@ -772,9 +776,9 @@ private void closeAndWaitForRecovery() throws IOException, InterruptedException
772776
closeAndWaitForRecovery((AutorecoveringConnection)this.connection);
773777
}
774778

775-
private void closeAndWaitForRecovery(AutorecoveringConnection connection) throws IOException, InterruptedException {
779+
private void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException {
776780
CountDownLatch latch = prepareForRecovery(connection);
777-
Host.closeConnection(connection);
781+
Host.closeConnection((NetworkConnection) connection);
778782
wait(latch);
779783
}
780784

@@ -799,37 +803,37 @@ protected ConnectionFactory newConnectionFactory() {
799803
return buildConnectionFactoryWithRecoveryEnabled(false);
800804
}
801805

802-
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery)
806+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery)
803807
throws IOException, TimeoutException {
804808
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
805809
return (AutorecoveringConnection) cf.newConnection();
806810
}
807811

808-
private AutorecoveringConnection newRecoveringConnection(Address[] addresses)
812+
private RecoverableConnection newRecoveringConnection(Address[] addresses)
809813
throws IOException, TimeoutException {
810814
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false);
811815
// specifically use the Address[] overload
812816
return (AutorecoveringConnection) cf.newConnection(addresses);
813817
}
814818

815-
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
819+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
816820
throws IOException, TimeoutException {
817821
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
818822
return (AutorecoveringConnection) cf.newConnection(addresses);
819823
}
820824

821-
private AutorecoveringConnection newRecoveringConnection(List<Address> addresses)
825+
private RecoverableConnection newRecoveringConnection(List<Address> addresses)
822826
throws IOException, TimeoutException {
823827
return newRecoveringConnection(false, addresses);
824828
}
825829

826-
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName)
830+
private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, String connectionName)
827831
throws IOException, TimeoutException {
828832
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
829-
return (AutorecoveringConnection) cf.newConnection(connectionName);
833+
return (RecoverableConnection) cf.newConnection(connectionName);
830834
}
831835

832-
private AutorecoveringConnection newRecoveringConnection(String connectionName)
836+
private RecoverableConnection newRecoveringConnection(String connectionName)
833837
throws IOException, TimeoutException {
834838
return newRecoveringConnection(false, connectionName);
835839
}

0 commit comments

Comments
 (0)