Skip to content

Commit b4e0a26

Browse files
Merge branch 'vikinghawk-5.x.x-stable' into 5.x.x-stable
Conflicts: src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
2 parents 5b740ce + f3af73e commit b4e0a26

File tree

3 files changed

+161
-69
lines changed

3 files changed

+161
-69
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 95 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -767,45 +767,64 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
767767
}
768768
}
769769

770-
770+
/**
771+
* Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
772+
* @param oldName queue name
773+
* @param q recorded queue
774+
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
775+
*/
771776
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
772777
try {
773-
if (topologyRecoveryFilter.filterQueue(q)) {
774-
LOGGER.debug("Recovering {}", q);
775-
if (retry) {
776-
final RecordedQueue entity = q;
777-
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
778-
entity.recover();
779-
return null;
780-
}).getRecordedEntity();
781-
} else {
782-
q.recover();
783-
}
784-
String newName = q.getName();
785-
if (!oldName.equals(newName)) {
786-
// make sure server-named queues are re-added with
787-
// their new names. MK.
788-
synchronized (this.recordedQueues) {
789-
this.propagateQueueNameChangeToBindings(oldName, newName);
790-
this.propagateQueueNameChangeToConsumers(oldName, newName);
791-
// bug26552:
792-
// remove old name after we've updated the bindings and consumers,
793-
deleteRecordedQueue(oldName);
794-
this.recordedQueues.put(newName, q);
795-
}
796-
}
797-
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
798-
qrl.queueRecovered(oldName, newName);
799-
}
800-
LOGGER.debug("{} has recovered", q);
801-
}
778+
internalRecoverQueue(oldName, q, retry);
802779
} catch (Exception cause) {
803780
final String message = "Caught an exception while recovering queue " + oldName +
804781
": " + cause.getMessage();
805782
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q);
806783
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
807784
}
808785
}
786+
787+
/**
788+
* Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
789+
* @param oldName queue name
790+
* @param q recorded queue
791+
* @throws Exception if an error occurs recovering the queue
792+
*/
793+
void recoverQueue(final String oldName, RecordedQueue q) throws Exception {
794+
internalRecoverQueue(oldName, q, false);
795+
}
796+
797+
private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean retry) throws Exception {
798+
if (topologyRecoveryFilter.filterQueue(q)) {
799+
LOGGER.debug("Recovering {}", q);
800+
if (retry) {
801+
final RecordedQueue entity = q;
802+
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
803+
entity.recover();
804+
return null;
805+
}).getRecordedEntity();
806+
} else {
807+
q.recover();
808+
}
809+
String newName = q.getName();
810+
if (!oldName.equals(newName)) {
811+
// make sure queues are re-added with
812+
// their new names, if applicable. MK.
813+
synchronized (this.recordedQueues) {
814+
this.propagateQueueNameChangeToBindings(oldName, newName);
815+
this.propagateQueueNameChangeToConsumers(oldName, newName);
816+
// bug26552:
817+
// remove old name after we've updated the bindings and consumers,
818+
deleteRecordedQueue(oldName);
819+
this.recordedQueues.put(newName, q);
820+
}
821+
}
822+
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
823+
qrl.queueRecovered(oldName, newName);
824+
}
825+
LOGGER.debug("{} has recovered", q);
826+
}
827+
}
809828

810829
public void recoverBinding(RecordedBinding b, boolean retry) {
811830
try {
@@ -829,41 +848,61 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
829848
}
830849
}
831850

851+
/**
852+
* Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
853+
* @param tag consumer tag
854+
* @param consumer recorded consumer
855+
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
856+
*/
832857
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
833858
try {
834-
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
835-
LOGGER.debug("Recovering {}", consumer);
836-
String newTag = null;
837-
if (retry) {
838-
final RecordedConsumer entity = consumer;
839-
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
840-
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
841-
newTag = (String) retryResult.getResult();
842-
} else {
843-
newTag = consumer.recover();
844-
}
845-
846-
// make sure server-generated tags are re-added. MK.
847-
if(tag != null && !tag.equals(newTag)) {
848-
synchronized (this.consumers) {
849-
this.consumers.remove(tag);
850-
this.consumers.put(newTag, consumer);
851-
}
852-
consumer.getChannel().updateConsumerTag(tag, newTag);
853-
}
854-
855-
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
856-
crl.consumerRecovered(tag, newTag);
857-
}
858-
LOGGER.debug("{} has recovered", consumer);
859-
}
859+
internalRecoverConsumer(tag, consumer, retry);
860860
} catch (Exception cause) {
861861
final String message = "Caught an exception while recovering consumer " + tag +
862862
": " + cause.getMessage();
863863
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer);
864864
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
865865
}
866866
}
867+
868+
/**
869+
* Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
870+
* @param tag consumer tag
871+
* @param consumer recorded consumer
872+
* @throws Exception if an error occurs recovering the consumer
873+
*/
874+
void recoverConsumer(final String tag, RecordedConsumer consumer) throws Exception {
875+
internalRecoverConsumer(tag, consumer, false);
876+
}
877+
878+
private void internalRecoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) throws Exception {
879+
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
880+
LOGGER.debug("Recovering {}", consumer);
881+
String newTag = null;
882+
if (retry) {
883+
final RecordedConsumer entity = consumer;
884+
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
885+
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
886+
newTag = (String) retryResult.getResult();
887+
} else {
888+
newTag = consumer.recover();
889+
}
890+
891+
// make sure server-generated tags are re-added. MK.
892+
if(tag != null && !tag.equals(newTag)) {
893+
synchronized (this.consumers) {
894+
this.consumers.remove(tag);
895+
this.consumers.put(newTag, consumer);
896+
}
897+
consumer.getChannel().updateConsumerTag(tag, newTag);
898+
}
899+
900+
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
901+
crl.consumerRecovered(tag, newTag);
902+
}
903+
LOGGER.debug("{} has recovered", consumer);
904+
}
905+
}
867906

