Skip to content

Commit 854850a

Browse files
committed
Complete re-assignment if consumer is closed
1 parent 7d336c1 commit 854850a

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -686,8 +686,8 @@ private void assignConsumersToStream(
686686
} else {
687687
for (SubscriptionTracker affectedSubscription : subscriptions) {
688688
ManagerPool subscriptionPool = null;
689-
boolean reassigned = false;
690-
while (!reassigned) {
689+
boolean reassignmentCompleted = false;
690+
while (!reassignmentCompleted) {
691691
try {
692692
if (affectedSubscription.consumer.isOpen()) {
693693
Client.Broker broker = pickBroker(candidates);
@@ -715,11 +715,14 @@ private void assignConsumersToStream(
715715
}
716716
subscriptionPool.add(
717717
affectedSubscription, offsetSpecification, false);
718-
reassigned = true;
718+
reassignmentCompleted = true;
719+
} else {
720+
reassignmentCompleted = true;
719721
}
720722
}
721723
} else {
722724
LOGGER.debug("Not re-assigning consumer because it has been closed");
725+
reassignmentCompleted = true;
723726
}
724727
} catch (TimeoutStreamException e) {
725728
LOGGER.debug(
@@ -740,6 +743,7 @@ private void assignConsumersToStream(
740743
} catch (Exception e) {
741744
LOGGER.warn(
742745
"Error while re-assigning subscription from stream {}", stream, e);
746+
reassignmentCompleted = true;
743747
}
744748
}
745749
}

0 commit comments

Comments
 (0)