Skip to content

Commit 43af65d

Browse files
Merge pull request #668 from vikinghawk/topology_listener
Add topology recovery started method to RecoveryListener
2 parents 136c796 + b3b8642 commit 43af65d

File tree

3 files changed

+24
-2
lines changed

3 files changed

+24
-2
lines changed

src/main/java/com/rabbitmq/client/RecoveryListener.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,12 @@ public interface RecoveryListener {
3636
* @param recoverable a {@link Recoverable} connection.
3737
*/
3838
void handleRecoveryStarted(Recoverable recoverable);
39+
40+
/**
41+
* Invoked before automatic topology recovery starts.
42+
* This means that the connection and channel recovery has completed
43+
* and that exchange/queue/binding/consumer recovery is about to begin.
44+
* @param recoverable a {@link Recoverable} connection.
45+
*/
46+
default void handleTopologyRecoveryStarted(Recoverable recoverable) {}
3947
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,10 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
560560
}
561561

562562
private synchronized void beginAutomaticRecovery() throws InterruptedException {
563-
this.wait(this.params.getRecoveryDelayHandler().getDelay(0));
563+
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
564+
if (delay > 0) {
565+
this.wait(delay);
566+
}
564567

565568
this.notifyRecoveryListenersStarted();
566569

@@ -576,6 +579,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
576579
// don't assign new delegate connection until channel recovery is complete
577580
this.delegate = newConn;
578581
if (this.params.isTopologyRecoveryEnabled()) {
582+
notifyTopologyRecoveryListenersStarted();
579583
recoverTopology(params.getTopologyRecoveryExecutor());
580584
}
581585
this.notifyRecoveryListenersComplete();
@@ -650,6 +654,12 @@ private void notifyRecoveryListenersStarted() {
650654
}
651655
}
652656

657+
private void notifyTopologyRecoveryListenersStarted() {
658+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
659+
f.handleTopologyRecoveryStarted(this);
660+
}
661+
}
662+
653663
private void recoverTopology(final ExecutorService executor) {
654664
// The recovery sequence is the following:
655665
// 1. Recover exchanges

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public String getPassword() {
170170
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
171171
@Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
172172
final List<String> events = new CopyOnWriteArrayList<String>();
173-
final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete
173+
final CountDownLatch latch = new CountDownLatch(3); // one when started, another when complete
174174
connection.addShutdownListener(new ShutdownListener() {
175175
@Override
176176
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -202,6 +202,10 @@ public void handleRecovery(Recoverable recoverable) {
202202
public void handleRecoveryStarted(Recoverable recoverable) {
203203
latch.countDown();
204204
}
205+
@Override
206+
public void handleTopologyRecoveryStarted(Recoverable recoverable) {
207+
latch.countDown();
208+
}
205209
});
206210
assertThat(connection.isOpen()).isTrue();
207211
closeAndWaitForRecovery();

0 commit comments

Comments
 (0)