868907
private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
869908
if (this.retryHandler == null) {

src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public RecordedQueue exclusive(boolean value) {
4141
this.exclusive = value;
4242
return this;
4343
}
44+
45+
public boolean isExclusive() {
46+
return this.exclusive;
47+
}
4448

4549
public RecordedQueue serverNamed(boolean value) {
4650
this.serverNamed = value;
@@ -51,8 +55,6 @@ public boolean isServerNamed() {
5155
return this.serverNamed;
5256
}
5357

54-
public boolean isAutoDelete() { return this.autoDelete; }
55-
5658
public void recover() throws IOException {
5759
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
5860
this.durable,
@@ -69,17 +71,29 @@ public RecordedQueue durable(boolean value) {
6971
this.durable = value;
7072
return this;
7173
}
74+
75+
public boolean isDurable() {
76+
return this.durable;
77+
}
7278

7379
public RecordedQueue autoDelete(boolean value) {
7480
this.autoDelete = value;
7581
return this;
7682
}
83+
84+
public boolean isAutoDelete() {
85+
return this.autoDelete;
86+
}
7787

7888
public RecordedQueue arguments(Map<String, Object> value) {
7989
this.arguments = value;
8090
return this;
8191
}
82-
92+
93+
public Map<String, Object> getArguments() {
94+
return arguments;
95+
}
96+
8397
public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
8498
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
8599
return this;

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
2020
import com.rabbitmq.utility.Utility;
21+
import java.util.LinkedHashSet;
22+
import java.util.Set;
23+
import java.util.Map.Entry;
2124
import java.util.function.BiPredicate;
2225
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
2326

@@ -62,7 +65,7 @@ public abstract class TopologyRecoveryRetryLogic {
6265
if (context.entity() instanceof RecordedQueue) {
6366
final RecordedQueue recordedQueue = context.queue();
6467
AutorecoveringConnection connection = context.connection();
65-
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
68+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
6669
}
6770
return null;
6871
};
@@ -76,9 +79,7 @@ public abstract class TopologyRecoveryRetryLogic {
7679
AutorecoveringConnection connection = context.connection();
7780
RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination());
7881
if (recordedQueue != null) {
79-
connection.recoverQueue(
80-
recordedQueue.getName(), recordedQueue, false
81-
);
82+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
8283
}
8384
}
8485
return null;
@@ -122,9 +123,7 @@ public abstract class TopologyRecoveryRetryLogic {
122123
AutorecoveringConnection connection = context.connection();
123124
RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue());
124125
if (recordedQueue != null) {
125-
connection.recoverQueue(
126-
recordedQueue.getName(), recordedQueue, false
127-
);
126+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
128127
}
129128
}
130129
return null;
@@ -165,14 +164,52 @@ public abstract class TopologyRecoveryRetryLogic {
165164
} else if (consumer.getChannel() == channel) {
166165
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
167166
RECOVER_CONSUMER_QUEUE.call(retryContext);
168-
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer, false);
167+
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer);
169168
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
170169
}
171170
}
172171
return context.consumer().getConsumerTag();
173172
}
174173
return null;
175174
};
175+
176+
/**
177+
* Recover earlier auto-delete or exclusive queues that share the same channel as this retry context
178+
*/
179+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_AUTO_DELETE_QUEUES = context -> {
180+
if (context.entity() instanceof RecordedQueue) {
181+
AutorecoveringConnection connection = context.connection();
182+
RecordedQueue queue = context.queue();
183+
// recover all queues for the same channel that had already been recovered successfully before this queue failed.
184+
// If the previous ones were auto-delete or exclusive, they need recovered again
185+
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
186+
if (entry.getValue() == queue) {
187+
// we have gotten to the queue in this context. Since this is an ordered map we can now break
188+
// as we know we have recovered all the earlier queues on this channel
189+
break;
190+
} else if (queue.getChannel() == entry.getValue().getChannel()
191+
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
192+
connection.recoverQueue(entry.getKey(), entry.getValue());
193+
}
194+
}
195+
} else if (context.entity() instanceof RecordedQueueBinding) {
196+
AutorecoveringConnection connection = context.connection();
197+
Set<String> queues = new LinkedHashSet<>();
198+
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
199+
if (context.entity().getChannel() == entry.getValue().getChannel()
200+
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
201+
connection.recoverQueue(entry.getKey(), entry.getValue());
202+
queues.add(entry.getValue().getName());
203+
}
204+
}
205+
for (final RecordedBinding binding : Utility.copy(connection.getRecordedBindings())) {
206+
if (binding instanceof RecordedQueueBinding && queues.contains(binding.getDestination())) {
207+
binding.recover();
208+
}
209+
}
210+
}
211+
return null;
212+
};
176213

177214
/**
178215
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
@@ -188,11 +225,13 @@ public abstract class TopologyRecoveryRetryLogic {
188225
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
189226
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
190227
.queueRecoveryRetryOperation(RECOVER_CHANNEL
191-
.andThen(RECOVER_QUEUE))
228+
.andThen(RECOVER_QUEUE)
229+
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
192230
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
193231
.andThen(RECOVER_BINDING_QUEUE)
194232
.andThen(RECOVER_BINDING)
195-
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
233+
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)
234+
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
196235
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
197236
.andThen(RECOVER_CONSUMER_QUEUE)
198237
.andThen(RECOVER_CONSUMER)

0 commit comments

Comments
 (0)