From b1782f082b1319e35c754e85831c4fbddc66e975 Mon Sep 17 00:00:00 2001 From: Michael Dent Date: Fri, 12 Feb 2021 22:28:02 -0600 Subject: [PATCH 1/4] binding failure retry should recover all queue bindings --- .../recovery/AutorecoveringConnection.java | 4 + .../recovery/TopologyRecoveryRetryLogic.java | 23 ++- .../functional/TopologyRecoveryRetry.java | 151 +++++++++++++++++- 3 files changed, 175 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 94b24aa7c2..d5d1719b02 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -1091,6 +1091,10 @@ public Map getRecordedExchanges() { public List getRecordedBindings() { return recordedBindings; } + + public Map getRecordedConsumers() { + return consumers; + } @Override public String toString() { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index 6a91a5202a..3bb471215b 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -77,7 +77,28 @@ public abstract class TopologyRecoveryRetryLogic { * Recover a binding. */ public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING = context -> { - context.binding().recover(); + if (context.entity() instanceof RecordedQueueBinding) { + // recover all bindings for the queue. 
+ // need to do this incase some bindings have already been recovered successfully before this binding failed + String queue = context.binding().getDestination(); + for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { + if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) { + recordedBinding.recover(); + } + } + } else if (context.entity() instanceof RecordedExchangeBinding) { + // recover all bindings for the exchange + // need to do this incase some bindings have already been recovered successfully before this binding failed + String exchange = context.binding().getDestination(); + for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { + if (recordedBinding instanceof RecordedExchangeBinding && exchange.equals(recordedBinding.getDestination())) { + recordedBinding.recover(); + } + } + } else { + // should't be possible to get here, but just in case recover just this binding + context.binding().recover(); + } return null; }; diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java index ffc744bb86..484c67c9f8 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -17,11 +17,24 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; import com.rabbitmq.client.test.BrokerTestCase; import 
com.rabbitmq.client.test.TestUtils; +import com.rabbitmq.tools.Host; +import org.junit.After; import org.junit.Test; - +import java.io.IOException; import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER; import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; @@ -32,6 +45,13 @@ */ public class TopologyRecoveryRetry extends BrokerTestCase { + private Consumer backoffConsumer; + + @After + public void cleanup() { + backoffConsumer = null; + } + @Test public void topologyRecoveryRetry() throws Exception { int nbQueues = 200; @@ -40,6 +60,7 @@ public void topologyRecoveryRetry() throws Exception { String queue = prefix + i; channel.queueDeclare(queue, false, false, true, new HashMap<>()); channel.queueBind(queue, "amq.direct", queue); + channel.queueBind(queue, "amq.direct", queue + "2"); channel.basicConsume(queue, true, new DefaultConsumer(channel)); } @@ -47,11 +68,137 @@ public void topologyRecoveryRetry() throws Exception { assertTrue(channel.isOpen()); } + + @Test + public void topologyRecoveryBindingFailure() throws Exception { + final String queue = "topology-recovery-retry-binding-failure" + System.currentTimeMillis(); + channel.queueDeclare(queue, false, false, true, new HashMap<>()); + channel.queueBind(queue, "amq.topic", "topic1"); + channel.queueBind(queue, "amq.topic", "topic2"); + final CountDownLatch messagesReceivedLatch = new CountDownLatch(2); + channel.basicConsume(queue, true, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { + System.out.println("Got message=" + new String(body)); + messagesReceivedLatch.countDown(); + } + }); + final CountDownLatch recoveryLatch = new 
CountDownLatch(1); + ((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() { + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // no-op + } + @Override + public void handleRecovery(Recoverable recoverable) { + recoveryLatch.countDown(); + } + }); + + // we want recovery to fail when recovering the 2nd binding + // give the 2nd recorded binding a bad queue name so it fails + final RecordedBinding binding2 = ((AutorecoveringConnection)connection).getRecordedBindings().get(1); + binding2.destination(UUID.randomUUID().toString()); + + // use the backoffConsumer to know that it has failed + // then delete the real queue & fix the recorded binding + // it should fail once more because queue is gone, and then succeed + final CountDownLatch backoffLatch = new CountDownLatch(1); + backoffConsumer = attempt -> { + binding2.destination(queue); + try { + Host.rabbitmqctl("delete_queue " + queue); + Thread.sleep(2000); + } catch (Exception e) { + e.printStackTrace(); + } + backoffLatch.countDown(); + }; + + // close connection + Host.closeAllConnections(); + + // assert backoff was called + assertTrue(backoffLatch.await(90, TimeUnit.SECONDS)); + // wait for full recovery + assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS)); + + // publish messages to verify both bindings were recovered + basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1"); + basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2"); + + assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void topologyRecoveryConsumerFailure() throws Exception { + final String queue = "topology-recovery-retry-consumer-failure" + System.currentTimeMillis(); + channel.queueDeclare(queue, false, false, true, new HashMap<>()); + channel.queueBind(queue, "amq.topic", "topic1"); + channel.queueBind(queue, "amq.topic", "topic2"); + final CountDownLatch messagesReceivedLatch = new CountDownLatch(2); + channel.basicConsume(queue, 
true, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { + System.out.println("Got message=" + new String(body)); + messagesReceivedLatch.countDown(); + } + }); + final CountDownLatch recoveryLatch = new CountDownLatch(1); + ((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() { + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // no-op + } + @Override + public void handleRecovery(Recoverable recoverable) { + recoveryLatch.countDown(); + } + }); + + // we want recovery to fail when recovering the consumer + // give the recorded consumer a bad queue name so it fails + final RecordedConsumer consumer = ((AutorecoveringConnection)connection).getRecordedConsumers().values().iterator().next(); + consumer.setQueue(UUID.randomUUID().toString()); + + // use the backoffConsumer to know that it has failed + // then delete the real queue & fix the recorded consumer + // it should fail once more because queue is gone, and then succeed + final CountDownLatch backoffLatch = new CountDownLatch(1); + backoffConsumer = attempt -> { + consumer.setQueue(queue); + try { + Host.rabbitmqctl("delete_queue " + queue); + Thread.sleep(2000); + } catch (Exception e) { + e.printStackTrace(); + } + backoffLatch.countDown(); + }; + + // close connection + Host.closeAllConnections(); + + // assert backoff was called + assertTrue(backoffLatch.await(90, TimeUnit.SECONDS)); + // wait for full recovery + assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS)); + + // publish messages to verify both bindings & consumer were recovered + basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1"); + basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2"); + + assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS)); + } @Override protected ConnectionFactory newConnectionFactory() { ConnectionFactory 
connectionFactory = TestUtils.connectionFactory(); - connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build()); + connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.backoffPolicy(attempt -> { + if (backoffConsumer != null) { + backoffConsumer.accept(attempt); + } + }).build()); connectionFactory.setNetworkRecoveryInterval(1000); return connectionFactory; } From 6aea5b2eec38a923f7c4eca379e6e643283c982c Mon Sep 17 00:00:00 2001 From: Michael Dent Date: Fri, 12 Feb 2021 22:40:57 -0600 Subject: [PATCH 2/4] only drop on 1st retry --- .../functional/TopologyRecoveryRetry.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java index 484c67c9f8..923b139210 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -45,7 +45,7 @@ */ public class TopologyRecoveryRetry extends BrokerTestCase { - private Consumer backoffConsumer; + private volatile Consumer backoffConsumer; @After public void cleanup() { @@ -105,12 +105,14 @@ public void handleRecovery(Recoverable recoverable) { // it should fail once more because queue is gone, and then succeed final CountDownLatch backoffLatch = new CountDownLatch(1); backoffConsumer = attempt -> { - binding2.destination(queue); - try { - Host.rabbitmqctl("delete_queue " + queue); - Thread.sleep(2000); - } catch (Exception e) { - e.printStackTrace(); + if (attempt == 1) { + binding2.destination(queue); + try { + Host.rabbitmqctl("delete_queue " + queue); + Thread.sleep(2000); + } catch (Exception e) { + e.printStackTrace(); + } } backoffLatch.countDown(); }; @@ -166,12 +168,14 @@ public void handleRecovery(Recoverable recoverable) { // it should fail once more 
because queue is gone, and then succeed final CountDownLatch backoffLatch = new CountDownLatch(1); backoffConsumer = attempt -> { - consumer.setQueue(queue); - try { - Host.rabbitmqctl("delete_queue " + queue); - Thread.sleep(2000); - } catch (Exception e) { - e.printStackTrace(); + if (attempt == 1) { + consumer.setQueue(queue); + try { + Host.rabbitmqctl("delete_queue " + queue); + Thread.sleep(2000); + } catch (Exception e) { + e.printStackTrace(); + } } backoffLatch.countDown(); }; From ae5972a4aca51537b9ee6b01346ccb74396808c2 Mon Sep 17 00:00:00 2001 From: Michael Dent Date: Sat, 13 Feb 2021 21:08:09 -0600 Subject: [PATCH 3/4] short circuit the queue binding recovery. Removed e2e binding logic as it doesn't apply in this usecase --- .../recovery/TopologyRecoveryRetryLogic.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index 3bb471215b..1eb230e24c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -22,6 +22,8 @@ import java.util.function.BiPredicate; import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; +import ch.qos.logback.core.Context; + /** * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}. * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}. @@ -77,27 +79,27 @@ public abstract class TopologyRecoveryRetryLogic { * Recover a binding. 
*/ public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING = context -> { + context.binding().recover(); + return null; + }; + + /** + * Recover earlier bindings that share the same queue as this retry context + */ + public static final DefaultRetryHandler.RetryOperation RECOVER_PREVIOUS_QUEUE_BINDINGS = context -> { if (context.entity() instanceof RecordedQueueBinding) { - // recover all bindings for the queue. - // need to do this incase some bindings have already been recovered successfully before this binding failed + // recover all bindings for the same queue that were recovered before this current binding + // need to do this in case some bindings had already been recovered successfully before the queue was deleted & this binding failed String queue = context.binding().getDestination(); for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { - if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) { - recordedBinding.recover(); - } - } - } else if (context.entity() instanceof RecordedExchangeBinding) { - // recover all bindings for the exchange - // need to do this incase some bindings have already been recovered successfully before this binding failed - String exchange = context.binding().getDestination(); - for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) { - if (recordedBinding instanceof RecordedExchangeBinding && exchange.equals(recordedBinding.getDestination())) { + if (recordedBinding == context.entity()) { + // we have gotten to the binding in this context. 
Since this is an ordered list we can now break + // as we know we have recovered all the earlier bindings that may have existed on this queue + break; + } else if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) { recordedBinding.recover(); } } - } else { - // should't be possible to get here, but just in case recover just this binding - context.binding().recover(); } return null; }; @@ -148,7 +150,8 @@ public abstract class TopologyRecoveryRetryLogic { public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder() .bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) - .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) + .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING) + .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)) .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER) .andThen(RECOVER_CONSUMER_QUEUE_BINDINGS))); } From 30447a74d5da510a8018ecf6b0727395941ea338 Mon Sep 17 00:00:00 2001 From: Michael Dent Date: Sat, 13 Feb 2021 21:09:23 -0600 Subject: [PATCH 4/4] remove unused import --- .../client/impl/recovery/TopologyRecoveryRetryLogic.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java index 1eb230e24c..94c347f7fa 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -22,8 +22,6 @@ import java.util.function.BiPredicate; import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; -import ch.qos.logback.core.Context; - /** * Useful 
ready-to-use conditions and operations for {@link DefaultRetryHandler}. * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}.