Consumer not recovering connection after timeout #630

Closed
Hendr-ik-a opened this issue Oct 14, 2024 · 6 comments · Fixed by #641

@Hendr-ik-a

Describe the bug


com.rabbitmq.stream.StreamException: Error while creating stream connection to xxxx:5552. connection timed out after 30000 ms: xxxx/xxxx:5552. This may be due to the usage of a load balancer that makes topology discovery fail. Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.
at com.rabbitmq.stream.impl.Utils.lambda$coordinatorClientFactory$10(Utils.java:175) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:806) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.<init>(ConsumersCoordinator.java:569) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator.addToManager(ConsumersCoordinator.java:194) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator.access$2000(ConsumersCoordinator.java:56) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.recoverSubscription(ConsumersCoordinator.java:909) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.maybeRecoverSubscription(ConsumersCoordinator.java:876) ~[stream-client-0.17.0.jar!/:0.17.0]
at com.rabbitmq.stream.impl.ConsumersCoordinator$ClientSubscriptionsManager.lambda$assignConsumersToStream$12(ConsumersCoordinator.java:845) ~[stream-client-0.17.0.jar!/:0.17.0]

Looking at the implementation of the ConsumersCoordinator.recoverSubscription method, it seems the exception is caught by the general Exception catch block, where the reassignmentCompleted variable is set to true. That ends the recovery process, even though the error message indicates a connection timeout:

private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
      boolean reassignmentCompleted = false;
      while (!reassignmentCompleted) {
        try {
          if (tracker.consumer.isOpen()) {
            Broker broker = pickBroker(candidates);
            LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream);
            synchronized (tracker.consumer) {
              if (tracker.consumer.isOpen()) {
                OffsetSpecification offsetSpecification;
                if (tracker.hasReceivedSomething) {
                  offsetSpecification = OffsetSpecification.offset(tracker.offset);
                } else {
                  offsetSpecification = tracker.initialOffsetSpecification;
                }
                addToManager(broker, tracker, offsetSpecification, false);
              }
            }
          } else {
            LOGGER.debug(
                "Not re-assigning consumer {} (stream '{}') because it has been closed",
                tracker.consumer.id(),
                tracker.stream);
          }
          reassignmentCompleted = true;
        } catch (ConnectionStreamException
            | ClientClosedException
            | StreamNotAvailableException e) {
          LOGGER.debug(
              "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, "
                  + "refreshing candidates and retrying",
              tracker.consumer.id(),
              tracker.stream);
          // maybe not a good candidate, let's refresh and retry for this one
          candidates =
              Utils.callAndMaybeRetry(
                  findBrokersForStream(tracker.stream),
                  ex -> !(ex instanceof StreamDoesNotExistException),
                  recoveryBackOffDelayPolicy(),
                  "Candidate lookup to consume from '%s' (subscription recovery)",
                  tracker.stream);
        } catch (Exception e) {
          LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
          reassignmentCompleted = true;
        }
      }
    }
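
The control flow described above can be reproduced in isolation. The following is a simplified, self-contained sketch, not the client's code: `RetriableException` and `WrappedTimeoutException` are hypothetical stand-ins for the exception types listed in the first catch block and for an exception that reports a timeout but is not one of those types. It assumes, as the report suggests, that the timeout surfaces as a type the first catch block does not list.

```java
import java.util.ArrayList;
import java.util.List;

class RecoveryLoopSketch {

  // Hypothetical stand-in for the exception types handled by the first catch block.
  static class RetriableException extends RuntimeException {
    RetriableException(String message) {
      super(message);
    }
  }

  // Hypothetical stand-in for an exception that reports a timeout
  // but is not one of the types listed in the first catch block.
  static class WrappedTimeoutException extends RuntimeException {
    WrappedTimeoutException(String message) {
      super(message);
    }
  }

  // One recovery attempt; receives the 1-based attempt number.
  interface Attempt {
    void run(int attemptNumber);
  }

  // Same shape as the loop quoted above: retry on the listed type,
  // stop on success or on any other exception.
  static List<String> recover(Attempt attempt) {
    List<String> log = new ArrayList<>();
    boolean completed = false;
    int n = 0;
    while (!completed) {
      n++;
      try {
        attempt.run(n);
        log.add("attempt " + n + ": success");
        completed = true;
      } catch (RetriableException e) {
        log.add("attempt " + n + ": retry");
      } catch (Exception e) {
        log.add("attempt " + n + ": gave up");
        completed = true;
      }
    }
    return log;
  }

  public static void main(String[] args) {
    // Fails twice with a listed type, then succeeds.
    // prints [attempt 1: retry, attempt 2: retry, attempt 3: success]
    System.out.println(recover(n -> {
      if (n < 3) {
        throw new RetriableException("connection closed");
      }
    }));

    // A timeout reported through an unlisted type ends recovery immediately.
    // prints [attempt 1: gave up]
    System.out.println(recover(n -> {
      throw new WrappedTimeoutException("connection timed out");
    }));
  }
}
```

In this sketch, whether a failure is retried depends only on the exception's type, not on its message, so a timeout that arrives wrapped in an unlisted type stops the loop after one attempt.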

Reproduction steps

  1. Set up a 3-node RabbitMQ stream environment with a single active consumer
  2. Make a connection attempt take longer than the 30000 ms connection timeout

...

Expected behavior

The client should keep trying to reconnect the consumer when a connection timeout occurs.

Additional context

RabbitMQ is running on 3 nodes with a single active consumer. Restarting the nodes works as intended: the leader node changes accordingly and the consumer recovers after the node restart (maybe due to a shorter timeout period?).

@Hendr-ik-a Hendr-ik-a added the bug Something isn't working label Oct 14, 2024
@acogoluegnes
Contributor

Can you increase the connection timeout? Set up a channel customizer in the Netty configuration of the environment builder, e.g. ch.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60_000).
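
For readers who want to try this, a minimal configuration sketch follows. It assumes the stream Java client and Netty are on the classpath and uses the environment builder's `netty()` configuration with `channelCustomizer`. The host and port are placeholders, and the class name is only illustrative.

```java
import com.rabbitmq.stream.Environment;
import io.netty.channel.ChannelOption;

class ConnectTimeoutConfig {

  public static void main(String[] args) throws Exception {
    // Host and port are placeholders; replace them with the broker's address.
    try (Environment environment =
        Environment.builder()
            .host("localhost")
            .port(5552)
            .netty()
            // Raise the Netty connect timeout from the 30 s seen in the error message to 60 s.
            .channelCustomizer(
                ch -> ch.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60_000))
            .environmentBuilder()
            .build()) {
      // Create producers and consumers from the environment here.
    }
  }
}
```

A longer timeout only makes the failure less likely. It does not change what happens to recovery once a timeout does occur.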

@Hendr-ik-a
Author

> Can you increase the connection timeout? Set up a channel customizer in the Netty configuration of the environment builder, e.g. ch.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60_000).

Thank you for the suggestion to increase the connection timeout by setting a custom channel option in the Netty configuration: ch.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60_000).

The issue is that when this timeout occurs, it seems to cancel the entire recovery process. Is this the intended behavior? Shouldn’t connection-related timeouts trigger a retry or a different recovery mechanism rather than canceling the process entirely?

@michaelklishin
Contributor

@Hendr-ik-a for initial connections, they should not. For recovery of an existing connection, any I/O exception should trigger a retry. At least that's how most RabbitMQ clients approach connection recovery since 2013, when it was introduced in Ruby and Java (AMQP 0-9-1) clients.
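
The distinction drawn here (fail fast on the initial connection, retry on any I/O exception during recovery) can be sketched as follows. This is an illustration of the general policy only, not code from any RabbitMQ client. `Connector`, `connectInitially`, and `recover` are hypothetical names, and the fixed delay stands in for whatever back-off policy a real client would use.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

class ConnectionPolicySketch {

  // Hypothetical abstraction over "open a connection to the broker".
  interface Connector {
    void connect() throws IOException;
  }

  // Initial connection: a failure is reported to the caller right away.
  static void connectInitially(Connector connector) throws IOException {
    connector.connect();
  }

  // Recovery: any I/O failure, timeouts included, leads to another attempt
  // after a delay. The loop ends on success or when the thread is interrupted.
  // Returns the number of attempts that were needed.
  static int recover(Connector connector, Duration delay) throws InterruptedException {
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        connector.connect();
        return attempts;
      } catch (IOException e) {
        Thread.sleep(delay.toMillis());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger calls = new AtomicInteger();
    // Simulated broker: the first two attempts time out, the third succeeds.
    Connector flaky = () -> {
      if (calls.incrementAndGet() < 3) {
        throw new SocketTimeoutException("connection timed out");
      }
    };
    // prints 3
    System.out.println(recover(flaky, Duration.ofMillis(10)));
  }
}
```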

@acogoluegnes
Contributor

@Hendr-ik-a I pushed a fix, can you try the snapshot?

@Hendr-ik-a
Author

Thanks for the fix! I’ll test the snapshot and let you know if the issue is resolved.

@michaelklishin
Contributor

@Hendr-ik-a have you had a chance to test the snapshot? Did it address the issue?

acogoluegnes added a commit that referenced this issue Dec 11, 2024