subscriptionProperties) {
+ this.id = id;
this.consumer = consumer;
this.stream = stream;
this.initialOffsetSpecification = initialOffsetSpecification;
@@ -284,6 +429,7 @@ synchronized void cancel() {
} else {
LOGGER.debug("No manager to remove consumer from");
}
+ this.state(SubscriptionState.CLOSED);
}
synchronized void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
@@ -302,6 +448,25 @@ synchronized void detachFromManager() {
this.manager = null;
this.consumer.setSubscriptionClient(null);
}
+
+ void state(SubscriptionState state) {
+ this.state.set(state);
+ }
+
+ boolean compareAndSet(SubscriptionState expected, SubscriptionState newValue) {
+ return this.state.compareAndSet(expected, newValue);
+ }
+
+ SubscriptionState state() {
+ return this.state.get();
+ }
+ }
+
+ private enum SubscriptionState {
+ OPENING,
+ ACTIVE,
+ RECOVERING,
+ CLOSED
}
private static final class MessageHandlerContext implements Context {
@@ -349,103 +514,34 @@ public Consumer consumer() {
}
}
- /**
- * Maintains {@link ClientSubscriptionsManager} instances for a given host.
- *
- * Creates new {@link ClientSubscriptionsManager} instances (and so {@link Client}s, i.e.
- * connections) when needed and disposes them when appropriate.
- */
- private class ManagerPool {
-
- private final List managers = new CopyOnWriteArrayList<>();
- private final String name;
- private final Client.ClientParameters clientParameters;
-
- private ManagerPool(String name, Client.ClientParameters clientParameters) {
- this.name = name;
- this.clientParameters = clientParameters;
- LOGGER.debug("Creating client subscription pool on {}", name);
- managers.add(new ClientSubscriptionsManager(this, clientParameters));
- }
-
- private synchronized void add(
- SubscriptionTracker subscriptionTracker,
- OffsetSpecification offsetSpecification,
- boolean isInitialSubscription) {
- boolean added = false;
- // FIXME deal with manager unavailability (manager may be closing because of connection
- // closing)
- // try all of them until it succeeds, throw exception if failure
- for (ClientSubscriptionsManager manager : managers) {
- if (!manager.isFull()) {
- manager.add(subscriptionTracker, offsetSpecification, isInitialSubscription);
- added = true;
- break;
- }
- }
- if (!added) {
- LOGGER.debug(
- "Creating subscription manager on {}, this is subscription manager #{}",
- name,
- managers.size() + 1);
- ClientSubscriptionsManager manager = new ClientSubscriptionsManager(this, clientParameters);
- managers.add(manager);
- manager.add(subscriptionTracker, offsetSpecification, isInitialSubscription);
- }
- }
-
- private synchronized void clean() {
- for (ClientSubscriptionsManager manager : managers) {
- maybeDisposeManager(manager);
- }
- }
-
- private synchronized void maybeDisposeManager(
- ClientSubscriptionsManager clientSubscriptionsManager) {
- if (clientSubscriptionsManager.isEmpty()) {
- clientSubscriptionsManager.close();
- this.remove(clientSubscriptionsManager);
- }
- }
-
- private synchronized void remove(ClientSubscriptionsManager clientSubscriptionsManager) {
- managers.remove(clientSubscriptionsManager);
- if (managers.isEmpty()) {
- pools.remove(name);
- LOGGER.debug("Disposed client subscription pool on {} because it was empty", name);
- }
- }
-
- synchronized void close() {
- for (ClientSubscriptionsManager manager : managers) {
- manager.close();
- }
- managers.clear();
- }
- }
-
/**
* Maintains a set of {@link SubscriptionTracker} instances on a {@link Client}.
*
* It dispatches inbound messages to the appropriate {@link SubscriptionTracker} and
* re-allocates {@link SubscriptionTracker}s in case of stream unavailability or disconnection.
*/
- private class ClientSubscriptionsManager {
+ private class ClientSubscriptionsManager implements Comparable {
+ private final long id;
+ private final Broker node;
private final Client client;
+ private final String name;
// the 2 data structures track the subscriptions, they must remain consistent
private final Map> streamToStreamSubscriptions =
new ConcurrentHashMap<>();
- private final ManagerPool owner;
+ // trackers and tracker count must be kept in sync
private volatile List subscriptionTrackers =
new ArrayList<>(maxConsumersByConnection);
+ private volatile int trackerCount = 0;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
- private ClientSubscriptionsManager(
- ManagerPool owner, Client.ClientParameters clientParameters) {
- this.owner = owner;
- String name = owner.name;
+ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) {
+ this.id = managerIdSequence.getAndIncrement();
+ this.node = node;
+ this.name = keyForClientSubscription(node);
LOGGER.debug("creating subscription manager on {}", name);
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
+ this.trackerCount = 0;
AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
ChunkListener chunkListener =
(client, subscriptionId, offset, messageCount, dataSize) -> {
@@ -470,7 +566,6 @@ private ClientSubscriptionsManager(
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
SubscriptionTracker subscriptionTracker =
subscriptionTrackers.get(subscriptionId & 0xFF);
-
if (subscriptionTracker != null) {
subscriptionTracker.offset = offset;
subscriptionTracker.hasReceivedSomething = true;
@@ -480,50 +575,64 @@ private ClientSubscriptionsManager(
message);
// FIXME set offset here as well, best effort to avoid duplicates?
} else {
- LOGGER.debug("Could not find stream subscription {}", subscriptionId);
+ LOGGER.debug(
+ "Could not find stream subscription {} in manager {}, node {}",
+ subscriptionId,
+ this.id,
+ this.name);
}
};
ShutdownListener shutdownListener =
shutdownContext -> {
- // FIXME should the pool check if it's empty and so remove itself from the
- // pools data structure?
-
- // we may be closing the client because it's not the right node, so the manager
- // should not be removed from its pool, because it's not really in it already
- if (clientInitializedInManager.get()) {
- owner.remove(this);
- }
+ this.closed.set(true);
+ managers.remove(this);
if (shutdownContext.isShutdownUnexpected()) {
LOGGER.debug(
- "Unexpected shutdown notification on subscription client {}, scheduling consumers re-assignment",
+ "Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment",
name);
+ LOGGER.debug(
+ "Subscription connection has {} consumer(s) over {} stream(s) to recover",
+ this.subscriptionTrackers.stream().filter(Objects::nonNull).count(),
+ this.streamToStreamSubscriptions.size());
environment
.scheduledExecutorService()
.execute(
- () -> {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- subscriptionTrackers.stream()
- .filter(Objects::nonNull)
- .forEach(SubscriptionTracker::detachFromManager);
- for (Entry> entry :
- streamToStreamSubscriptions.entrySet()) {
- if (Thread.currentThread().isInterrupted()) {
- break;
- }
- String stream = entry.getKey();
- LOGGER.debug(
- "Re-assigning {} consumer(s) to stream {} after disconnection",
- entry.getValue().size(),
- stream);
- assignConsumersToStream(
- entry.getValue(),
- stream,
- attempt -> environment.recoveryBackOffDelayPolicy().delay(attempt),
- false);
- }
- });
+ namedRunnable(
+ () -> {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ subscriptionTrackers.stream()
+ .filter(Objects::nonNull)
+ .filter(t -> t.state() == SubscriptionState.ACTIVE)
+ .forEach(SubscriptionTracker::detachFromManager);
+ for (Entry> entry :
+ streamToStreamSubscriptions.entrySet()) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOGGER.debug("Interrupting consumer re-assignment task");
+ break;
+ }
+ String stream = entry.getKey();
+ Set trackersToReAssign = entry.getValue();
+ if (trackersToReAssign == null || trackersToReAssign.isEmpty()) {
+ LOGGER.debug(
+ "No consumer to re-assign to stream {} after disconnection",
+ stream);
+ } else {
+ LOGGER.debug(
+ "Re-assigning {} consumer(s) to stream {} after disconnection",
+ trackersToReAssign.size(),
+ stream);
+ assignConsumersToStream(
+ trackersToReAssign,
+ stream,
+ recoveryBackOffDelayPolicy(),
+ false);
+ }
+ }
+ },
+ "Consumers re-assignment after disconnection from %s",
+ name));
}
};
MetadataListener metadataListener =
@@ -531,9 +640,8 @@ private ClientSubscriptionsManager(
LOGGER.debug(
"Received metadata notification for '{}', stream is likely to have become unavailable",
stream);
-
Set affectedSubscriptions;
- synchronized (this.owner) {
+ synchronized (this) {
Set subscriptions = streamToStreamSubscriptions.remove(stream);
if (subscriptions != null && !subscriptions.isEmpty()) {
List newSubscriptions =
@@ -550,31 +658,32 @@ private ClientSubscriptionsManager(
newSubscriptions.set(subscription.subscriptionIdInClient & 0xFF, null);
subscription.consumer.setSubscriptionClient(null);
}
- this.subscriptionTrackers = newSubscriptions;
+ this.setSubscriptionTrackers(newSubscriptions);
}
affectedSubscriptions = subscriptions;
}
- if (isEmpty()) {
- this.owner.remove(this);
- }
+
if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
environment
.scheduledExecutorService()
.execute(
- () -> {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- LOGGER.debug(
- "Trying to move {} subscription(s) (stream {})",
- affectedSubscriptions.size(),
- stream);
- assignConsumersToStream(
- affectedSubscriptions,
- stream,
- metadataUpdateBackOffDelayPolicy(),
- isEmpty());
- });
+ namedRunnable(
+ () -> {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ LOGGER.debug(
+ "Trying to move {} subscription(s) (stream '{}')",
+ affectedSubscriptions.size(),
+ stream);
+ assignConsumersToStream(
+ affectedSubscriptions,
+ stream,
+ metadataUpdateBackOffDelayPolicy(),
+ true);
+ },
+ "Consumers re-assignment after metadata update on stream '%s'",
+ stream));
}
};
ConsumerUpdateListener consumerUpdateListener =
@@ -607,7 +716,7 @@ private ClientSubscriptionsManager(
.shutdownListener(shutdownListener)
.metadataListener(metadataListener)
.consumerUpdateListener(consumerUpdateListener))
- .key(owner.name);
+ .key(name);
this.client = clientFactory.client(clientFactoryContext);
LOGGER.debug("Created consumer connection '{}'", connectionName);
maybeExchangeCommandVersions(client);
@@ -618,7 +727,7 @@ private void assignConsumersToStream(
Collection subscriptions,
String stream,
BackOffDelayPolicy delayPolicy,
- boolean closeClient) {
+ boolean maybeCloseClient) {
Runnable consumersClosingCallback =
() -> {
for (SubscriptionTracker affectedSubscription : subscriptions) {
@@ -631,57 +740,23 @@ private void assignConsumersToStream(
};
AsyncRetry.asyncRetry(() -> findBrokersForStream(stream))
- .description("Candidate lookup to consume from " + stream)
+ .description("Candidate lookup to consume from '%s'", stream)
.scheduler(environment.scheduledExecutorService())
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
.delayPolicy(delayPolicy)
.build()
.thenAccept(
- candidates -> {
+ candidateNodes -> {
+ List candidates = candidateNodes;
if (candidates == null) {
+ LOGGER.debug("No candidate nodes to consume from '{}'", stream);
consumersClosingCallback.run();
} else {
for (SubscriptionTracker affectedSubscription : subscriptions) {
- try {
- if (affectedSubscription.consumer.isOpen()) {
- Client.Broker broker = pickBroker(candidates);
- LOGGER.debug("Using {} to resume consuming from {}", broker, stream);
- String key = keyForClientSubscription(broker);
- // FIXME in case the broker is no longer there, we may have to deal with an
- // error here
- // we could renew the list of candidates for the stream
- ManagerPool subscriptionPool =
- pools.computeIfAbsent(
- key,
- s ->
- new ManagerPool(
- key,
- environment
- .clientParametersCopy()
- .host(broker.getHost())
- .port(broker.getPort())));
- synchronized (affectedSubscription.consumer) {
- if (affectedSubscription.consumer.isOpen()) {
- OffsetSpecification offsetSpecification;
- if (affectedSubscription.hasReceivedSomething) {
- offsetSpecification =
- OffsetSpecification.offset(affectedSubscription.offset);
- } else {
- offsetSpecification = affectedSubscription.initialOffsetSpecification;
- }
- subscriptionPool.add(affectedSubscription, offsetSpecification, false);
- }
- }
- } else {
- LOGGER.debug("Not re-assigning consumer because it has been closed");
- }
- } catch (Exception e) {
- LOGGER.warn(
- "Error while re-assigning subscription from stream {}", stream, e);
- }
+ maybeRecoverSubscription(candidates, affectedSubscription);
}
- if (closeClient) {
- this.close();
+ if (maybeCloseClient) {
+ this.closeIfEmpty();
}
}
})
@@ -693,116 +768,232 @@ private void assignConsumersToStream(
stream,
ex);
consumersClosingCallback.run();
+ if (maybeCloseClient) {
+ this.closeIfEmpty();
+ }
return null;
});
}
- void add(
+ private void maybeRecoverSubscription(List candidates, SubscriptionTracker tracker) {
+ if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
+ try {
+ recoverSubscription(candidates, tracker);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Error while recovering consumer {} from stream '{}'. Reason: {}",
+ tracker.consumer.id(),
+ tracker.stream,
+ Utils.exceptionMessage(e));
+ }
+ } else {
+ LOGGER.debug(
+ "Not recovering consumer {} from stream {}, state is {}, expected is {}",
+ tracker.consumer.id(),
+ tracker.stream,
+ tracker.state(),
+ SubscriptionState.ACTIVE);
+ }
+ }
+
+ private void recoverSubscription(List candidates, SubscriptionTracker tracker) {
+ boolean reassignmentCompleted = false;
+ while (!reassignmentCompleted) {
+ try {
+ if (tracker.consumer.isOpen()) {
+ Broker broker = pickBroker(candidates);
+ LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream);
+ synchronized (tracker.consumer) {
+ if (tracker.consumer.isOpen()) {
+ OffsetSpecification offsetSpecification;
+ if (tracker.hasReceivedSomething) {
+ offsetSpecification = OffsetSpecification.offset(tracker.offset);
+ } else {
+ offsetSpecification = tracker.initialOffsetSpecification;
+ }
+ addToManager(broker, tracker, offsetSpecification, false);
+ }
+ }
+ } else {
+ LOGGER.debug(
+ "Not re-assigning consumer {} (stream '{}') because it has been closed",
+ tracker.consumer.id(),
+ tracker.stream);
+ }
+ reassignmentCompleted = true;
+ } catch (ConnectionStreamException
+ | ClientClosedException
+ | StreamNotAvailableException e) {
+ LOGGER.debug(
+ "Consumer {} re-assignment on stream {} timed out or connection closed or stream not available, "
+ + "refreshing candidates and retrying",
+ tracker.consumer.id(),
+ tracker.stream);
+ // maybe not a good candidate, let's refresh and retry for this one
+ candidates =
+ Utils.callAndMaybeRetry(
+ () -> findBrokersForStream(tracker.stream),
+ ex -> !(ex instanceof StreamDoesNotExistException),
+ environment.recoveryBackOffDelayPolicy(),
+ "Candidate lookup to consume from '%s'",
+ tracker.stream);
+ } catch (Exception e) {
+ LOGGER.warn("Error while re-assigning subscription from stream {}", tracker.stream, e);
+ reassignmentCompleted = true;
+ }
+ }
+ }
+
+ private void checkNotClosed() {
+ if (!this.client.isOpen()) {
+ throw new ClientClosedException();
+ }
+ }
+
+ synchronized void add(
SubscriptionTracker subscriptionTracker,
OffsetSpecification offsetSpecification,
boolean isInitialSubscription) {
- synchronized (this.owner) {
+ if (this.isFull()) {
+ throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
+ }
+ if (this.isClosed()) {
+ throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
+ }
- // FIXME check manager is still open (not closed because of connection failure)
- byte subscriptionId = 0;
- for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
- if (subscriptionTrackers.get(i) == null) {
- subscriptionId = (byte) i;
- break;
- }
- }
+ checkNotClosed();
- List previousSubscriptions = this.subscriptionTrackers;
+ byte subscriptionId = 0;
+ for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
+ if (subscriptionTrackers.get(i) == null) {
+ subscriptionId = (byte) i;
+ break;
+ }
+ }
- LOGGER.debug(
- "Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}",
- subscriptionTracker.stream,
- offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification,
- subscriptionTracker.offsetTrackingReference,
- subscriptionTracker.subscriptionProperties);
- try {
- // updating data structures before subscribing
- // (to make sure they are up-to-date in case message would arrive super fast)
- subscriptionTracker.assign(subscriptionId, this);
- streamToStreamSubscriptions
- .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet())
- .add(subscriptionTracker);
- this.subscriptionTrackers =
- update(previousSubscriptions, subscriptionId, subscriptionTracker);
-
- String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
- if (offsetTrackingReference != null) {
- QueryOffsetResponse queryOffsetResponse =
- client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
- if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) {
- if (offsetSpecification != null && isInitialSubscription) {
- // subscription call (not recovery), so telling the user their offset specification
- // is
- // ignored
- LOGGER.info(
- "Requested offset specification {} not used in favor of stored offset found for reference {}",
- offsetSpecification,
- offsetTrackingReference);
- }
- LOGGER.debug(
- "Using offset {} to start consuming from {} with consumer {} "
- + "(instead of {})",
- queryOffsetResponse.getOffset(),
+ List previousSubscriptions = this.subscriptionTrackers;
+
+ LOGGER.debug(
+ "Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}",
+ subscriptionTracker.stream,
+ offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification,
+ subscriptionTracker.offsetTrackingReference,
+ subscriptionTracker.subscriptionProperties);
+ try {
+ // updating data structures before subscribing
+ // (to make sure they are up-to-date in case message would arrive super fast)
+ subscriptionTracker.assign(subscriptionId, this);
+ streamToStreamSubscriptions
+ .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet())
+ .add(subscriptionTracker);
+ this.setSubscriptionTrackers(
+ update(previousSubscriptions, subscriptionId, subscriptionTracker));
+
+ String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
+ if (offsetTrackingReference != null) {
+ checkNotClosed();
+ QueryOffsetResponse queryOffsetResponse =
+ Utils.callAndMaybeRetry(
+ () -> client.queryOffset(offsetTrackingReference, subscriptionTracker.stream),
+ RETRY_ON_TIMEOUT,
+ "Offset query for consumer %s on stream '%s' (reference %s)",
+ subscriptionTracker.consumer.id(),
subscriptionTracker.stream,
- offsetTrackingReference,
- offsetSpecification);
- offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1);
+ offsetTrackingReference);
+ if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) {
+ if (offsetSpecification != null && isInitialSubscription) {
+ // initial subscription call (not a recovery), so tell the user that
+ // their requested offset specification is ignored in favor of the
+ // stored offset
+ LOGGER.info(
+ "Requested offset specification {} not used in favor of stored offset found for reference {}",
+ offsetSpecification,
+ offsetTrackingReference);
}
+ LOGGER.debug(
+ "Using offset {} to start consuming from {} with consumer {} " + "(instead of {})",
+ queryOffsetResponse.getOffset(),
+ subscriptionTracker.stream,
+ offsetTrackingReference,
+ offsetSpecification);
+ offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1);
}
+ }
- offsetSpecification =
- offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
-
- // TODO consider using/emulating ConsumerUpdateListener, to have only one API, not 2
- // even when the consumer is not a SAC.
- SubscriptionContext subscriptionContext =
- new DefaultSubscriptionContext(offsetSpecification);
- subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
- LOGGER.info(
- "Computed offset specification {}, offset specification used after subscription listener {}",
- offsetSpecification,
- subscriptionContext.offsetSpecification());
-
- // FIXME consider using fewer initial credits
- Client.Response subscribeResponse =
- client.subscribe(
- subscriptionId,
- subscriptionTracker.stream,
- subscriptionContext.offsetSpecification(),
- 10,
- subscriptionTracker.subscriptionProperties);
- if (!subscribeResponse.isOk()) {
- String message =
- "Subscription to stream "
- + subscriptionTracker.stream
- + " failed with code "
- + formatConstant(subscribeResponse.getResponseCode());
- LOGGER.debug(message);
- throw new StreamException(message);
- }
- } catch (RuntimeException e) {
- subscriptionTracker.assign((byte) -1, null);
- this.subscriptionTrackers = previousSubscriptions;
- streamToStreamSubscriptions
- .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet())
- .remove(subscriptionTracker);
- throw e;
+ offsetSpecification =
+ offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
+
+ // TODO consider using/emulating ConsumerUpdateListener, to have only one API, not 2
+ // even when the consumer is not a SAC.
+ SubscriptionContext subscriptionContext =
+ new DefaultSubscriptionContext(offsetSpecification);
+ subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
+ LOGGER.info(
+ "Computed offset specification {}, offset specification used after subscription listener {}",
+ offsetSpecification,
+ subscriptionContext.offsetSpecification());
+
+ checkNotClosed();
+ // FIXME consider using fewer initial credits
+ byte subId = subscriptionId;
+ Client.Response subscribeResponse =
+ Utils.callAndMaybeRetry(
+ () ->
+ client.subscribe(
+ subId,
+ subscriptionTracker.stream,
+ subscriptionContext.offsetSpecification(),
+ 10,
+ subscriptionTracker.subscriptionProperties),
+ RETRY_ON_TIMEOUT,
+ "Subscribe request for consumer %s on stream '%s'",
+ subscriptionTracker.consumer.id(),
+ subscriptionTracker.stream);
+ if (!subscribeResponse.isOk()) {
+ String message =
+ "Subscription to stream "
+ + subscriptionTracker.stream
+ + " failed with code "
+ + formatConstant(subscribeResponse.getResponseCode());
+ LOGGER.debug(message);
+ throw convertCodeToException(
+ subscribeResponse.getResponseCode(), subscriptionTracker.stream, () -> message);
}
- LOGGER.debug("Subscribed to {}", subscriptionTracker.stream);
+ } catch (RuntimeException e) {
+ subscriptionTracker.assign((byte) -1, null);
+ this.setSubscriptionTrackers(previousSubscriptions);
+ streamToStreamSubscriptions
+ .computeIfAbsent(subscriptionTracker.stream, s -> ConcurrentHashMap.newKeySet())
+ .remove(subscriptionTracker);
+ maybeCleanStreamToStreamSubscriptions(subscriptionTracker.stream);
+ throw e;
}
+ subscriptionTracker.state(SubscriptionState.ACTIVE);
+ LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
}
- void remove(SubscriptionTracker subscriptionTracker) {
- synchronized (this.owner) {
+ private void maybeCleanStreamToStreamSubscriptions(String stream) {
+ this.streamToStreamSubscriptions.compute(
+ stream,
+ (s, trackers) -> {
+ if (trackers == null || trackers.isEmpty()) {
+ return null;
+ } else {
+ return trackers;
+ }
+ });
+ }
- // FIXME check manager is still open (not closed because of connection failure)
- byte subscriptionIdInClient = subscriptionTracker.subscriptionIdInClient;
- Client.Response unsubscribeResponse = client.unsubscribe(subscriptionIdInClient);
+ synchronized void remove(SubscriptionTracker subscriptionTracker) {
+ byte subscriptionIdInClient = subscriptionTracker.subscriptionIdInClient;
+ try {
+ Client.Response unsubscribeResponse =
+ Utils.callAndMaybeRetry(
+ () -> client.unsubscribe(subscriptionIdInClient),
+ RETRY_ON_TIMEOUT,
+ "Unsubscribe request for consumer %d on stream '%s'",
+ subscriptionTracker.consumer.id(),
+ subscriptionTracker.stream);
if (!unsubscribeResponse.isOk()) {
LOGGER.warn(
"Unexpected response code when unsubscribing from {}: {} (subscription ID {})",
@@ -810,20 +1001,27 @@ void remove(SubscriptionTracker subscriptionTracker) {
formatConstant(unsubscribeResponse.getResponseCode()),
subscriptionIdInClient);
}
- this.subscriptionTrackers = update(this.subscriptionTrackers, subscriptionIdInClient, null);
- streamToStreamSubscriptions.compute(
- subscriptionTracker.stream,
- (stream, subscriptionsForThisStream) -> {
- if (subscriptionsForThisStream == null || subscriptionsForThisStream.isEmpty()) {
- // should not happen
- return null;
- } else {
- subscriptionsForThisStream.remove(subscriptionTracker);
- return subscriptionsForThisStream.isEmpty() ? null : subscriptionsForThisStream;
- }
- });
- this.owner.maybeDisposeManager(this);
+ } catch (TimeoutStreamException e) {
+ LOGGER.debug(
+ "Reached timeout when trying to unsubscribe consumer {} from stream '{}'",
+ subscriptionTracker.consumer.id(),
+ subscriptionTracker.stream);
}
+
+ this.setSubscriptionTrackers(update(this.subscriptionTrackers, subscriptionIdInClient, null));
+ streamToStreamSubscriptions.compute(
+ subscriptionTracker.stream,
+ (stream, subscriptionsForThisStream) -> {
+ if (subscriptionsForThisStream == null || subscriptionsForThisStream.isEmpty()) {
+ // should not happen
+ return null;
+ } else {
+ subscriptionsForThisStream.remove(subscriptionTracker);
+ return subscriptionsForThisStream.isEmpty() ? null : subscriptionsForThisStream;
+ }
+ });
+ closeIfEmpty();
+ // no owning pool anymore: the manager closes itself once it has no tracker left
}
private List update(
@@ -836,41 +1034,85 @@ private List update(
return newSubcriptions;
}
- synchronized boolean isFull() {
- return trackersCount() == maxConsumersByConnection;
+ private void setSubscriptionTrackers(List trackers) {
+ this.subscriptionTrackers = trackers;
+ this.trackerCount = (int) this.subscriptionTrackers.stream().filter(Objects::nonNull).count();
}
- synchronized boolean isEmpty() {
- return trackersCount() == 0;
+ boolean isFull() {
+ return this.trackerCount == maxConsumersByConnection;
}
- private synchronized int trackersCount() {
- return (int) this.subscriptionTrackers.stream().filter(Objects::nonNull).count();
+ boolean isEmpty() {
+ return this.trackerCount == 0;
}
- synchronized void close() {
- if (this.client != null && this.client.isOpen()) {
- subscriptionTrackers.stream()
- .filter(Objects::nonNull)
- .forEach(
- tracker -> {
- try {
- if (this.client != null && this.client.isOpen() && tracker.consumer.isOpen()) {
- this.client.unsubscribe(tracker.subscriptionIdInClient);
- }
- } catch (Exception e) {
- // OK, moving on
- }
- });
+ boolean isClosed() {
+ if (!this.client.isOpen()) {
+ this.close();
+ }
+ return this.closed.get();
+ }
- streamToStreamSubscriptions.clear();
- subscriptionTrackers.clear();
+ synchronized void closeIfEmpty() {
+ if (this.isEmpty()) {
+ this.close();
+ }
+ }
+ synchronized void close() {
+ if (this.closed.compareAndSet(false, true)) {
+ managers.remove(this);
+ LOGGER.debug("Closing consumer subscription manager on {}, id {}", this.name, this.id);
if (this.client != null && this.client.isOpen()) {
- this.client.close();
+ for (int i = 0; i < this.subscriptionTrackers.size(); i++) {
+ SubscriptionTracker tracker = this.subscriptionTrackers.get(i);
+ if (tracker != null) {
+ try {
+ if (this.client != null && this.client.isOpen() && tracker.consumer.isOpen()) {
+ this.client.unsubscribe(tracker.subscriptionIdInClient);
+ }
+ } catch (Exception e) {
+ // OK, moving on
+ LOGGER.debug(
+ "Error while unsubscribing from {}, registration {}",
+ tracker.stream,
+ tracker.subscriptionIdInClient);
+ }
+ this.subscriptionTrackers.set(i, null);
+ }
+ }
+
+ streamToStreamSubscriptions.clear();
+
+ if (this.client != null && this.client.isOpen()) {
+ this.client.close();
+ }
}
}
}
+
+ @Override
+ public int compareTo(ClientSubscriptionsManager o) {
+ return Long.compare(this.id, o.id);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientSubscriptionsManager that = (ClientSubscriptionsManager) o;
+ return id == that.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
}
private static final class DefaultSubscriptionContext implements SubscriptionContext {
@@ -906,4 +1148,14 @@ private static void maybeExchangeCommandVersions(Client client) {
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
}
}
+
+ private static final Predicate RETRY_ON_TIMEOUT =
+ e -> e instanceof TimeoutStreamException;
+
+ private static class ClientClosedException extends StreamException {
+
+ public ClientClosedException() {
+ super("Client already closed");
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java
index 546a1b5d96..913e679e94 100644
--- a/src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java
@@ -13,6 +13,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
import com.rabbitmq.stream.MessageHandler.Context;
@@ -83,36 +84,42 @@ Registration registerTrackingConsumer(
this.checkFuture =
this.executor()
.scheduleAtFixedRate(
- () -> {
- if (flushingOnGoing.compareAndSet(false, true)) {
- try {
- this.clock.setTime(System.nanoTime());
- Iterator iterator = trackers.iterator();
- while (iterator.hasNext()) {
- if (Thread.currentThread().isInterrupted()) {
- Thread.currentThread().interrupt();
- break;
- }
- Tracker t = iterator.next();
- if (t.consumer().isOpen()) {
- try {
- t.flushIfNecessary();
- } catch (Exception e) {
- LOGGER.info("Error while flushing tracker: {}", e.getMessage());
+ namedRunnable(
+ () -> {
+ if (flushingOnGoing.compareAndSet(false, true)) {
+ try {
+ this.clock.setTime(System.nanoTime());
+ LOGGER.debug(
+ "Background offset tracking flushing, {} tracker(s) to check",
+ this.trackers.size());
+ Iterator iterator = trackers.iterator();
+ while (iterator.hasNext()) {
+ if (Thread.currentThread().isInterrupted()) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ Tracker t = iterator.next();
+ if (t.consumer().isOpen()) {
+ try {
+ t.flushIfNecessary();
+ } catch (Exception e) {
+ LOGGER.info("Error while flushing tracker: {}", e.getMessage());
+ }
+ } else {
+ iterator.remove();
+ }
}
- } else {
- iterator.remove();
+ } finally {
+ flushingOnGoing.set(false);
}
- }
- } finally {
- flushingOnGoing.set(false);
- }
- // TODO consider cancelling the task if there are no more consumers to track
- // it should then be restarted on demand.
+ // TODO consider cancelling the task when there are no more
+ // consumers to track; it would then have to be restarted on
+ // demand.
- }
- },
+ }
+ },
+ "Offset tracking background task"),
this.checkInterval.toMillis(),
this.checkInterval.toMillis(),
TimeUnit.MILLISECONDS);
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
index 6e1f59ca43..9eda0b91d0 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,12 +13,21 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
+import static com.rabbitmq.stream.impl.Utils.jsonField;
+import static com.rabbitmq.stream.impl.Utils.namedFunction;
+import static com.rabbitmq.stream.impl.Utils.namedRunnable;
+import static com.rabbitmq.stream.impl.Utils.quote;
+import static java.util.stream.Collectors.toSet;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
+import com.rabbitmq.stream.StreamNotAvailableException;
+import com.rabbitmq.stream.impl.Client.Broker;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.MetadataListener;
import com.rabbitmq.stream.impl.Client.PublishConfirmListener;
import com.rabbitmq.stream.impl.Client.PublishErrorListener;
@@ -28,15 +37,21 @@
import com.rabbitmq.stream.impl.Utils.ClientFactory;
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,9 +63,13 @@ class ProducersCoordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducersCoordinator.class);
private final StreamEnvironment environment;
private final ClientFactory clientFactory;
- private final Map pools = new ConcurrentHashMap<>();
private final int maxProducersByClient, maxTrackingConsumersByClient;
private final Function connectionNamingStrategy;
+ private final AtomicLong managerIdSequence = new AtomicLong(0);
+ private final NavigableSet managers = new ConcurrentSkipListSet<>();
+ private final AtomicLong trackerIdSequence = new AtomicLong(0);
+ private final boolean debug = true;
+ private final List producerTrackers = new CopyOnWriteArrayList<>();
ProducersCoordinator(
StreamEnvironment environment,
@@ -65,43 +84,113 @@ class ProducersCoordinator {
this.connectionNamingStrategy = connectionNamingStrategy;
}
- private static String keyForManagerPool(Client.Broker broker) {
- // FIXME make sure this is a reasonable key for brokers
+ private static String keyForNode(Client.Broker broker) {
return broker.getHost() + ":" + broker.getPort();
}
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
- return registerAgentTracker(new ProducerTracker(reference, stream, producer), stream);
+ ProducerTracker tracker =
+ new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
+ if (debug) {
+ this.producerTrackers.add(tracker);
+ }
+ return registerAgentTracker(tracker, stream);
}
Runnable registerTrackingConsumer(StreamConsumer consumer) {
return registerAgentTracker(
- new TrackingConsumerTracker(consumer.stream(), consumer), consumer.stream());
+ new TrackingConsumerTracker(
+ trackerIdSequence.getAndIncrement(), consumer.stream(), consumer),
+ consumer.stream());
}
private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
- Client.Broker brokerForProducer = getBrokerForProducer(stream);
-
- String key = keyForManagerPool(brokerForProducer);
- ManagerPool pool =
- pools.computeIfAbsent(
- key,
- s ->
- new ManagerPool(
- key,
- environment
- .clientParametersCopy()
- .host(brokerForProducer.getHost())
- .port(brokerForProducer.getPort())));
-
- pool.add(tracker);
-
- return tracker::cancel;
+ Client.Broker broker = getBrokerForProducer(stream);
+
+ addToManager(broker, tracker);
+
+ if (debug) {
+ return () -> {
+ if (tracker instanceof ProducerTracker) {
+ try {
+ this.producerTrackers.remove(tracker);
+ } catch (Exception e) {
+ LOGGER.debug("Error while removing producer tracker from list");
+ }
+ }
+ tracker.cancel();
+ };
+ } else {
+ return tracker::cancel;
+ }
+ }
+
+ private void addToManager(Broker node, AgentTracker tracker) {
+ ClientParameters clientParameters =
+ environment.clientParametersCopy().host(node.getHost()).port(node.getPort());
+ ClientProducersManager pickedManager = null;
+ while (pickedManager == null) {
+ Iterator iterator = this.managers.iterator();
+ while (iterator.hasNext()) {
+ pickedManager = iterator.next();
+ if (pickedManager.isClosed()) {
+ iterator.remove();
+ pickedManager = null;
+ } else {
+ if (node.equals(pickedManager.node) && !pickedManager.isFullFor(tracker)) {
+ // let's try this one
+ break;
+ } else {
+ pickedManager = null;
+ }
+ }
+ }
+ if (pickedManager == null) {
+ String name = keyForNode(node);
+ LOGGER.debug("Creating producer manager on {}", name);
+ pickedManager = new ClientProducersManager(node, this.clientFactory, clientParameters);
+ LOGGER.debug("Created producer manager on {}, id {}", name, pickedManager.id);
+ }
+ try {
+ pickedManager.register(tracker);
+ LOGGER.debug(
+ "Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}",
+ tracker.type(),
+ tracker.uniqueId(),
+ tracker.stream(),
+ pickedManager.id,
+ pickedManager.name,
+ tracker.identifiable() ? tracker.id() : "N/A");
+ this.managers.add(pickedManager);
+ } catch (IllegalStateException e) {
+ pickedManager = null;
+ } catch (ConnectionStreamException | ClientClosedException | StreamNotAvailableException e) {
+ // manager connection is dead or stream not available
+ // scheduling manager closing if necessary in another thread to avoid blocking this one
+ if (pickedManager.isEmpty()) {
+ ClientProducersManager manager = pickedManager;
+ this.environment.execute(
+ () -> {
+ manager.closeIfEmpty();
+ },
+ "Producer manager closing after timeout, producer %d on stream '%s'",
+ tracker.uniqueId(),
+ tracker.stream());
+ }
+ throw e;
+ } catch (RuntimeException e) {
+ if (pickedManager != null) {
+ pickedManager.closeIfEmpty();
+ }
+ throw e;
+ }
+ }
}
private Client.Broker getBrokerForProducer(String stream) {
Map metadata =
- this.environment.locatorOperation(c -> c.metadata(stream));
+ this.environment.locatorOperation(
+ namedFunction(c -> c.metadata(stream), "Candidate lookup to publish to '%s'", stream));
if (metadata.size() == 0 || metadata.get(stream) == null) {
throw new StreamDoesNotExistException(stream);
}
@@ -127,44 +216,117 @@ private Client.Broker getBrokerForProducer(String stream) {
}
void close() {
- for (ManagerPool pool : pools.values()) {
- pool.close();
+ Iterator iterator = this.managers.iterator();
+ while (iterator.hasNext()) {
+ ClientProducersManager manager = iterator.next();
+ try {
+ iterator.remove();
+ manager.close();
+ } catch (Exception e) {
+ LOGGER.info(
+ "Error while closing manager {} connected to node {}: {}",
+ manager.id,
+ manager.name,
+ e.getMessage());
+ }
}
- pools.clear();
}
- int poolSize() {
- return pools.size();
+ int clientCount() {
+ return this.managers.size();
}
- int clientCount() {
- return pools.values().stream().map(pool -> pool.managers.size()).reduce(0, Integer::sum);
+ int nodesConnected() {
+ return this.managers.stream().map(m -> m.name).collect(toSet()).size();
}
@Override
public String toString() {
- return ("[ \n"
- + pools.entrySet().stream()
+ StringBuilder builder = new StringBuilder("{");
+ builder.append(jsonField("client_count", this.managers.size())).append(",");
+ builder
+ .append(
+ jsonField(
+ "producer_count", this.managers.stream().mapToInt(m -> m.producers.size()).sum()))
+ .append(",");
+ builder
+ .append(
+ jsonField(
+ "tracking_consumer_count",
+ this.managers.stream().mapToInt(m -> m.trackingConsumerTrackers.size()).sum()))
+ .append(",");
+ if (debug) {
+ builder.append(jsonField("producer_tracker_count", this.producerTrackers.size())).append(",");
+ }
+ builder.append(quote("clients")).append(" : [");
+ builder.append(
+ this.managers.stream()
.map(
- poolEntry ->
- " { \"broker\" : \""
- + poolEntry.getKey()
- + "\", \"client_count\" : "
- + poolEntry.getValue().managers.size()
- + ", \"clients\" : [ "
- + poolEntry.getValue().managers.stream()
- .map(
- manager ->
- "{ \"producer_count\" : "
- + manager.producers.size()
- + ", "
- + " \"tracking_consumer_count\" : "
- + manager.trackingConsumerTrackers.size()
- + " }")
- .collect(Collectors.joining(", "))
- + " ] }")
- .collect(Collectors.joining(", \n"))
- + "\n]");
+ m -> {
+ StringBuilder managerBuilder = new StringBuilder("{");
+ managerBuilder
+ .append(jsonField("id", m.id))
+ .append(",")
+ .append(jsonField("node", m.name))
+ .append(",")
+ .append(jsonField("producer_count", m.producers.size()))
+ .append(",")
+ .append(
+ jsonField("tracking_consumer_count", m.trackingConsumerTrackers.size()))
+ .append(",");
+ managerBuilder.append("\"producers\" : [");
+ managerBuilder.append(
+ m.producers.values().stream()
+ .map(
+ p -> {
+ StringBuilder producerBuilder = new StringBuilder("{");
+ producerBuilder.append(jsonField("stream", p.stream())).append(",");
+ producerBuilder.append(jsonField("producer_id", p.publisherId));
+ return producerBuilder.append("}").toString();
+ })
+ .collect(Collectors.joining(",")));
+ managerBuilder.append("],");
+ managerBuilder.append("\"tracking_consumers\" : [");
+ managerBuilder.append(
+ m.trackingConsumerTrackers.stream()
+ .map(
+ t -> {
+ StringBuilder trackerBuilder = new StringBuilder("{");
+ trackerBuilder.append(jsonField("stream", t.stream()));
+ return trackerBuilder.append("}").toString();
+ })
+ .collect(Collectors.joining(",")));
+ managerBuilder.append("]");
+ return managerBuilder.append("}").toString();
+ })
+ .collect(Collectors.joining(",")));
+ builder.append("]");
+ if (debug) {
+ builder.append(",");
+ builder.append("\"producer_trackers\" : [");
+ builder.append(
+ this.producerTrackers.stream()
+ .map(
+ t -> {
+ StringBuilder b = new StringBuilder("{");
+ b.append(quote("stream")).append(":").append(quote(t.stream)).append(",");
+ b.append(quote("node")).append(":");
+ Client client = null;
+ ClientProducersManager manager = t.clientProducersManager;
+ if (manager != null) {
+ client = manager.client;
+ }
+ if (client == null) {
+ b.append("null");
+ } else {
+ b.append(quote(client.getHost() + ":" + client.getPort()));
+ }
+ return b.append("}").toString();
+ })
+ .collect(Collectors.joining(",")));
+ builder.append("]");
+ }
+ return builder.append("}").toString();
}
private interface AgentTracker {
@@ -188,17 +350,27 @@ private interface AgentTracker {
String reference();
boolean isOpen();
+
+ long uniqueId();
+
+ String type();
+
+ boolean markRecoveryInProgress();
}
private static class ProducerTracker implements AgentTracker {
+ private final long uniqueId;
private final String reference;
private final String stream;
private final StreamProducer producer;
private volatile byte publisherId;
private volatile ClientProducersManager clientProducersManager;
+ private final AtomicBoolean recovering = new AtomicBoolean(false);
- private ProducerTracker(String reference, String stream, StreamProducer producer) {
+ private ProducerTracker(
+ long uniqueId, String reference, String stream, StreamProducer producer) {
+ this.uniqueId = uniqueId;
this.reference = reference;
this.stream = stream;
this.producer = producer;
@@ -245,6 +417,7 @@ public void unavailable() {
@Override
public void running() {
this.producer.running();
+ this.recovering.set(false);
}
@Override
@@ -263,15 +436,33 @@ public void closeAfterStreamDeletion(short code) {
public boolean isOpen() {
return producer.isOpen();
}
+
+ @Override
+ public long uniqueId() {
+ return this.uniqueId;
+ }
+
+ @Override
+ public String type() {
+ return "producer";
+ }
+
+ @Override
+ public boolean markRecoveryInProgress() {
+ return this.recovering.compareAndSet(false, true);
+ }
}
private static class TrackingConsumerTracker implements AgentTracker {
+ private final long uniqueId;
private final String stream;
private final StreamConsumer consumer;
private volatile ClientProducersManager clientProducersManager;
+ private final AtomicBoolean recovering = new AtomicBoolean(false);
- private TrackingConsumerTracker(String stream, StreamConsumer consumer) {
+ private TrackingConsumerTracker(long uniqueId, String stream, StreamConsumer consumer) {
+ this.uniqueId = uniqueId;
this.stream = stream;
this.consumer = consumer;
}
@@ -315,6 +506,7 @@ public void unavailable() {
@Override
public void running() {
this.consumer.running();
+ this.recovering.set(false);
}
@Override
@@ -334,78 +526,41 @@ public void closeAfterStreamDeletion(short code) {
public boolean isOpen() {
return this.consumer.isOpen();
}
- }
-
- private class ManagerPool {
-
- private final List managers = new CopyOnWriteArrayList<>();
- private final String name;
- private final Client.ClientParameters clientParameters;
-
- private ManagerPool(String name, Client.ClientParameters clientParameters) {
- this.name = name;
- this.clientParameters = clientParameters;
- this.managers.add(new ClientProducersManager(this, clientFactory, clientParameters));
- }
-
- private synchronized void add(AgentTracker producerTracker) {
- boolean added = false;
- // FIXME deal with state unavailability (state may be closing because of connection closing)
- // try all of them until it succeeds, throw exception if failure
- for (ClientProducersManager manager : this.managers) {
- if (!manager.isFullFor(producerTracker)) {
- manager.register(producerTracker);
- added = true;
- break;
- }
- }
- if (!added) {
- LOGGER.debug(
- "Creating producers tracker on {}, this is subscription state #{}",
- name,
- managers.size() + 1);
- ClientProducersManager manager =
- new ClientProducersManager(this, clientFactory, clientParameters);
- this.managers.add(manager);
- manager.register(producerTracker);
- }
- }
- synchronized void maybeDisposeManager(ClientProducersManager manager) {
- if (manager.isEmpty()) {
- manager.close();
- this.remove(manager);
- }
+ @Override
+ public long uniqueId() {
+ return this.uniqueId;
}
- private synchronized void remove(ClientProducersManager manager) {
- this.managers.remove(manager);
- if (this.managers.isEmpty()) {
- pools.remove(this.name);
- }
+ @Override
+ public String type() {
+ return "tracking consumer";
}
- synchronized void close() {
- for (ClientProducersManager manager : managers) {
- manager.close();
- }
- managers.clear();
+ @Override
+ public boolean markRecoveryInProgress() {
+ return this.recovering.compareAndSet(false, true);
}
}
- private class ClientProducersManager {
+ private class ClientProducersManager implements Comparable {
+ private final long id;
+ private final String name;
+ private final Broker node;
private final ConcurrentMap producers =
new ConcurrentHashMap<>(maxProducersByClient);
private final Set trackingConsumerTrackers =
ConcurrentHashMap.newKeySet(maxTrackingConsumersByClient);
private final Map> streamToTrackers = new ConcurrentHashMap<>();
private final Client client;
- private final ManagerPool owner;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private ClientProducersManager(
- ManagerPool owner, ClientFactory cf, Client.ClientParameters clientParameters) {
- this.owner = owner;
+ Broker node, ClientFactory cf, Client.ClientParameters clientParameters) {
+ this.id = managerIdSequence.getAndIncrement();
+ this.name = keyForNode(node);
+ this.node = node;
AtomicReference ref = new AtomicReference<>();
AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
PublishConfirmListener publishConfirmListener =
@@ -431,14 +586,10 @@ private ClientProducersManager(
};
ShutdownListener shutdownListener =
shutdownContext -> {
- // we may be closing the client because it's not the right node, so the manager
- // should not be removed from its pool, because it's not really in it already
- if (clientInitializedInManager.get()) {
- owner.remove(this);
- }
+ managers.remove(this);
if (shutdownContext.isShutdownUnexpected()) {
LOGGER.debug(
- "Recovering {} producers after unexpected connection termination",
+ "Recovering {} producer(s) after unexpected connection termination",
producers.size());
producers.forEach((publishingId, tracker) -> tracker.unavailable());
trackingConsumerTrackers.forEach(AgentTracker::unavailable);
@@ -446,24 +597,34 @@ private ClientProducersManager(
environment
.scheduledExecutorService()
.execute(
- () -> {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- streamToTrackers.forEach(
- (stream, trackers) -> {
- if (!Thread.currentThread().isInterrupted()) {
- assignProducersToNewManagers(
- trackers, stream, environment.recoveryBackOffDelayPolicy());
- }
- });
- });
+ namedRunnable(
+ () -> {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ streamToTrackers.forEach(
+ (stream, trackers) -> {
+ if (!Thread.currentThread().isInterrupted()) {
+ assignProducersToNewManagers(
+ trackers, stream, environment.recoveryBackOffDelayPolicy());
+ }
+ });
+ },
+ "Producer recovery after disconnection from %s",
+ name));
}
};
MetadataListener metadataListener =
(stream, code) -> {
+ LOGGER.debug(
+ "Received metadata notification for '{}', stream is likely to have become unavailable",
+ stream);
+ Set affectedTrackers;
synchronized (ClientProducersManager.this) {
- Set affectedTrackers = streamToTrackers.remove(stream);
+ affectedTrackers = streamToTrackers.remove(stream);
+ LOGGER.debug(
+ "Affected publishers and consumer trackers after metadata update: {}",
+ affectedTrackers == null ? 0 : affectedTrackers.size());
if (affectedTrackers != null && !affectedTrackers.isEmpty()) {
affectedTrackers.forEach(
tracker -> {
@@ -474,23 +635,28 @@ private ClientProducersManager(
trackingConsumerTrackers.remove(tracker);
}
});
- environment
- .scheduledExecutorService()
- .execute(
- () -> {
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
- // close manager if no more trackers for it
- // needs to be done in another thread than the IO thread
- this.owner.maybeDisposeManager(this);
- assignProducersToNewManagers(
- affectedTrackers,
- stream,
- environment.topologyUpdateBackOffDelayPolicy());
- });
}
}
+ if (affectedTrackers != null && !affectedTrackers.isEmpty()) {
+ environment
+ .scheduledExecutorService()
+ .execute(
+ namedRunnable(
+ () -> {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ // close manager if no more trackers for it
+ // needs to be done in another thread than the IO thread
+ closeIfEmpty();
+ assignProducersToNewManagers(
+ affectedTrackers,
+ stream,
+ environment.topologyUpdateBackOffDelayPolicy());
+ },
+ "Producer re-assignment after metadata update on stream '%s'",
+ stream));
+ }
};
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.PRODUCER);
ClientFactoryContext connectionFactoryContext =
@@ -501,7 +667,7 @@ private ClientProducersManager(
.shutdownListener(shutdownListener)
.metadataListener(metadataListener)
.clientProperty("connection_name", connectionName))
- .key(owner.name);
+ .key(name);
this.client = cf.client(connectionFactoryContext);
LOGGER.debug("Created producer connection '{}'", connectionName);
clientInitializedInManager.set(true);
@@ -518,48 +684,16 @@ private void assignProducersToNewManagers(
.build()
.thenAccept(
broker -> {
- String key = keyForManagerPool(broker);
+ String key = keyForNode(broker);
LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
-
- trackers.forEach(
- tracker -> {
- try {
- if (tracker.isOpen()) {
- // we create the pool only if necessary
- ManagerPool pool =
- pools.computeIfAbsent(
- key,
- s ->
- new ManagerPool(
- key,
- environment
- .clientParametersCopy()
- .host(broker.getHost())
- .port(broker.getPort())));
- pool.add(tracker);
- tracker.running();
- } else {
- LOGGER.debug("Not re-assigning producer because it has been closed");
- }
- } catch (Exception e) {
- LOGGER.info(
- "Error while re-assigning producer {} to {}: {}. Moving on.",
- tracker.identifiable() ? tracker.id() : "(tracking consumer)",
- key,
- e.getMessage());
- }
- });
+ trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
})
.exceptionally(
ex -> {
- LOGGER.info("Error while re-assigning producers: {}", ex.getMessage());
+ LOGGER.info(
+ "Error while re-assigning producers and consumer trackers, closing them: {}",
+ ex.getMessage());
for (AgentTracker tracker : trackers) {
- // FIXME what to do with tracking consumers after a timeout?
- // here they are left as "unavailable" and not, meaning they will not be
- // able to store. Yet recovery mechanism could try to reconnect them, but
- // that seems far-fetched (the first recovery already failed). They could
- // be put in a state whereby they refuse all new store commands and inform
- // with an exception they should be restarted.
try {
short code;
if (ex instanceof StreamDoesNotExistException
@@ -577,15 +711,93 @@ private void assignProducersToNewManagers(
});
}
+ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
+ if (tracker.markRecoveryInProgress()) {
+ try {
+ recoverAgent(broker, tracker);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Error while recovering {} tracker {} (stream '{}'). Reason: {}",
+ tracker.type(),
+ tracker.uniqueId(),
+ tracker.stream(),
+ Utils.exceptionMessage(e));
+ }
+ } else {
+ LOGGER.debug(
+ "Not recovering {} (stream '{}'), recovery is already in progress",
+ tracker.type(),
+ tracker.stream());
+ }
+ }
+
+ private void recoverAgent(Broker node, AgentTracker tracker) {
+ boolean reassignmentCompleted = false;
+ while (!reassignmentCompleted) {
+ try {
+ if (tracker.isOpen()) {
+ LOGGER.debug(
+ "Using {} to resume {} to {}", node.label(), tracker.type(), tracker.stream());
+ addToManager(node, tracker);
+ tracker.running();
+ } else {
+ LOGGER.debug(
+ "Not recovering {} (stream '{}') because it has been closed",
+ tracker.type(),
+ tracker.stream());
+ }
+ reassignmentCompleted = true;
+ } catch (ConnectionStreamException
+ | ClientClosedException
+ | StreamNotAvailableException e) {
+ LOGGER.debug(
+ "{} {} re-assignment on stream {} timed out or connection closed or stream not available, "
+ + "refreshing candidate leader and retrying",
+ tracker.type(),
+ tracker.id(),
+ tracker.stream());
+ // maybe not a good candidate, let's refresh and retry for this one
+ node =
+ Utils.callAndMaybeRetry(
+ () -> getBrokerForProducer(tracker.stream()),
+ ex -> !(ex instanceof StreamDoesNotExistException),
+ environment.recoveryBackOffDelayPolicy(),
+ "Candidate lookup for %s on stream '%s'",
+ tracker.type(),
+ tracker.stream());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
+ reassignmentCompleted = true;
+ }
+ }
+ }
+
private synchronized void register(AgentTracker tracker) {
+ if (this.isFullFor(tracker)) {
+ throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
+ }
+ if (this.isClosed()) {
+ throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
+ }
+ checkNotClosed();
if (tracker.identifiable()) {
ProducerTracker producerTracker = (ProducerTracker) tracker;
// using the next available slot
for (int i = 0; i < maxProducersByClient; i++) {
ProducerTracker previousValue = producers.putIfAbsent((byte) i, producerTracker);
if (previousValue == null) {
+ this.checkNotClosed();
+ int index = i;
Response response =
- this.client.declarePublisher((byte) i, tracker.reference(), tracker.stream());
+ callAndMaybeRetry(
+ () ->
+ this.client.declarePublisher(
+ (byte) index, tracker.reference(), tracker.stream()),
+ RETRY_ON_TIMEOUT,
+ "Declare publisher request for publisher %d on stream '%s'",
+ producerTracker.uniqueId(),
+ producerTracker.stream());
if (response.isOk()) {
tracker.assign((byte) i, this.client, this);
} else {
@@ -604,13 +816,14 @@ private synchronized void register(AgentTracker tracker) {
tracker.assign((byte) 0, this.client, this);
trackingConsumerTrackers.add(tracker);
}
-
streamToTrackers
.computeIfAbsent(tracker.stream(), s -> ConcurrentHashMap.newKeySet())
.add(tracker);
}
private synchronized void unregister(AgentTracker tracker) {
+ LOGGER.debug(
+ "Unregistering {} {} from manager on {}", tracker.type(), tracker.uniqueId(), this.name);
if (tracker.identifiable()) {
producers.remove(tracker.id());
} else {
@@ -627,7 +840,7 @@ private synchronized void unregister(AgentTracker tracker) {
return trackersForThisStream.isEmpty() ? null : trackersForThisStream;
}
});
- this.owner.maybeDisposeManager(this);
+ closeIfEmpty();
}
synchronized boolean isFullFor(AgentTracker tracker) {
@@ -642,14 +855,74 @@ synchronized boolean isEmpty() {
return producers.isEmpty() && trackingConsumerTrackers.isEmpty();
}
+ private void checkNotClosed() {
+ if (!this.client.isOpen()) {
+ throw new ClientClosedException();
+ }
+ }
+
+ boolean isClosed() {
+ if (!this.client.isOpen()) {
+ this.close();
+ }
+ return this.closed.get();
+ }
+
+ private void closeIfEmpty() {
+ if (!closed.get()) {
+ synchronized (this) {
+ if (this.isEmpty()) {
+ this.close();
+ } else {
+ LOGGER.debug("Not closing producer manager {} because it is not empty", this.id);
+ }
+ }
+ }
+ }
+
private void close() {
- try {
- if (this.client.isOpen()) {
- this.client.close();
+ if (closed.compareAndSet(false, true)) {
+ managers.remove(this);
+ try {
+ if (this.client.isOpen()) {
+ this.client.close();
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Error while closing client producer connection: {}", e.getMessage());
}
- } catch (Exception e) {
- // ok
}
}
+
+ @Override
+ public int compareTo(ClientProducersManager o) {
+ return Long.compare(this.id, o.id);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientProducersManager that = (ClientProducersManager) o;
+ return id == that.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+ }
+
+ private static final Predicate RETRY_ON_TIMEOUT =
+ e -> e instanceof TimeoutStreamException;
+
+ private static class ClientClosedException extends StreamException {
+
+ public ClientClosedException() {
+ super("Client already closed");
+ }
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ScheduledExecutorServiceWrapper.java b/src/main/java/com/rabbitmq/stream/impl/ScheduledExecutorServiceWrapper.java
new file mode 100644
index 0000000000..87463c9aed
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ScheduledExecutorServiceWrapper.java
@@ -0,0 +1,235 @@
+// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import io.micrometer.core.instrument.util.NamedThreadFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ScheduledExecutorServiceWrapper implements ScheduledExecutorService {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ScheduledExecutorServiceWrapper.class);
+ private final ScheduledExecutorService delegate;
+ private final Set tasks = ConcurrentHashMap.newKeySet();
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory("rabbitmq-stream-scheduled-executor-service-wrapper-"));
+
+ ScheduledExecutorServiceWrapper(ScheduledExecutorService delegate) {
+ this.delegate = delegate;
+ Duration period = Duration.ofSeconds(10);
+ this.scheduler.scheduleAtFixedRate(
+ () -> {
+ LOGGER.debug("Background scheduled task check, {} task(s) submitted", this.tasks.size());
+ try {
+ long now = System.nanoTime();
+ Duration warningTimeout = Duration.ofSeconds(60);
+ int cleanedCount = 0;
+ Iterator iterator = this.tasks.iterator();
+ while (iterator.hasNext()) {
+ Task task = iterator.next();
+ if (task.isCompleted()) {
+ iterator.remove();
+ cleanedCount++;
+ } else {
+ Duration elapsed = task.elapsedTime(now);
+ if (elapsed.compareTo(warningTimeout) > 0) {
+ LOGGER.debug(
+ "Warning: task {} has been running for {} second(s)",
+ task.description,
+ elapsed.getSeconds());
+ }
+ }
+ }
+ LOGGER.debug("{} completed task(s) cleaned", cleanedCount);
+ } catch (Exception e) {
+ LOGGER.debug("Error during background scheduled task check: {}", e.getMessage());
+ }
+ },
+ period.toMillis(),
+ period.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void task(Runnable command, Future> future) {
+ task(command.toString(), future);
+ }
+
+ private void task(Callable> callable, Future> future) {
+ task(callable.toString(), future);
+ }
+
+ private void task(String description, Future> future) {
+ this.tasks.add(new Task(description, future));
+ }
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ ScheduledFuture> future = this.delegate.schedule(command, delay, unit);
+ task(command, future);
+ return future;
+ }
+
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ ScheduledFuture future = this.delegate.schedule(callable, delay, unit);
+ task(callable, future);
+ return future;
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ // we don't track these, because they are expected to run for a long time
+ LOGGER.debug("Registering scheduled at fixed rate task '{}'", command.toString());
+ return this.delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ // we don't track these, because they are expected to run for a long time
+ LOGGER.debug("Registering scheduled with fixed delay task '{}'", command.toString());
+ return this.delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ this.delegate.shutdown();
+ }
+
+ @Override
+ public List shutdownNow() {
+ return this.delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return this.delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return this.delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return this.delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ Future future = this.delegate.submit(task);
+ task(task, future);
+ return future;
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ Future future = this.delegate.submit(task, result);
+ task(task, future);
+ return future;
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ Future> future = this.delegate.submit(task);
+ task(task, future);
+ return future;
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks)
+ throws InterruptedException {
+ List extends Callable> taskList = new ArrayList<>(tasks);
+ List> futures = this.delegate.invokeAll(taskList);
+ for (int i = 0; i < taskList.size(); i++) {
+ task(taskList.get(i), futures.get(i));
+ }
+ return futures;
+ }
+
+ @Override
+ public List> invokeAll(
+ Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ List extends Callable> taskList = new ArrayList<>(tasks);
+ List> futures = this.delegate.invokeAll(taskList, timeout, unit);
+ for (int i = 0; i < taskList.size(); i++) {
+ task(taskList.get(i), futures.get(i));
+ }
+ return futures;
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ Callable callable =
+ () -> {
+ command.run();
+ return null;
+ };
+ Future future = this.delegate.submit(callable);
+ task(command, future);
+ }
+
+ static class Task {
+
+ private final Future> future;
+ private final String description;
+ private final long start;
+
+ Task(String description, Future> future) {
+ this.description = description;
+ this.future = future;
+ this.start = System.nanoTime();
+ }
+
+ boolean isCompleted() {
+ return this.future.isDone();
+ }
+
+ Duration elapsedTime(long now) {
+ if (now - start < 0) {
+ return Duration.ZERO;
+ } else {
+ return Duration.ofNanos(now - start);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
index 78d785e1f0..867c41bddc 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
@@ -13,10 +13,11 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
+import static java.time.Duration.ofMillis;
-import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerUpdateListener;
@@ -30,7 +31,6 @@
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
-import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -283,7 +283,7 @@ static long getStoredOffsetSafely(StreamConsumer consumer, StreamEnvironment env
.scheduler(environment.scheduledExecutorService())
.retry(ex -> ex instanceof IllegalStateException)
.delayPolicy(
- BackOffDelayPolicy.fixedWithInitialDelay(
+ fixedWithInitialDelay(
environment.recoveryBackOffDelayPolicy().delay(0),
environment.recoveryBackOffDelayPolicy().delay(1),
environment.recoveryBackOffDelayPolicy().delay(0).multipliedBy(3)))
@@ -346,9 +346,7 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
.description(
"Last stored offset for consumer %s on stream %s must be %d",
this.name, this.stream, expectedStoredOffset)
- .delayPolicy(
- BackOffDelayPolicy.fixedWithInitialDelay(
- Duration.ofMillis(200), Duration.ofMillis(200)))
+ .delayPolicy(fixedWithInitialDelay(ofMillis(200), ofMillis(200)))
.retry(exception -> exception instanceof IllegalStateException)
.scheduler(environment.scheduledExecutorService())
.build();
@@ -498,7 +496,7 @@ boolean isOpen() {
return !this.closed.get();
}
- synchronized void setTrackingClient(Client client) {
+ void setTrackingClient(Client client) { // NOTE(review): synchronized is dropped here — confirm trackingClient is volatile or otherwise safely published
this.trackingClient = client;
}
@@ -529,8 +527,8 @@ long storedOffset(Supplier clientSupplier) {
} catch (Exception e) {
throw new IllegalStateException(
String.format(
- "Not possible to query offset for consumer %s on stream %s for now",
- this.name, this.stream),
+ "Not possible to query offset for consumer %s on stream %s for now: %s",
+ this.name, this.stream, e.getMessage()),
e);
}
if (response.isOk()) {
@@ -554,8 +552,8 @@ long storedOffset(Supplier clientSupplier) {
} else {
throw new IllegalStateException(
String.format(
- "Not possible to query offset for consumer %s on stream %s for now",
- this.name, this.stream));
+ "Not possible to query offset for consumer %s on stream %s for now, consumer status is %s",
+ this.name, this.stream, this.status.name()));
}
}
@@ -618,4 +616,8 @@ private void checkNotClosed() {
throw new IllegalStateException("This producer instance has been closed");
}
}
+
+ long id() { // accessor for the id field of this instance
+ return this.id;
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 8332fab8a3..38685c1b3e 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -13,8 +13,10 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
+import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
-import static com.rabbitmq.stream.impl.Utils.propagateException;
+import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.rabbitmq.stream.Address;
@@ -34,6 +36,7 @@
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.Client.ClientParameters;
+import com.rabbitmq.stream.impl.Client.ShutdownListener;
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
@@ -47,10 +50,11 @@
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -74,8 +78,6 @@ class StreamEnvironment implements Environment {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamEnvironment.class);
- private final Random random = new Random();
-
private final EventLoopGroup eventLoopGroup;
private final ScheduledExecutorService scheduledExecutorService;
private final boolean privateScheduleExecutorService;
@@ -94,9 +96,9 @@ class StreamEnvironment implements Environment {
private final Clock clock = new Clock();
private final ScheduledFuture> clockRefreshFuture;
private final ByteBufAllocator byteBufAllocator;
- private final AtomicBoolean locatorInitialized = new AtomicBoolean(false);
+ private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
private final Runnable locatorInitializationSequence;
- private volatile Client locator;
+ private final List locators = new CopyOnWriteArrayList<>();
StreamEnvironment(
ScheduledExecutorService scheduledExecutorService,
@@ -188,6 +190,8 @@ class StreamEnvironment implements Environment {
.collect(Collectors.toList());
}
+ this.addresses.forEach(address -> this.locators.add(new Locator(address)));
+
if (clientParametersPrototype.eventLoopGroup == null) {
this.eventLoopGroup = new NioEventLoopGroup();
this.clientParametersPrototype =
@@ -199,14 +203,17 @@ class StreamEnvironment implements Environment {
.duplicate()
.eventLoopGroup(clientParametersPrototype.eventLoopGroup);
}
+ ScheduledExecutorService executorService;
if (scheduledExecutorService == null) {
- this.scheduledExecutorService =
+ executorService =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
this.privateScheduleExecutorService = true;
} else {
- this.scheduledExecutorService = scheduledExecutorService;
+ executorService = scheduledExecutorService;
this.privateScheduleExecutorService = false;
}
+ // TODO remove executor wrapper (it's here just for debugging)
+ this.scheduledExecutorService = new ScheduledExecutorServiceWrapper(executorService);
this.producersCoordinator =
new ProducersCoordinator(
@@ -222,50 +229,13 @@ class StreamEnvironment implements Environment {
connectionNamingStrategy,
Utils.coordinatorClientFactory(this));
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
-
- AtomicReference shutdownListenerReference = new AtomicReference<>();
- Client.ShutdownListener shutdownListener =
- shutdownContext -> {
- if (shutdownContext.isShutdownUnexpected()) {
- this.locator = null;
- LOGGER.debug("Unexpected locator disconnection, trying to reconnect");
- Client.ClientParameters newLocatorParameters =
- this.clientParametersPrototype
- .duplicate()
- .shutdownListener(shutdownListenerReference.get());
- AsyncRetry.asyncRetry(
- () -> {
- Address address =
- addresses.size() == 1
- ? addresses.get(0)
- : addresses.get(random.nextInt(addresses.size()));
- address = addressResolver.resolve(address);
- LOGGER.debug("Trying to reconnect locator on {}", address);
- String connectionName =
- connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
- Client newLocator =
- clientFactory.apply(
- newLocatorParameters
- .host(address.host())
- .port(address.port())
- .clientProperty("connection_name", connectionName));
- LOGGER.debug("Created locator connection '{}'", connectionName);
- LOGGER.debug("Locator connected on {}", address);
- return newLocator;
- })
- .description("Locator recovery")
- .scheduler(this.scheduledExecutorService)
- .delayPolicy(recoveryBackOffDelayPolicy)
- .build()
- .thenAccept(newLocator -> this.locator = newLocator);
- }
- };
- shutdownListenerReference.set(shutdownListener);
ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
Runnable locatorInitSequence =
() -> {
RuntimeException lastException = null;
- for (Address address : addresses) {
+ for (int i = 0; i < addresses.size(); i++) {
+ Address address = addresses.get(i);
+ Locator locator = locator(i);
address = addressResolver.resolve(address);
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
Client.ClientParameters locatorParameters =
@@ -274,26 +244,34 @@ class StreamEnvironment implements Environment {
.host(address.host())
.port(address.port())
.clientProperty("connection_name", connectionName)
- .shutdownListener(shutdownListenerReference.get());
+ .shutdownListener(
+ shutdownListener(locator, connectionNamingStrategy, clientFactory));
try {
- this.locator = clientFactory.apply(locatorParameters);
+ Client client = clientFactory.apply(locatorParameters);
+ locator.client(client);
LOGGER.debug("Created locator connection '{}'", connectionName);
LOGGER.debug("Locator connected to {}", address);
- break;
} catch (RuntimeException e) {
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
lastException = e;
}
}
- if (this.locator == null) {
+ if (this.locators.stream().allMatch(l -> l.isNotSet())) {
throw lastException;
+ } else {
+ this.locators.forEach(
+ l -> {
+ if (l.isNotSet()) {
+ scheduleLocatorConnection(l, connectionNamingStrategy, clientFactory);
+ }
+ });
}
};
if (lazyInit) {
this.locatorInitializationSequence = locatorInitSequence;
} else {
locatorInitSequence.run();
- locatorInitialized.set(true);
+ locatorsInitialized.set(true);
this.locatorInitializationSequence = () -> {};
}
this.codec =
@@ -302,7 +280,112 @@ class StreamEnvironment implements Environment {
: clientParametersPrototype.codec();
this.clockRefreshFuture =
this.scheduledExecutorService.scheduleAtFixedRate(
- () -> this.clock.refresh(), 1, 1, SECONDS);
+ Utils.namedRunnable(() -> this.clock.refresh(), "Background clock refresh"),
+ 1,
+ 1,
+ SECONDS);
+ }
+
+ private ShutdownListener shutdownListener( // builds the listener that reconnects 'locator' after an unexpected disconnection
+ Locator locator,
+ Function connectionNamingStrategy,
+ Function clientFactory) {
+ AtomicReference shutdownListenerReference = new AtomicReference<>(); // lets the lambda below refer to itself
+ Client.ShutdownListener shutdownListener =
+ shutdownContext -> {
+ if (shutdownContext.isShutdownUnexpected()) { // any other kind of shutdown is ignored
+ locator.client(null); // the locator is reported as not set until a new client is published
+ LOGGER.debug("Unexpected locator disconnection, trying to reconnect");
+ try {
+ Client.ClientParameters newLocatorParameters =
+ this.clientParametersPrototype
+ .duplicate()
+ .shutdownListener(shutdownListenerReference.get()); // the replacement client gets this same listener
+ AsyncRetry.asyncRetry(
+ () -> {
+ LOGGER.debug("Locator reconnection...");
+ Address resolvedAddress = addressResolver.resolve(locator.address()); // resolved again on every attempt
+ String connectionName =
+ connectionNamingStrategy.apply(ClientConnectionType.LOCATOR); // a name is requested on every attempt
+ LOGGER.debug(
+ "Trying to reconnect locator on {}, with client connection name '{}'",
+ resolvedAddress,
+ connectionName);
+ Client newLocator =
+ clientFactory.apply(
+ newLocatorParameters
+ .host(resolvedAddress.host())
+ .port(resolvedAddress.port())
+ .clientProperty("connection_name", connectionName));
+ LOGGER.debug("Created locator connection '{}'", connectionName);
+ LOGGER.debug("Locator connected on {}", resolvedAddress);
+ return newLocator;
+ })
+ .description("Locator recovery")
+ .scheduler(this.scheduledExecutorService)
+ .delayPolicy(recoveryBackOffDelayPolicy)
+ .build()
+ .thenAccept(newClient -> locator.client(newClient)) // publish the new client on the locator
+ .exceptionally(
+ ex -> {
+ LOGGER.debug("Locator recovery failed", ex); // logged only; no client is set on the locator
+ return null;
+ });
+ } catch (Exception e) {
+ LOGGER.debug("Error while scheduling locator reconnection", e); // keeps a scheduling failure from escaping the listener
+ }
+ }
+ };
+ shutdownListenerReference.set(shutdownListener); // set before returning, i.e. before the listener is handed to any client
+ return shutdownListener;
+ }
+
+ private void scheduleLocatorConnection( // starts an asynchronous, retried connection attempt for a locator that has no client
+ Locator locator,
+ Function connectionNamingStrategy,
+ Function clientFactory) {
+ ShutdownListener shutdownListener =
+ shutdownListener(locator, connectionNamingStrategy, clientFactory); // so the client created here recovers on unexpected shutdown
+ try {
+ Client.ClientParameters newLocatorParameters =
+ this.clientParametersPrototype.duplicate().shutdownListener(shutdownListener);
+ AsyncRetry.asyncRetry(
+ () -> {
+ LOGGER.debug("Locator reconnection...");
+ Address resolvedAddress = addressResolver.resolve(locator.address()); // resolved again on every attempt
+ String connectionName =
+ connectionNamingStrategy.apply(ClientConnectionType.LOCATOR); // a name is requested on every attempt
+ LOGGER.debug(
+ "Trying to reconnect locator on {}, with client connection name '{}'",
+ resolvedAddress,
+ connectionName);
+ Client newLocator =
+ clientFactory.apply(
+ newLocatorParameters
+ .host(resolvedAddress.host())
+ .port(resolvedAddress.port())
+ .clientProperty("connection_name", connectionName));
+ LOGGER.debug("Created locator connection '{}'", connectionName);
+ LOGGER.debug("Locator connected on {}", resolvedAddress);
+ return newLocator;
+ })
+ .description("Locator recovery")
+ .scheduler(this.scheduledExecutorService)
+ .delayPolicy(recoveryBackOffDelayPolicy)
+ .build()
+ .thenAccept(newClient -> locator.client(newClient)) // publish the new client on the locator
+ .exceptionally(
+ ex -> {
+ LOGGER.debug("Locator recovery failed", ex); // logged only; no client is set on the locator
+ return null;
+ });
+ } catch (Exception e) {
+ LOGGER.debug("Error while scheduling locator reconnection", e); // a scheduling failure is logged, not propagated
+ }
+ }
+
+ private Locator locator(int i) { // positional lookup in the locators list
+ return this.locators.get(i);
+ }
private static String uriDecode(String s) {
@@ -361,11 +444,11 @@ public ByteBufAllocator byteBufAllocator() {
}
void maybeInitializeLocator() {
- if (this.locatorInitialized.compareAndSet(false, true)) {
+ if (this.locatorsInitialized.compareAndSet(false, true)) {
try {
this.locatorInitializationSequence.run();
} catch (RuntimeException e) {
- this.locatorInitialized.set(false);
+ this.locatorsInitialized.set(false);
throw e;
}
}
@@ -398,14 +481,17 @@ public StreamStats queryStreamStats(String stream) {
checkNotClosed();
StreamStatsResponse response =
locatorOperation(
- client -> {
- if (Utils.is3_11_OrMore(client.brokerVersion())) {
- return client.streamStats(stream);
- } else {
- throw new UnsupportedOperationException(
- "QueryStringInfo is available only for RabbitMQ 3.11 or more.");
- }
- });
+ Utils.namedFunction(
+ client -> {
+ if (Utils.is3_11_OrMore(client.brokerVersion())) {
+ return client.streamStats(stream);
+ } else {
+ throw new UnsupportedOperationException(
+ "QueryStringInfo is available only for RabbitMQ 3.11 or more.");
+ }
+ },
+ "Query stream stats on stream '%s'",
+ stream));
if (response.isOk()) {
Map info = response.getInfo();
BiFunction offsetSupplierLogic =
@@ -432,7 +518,13 @@ public StreamStats queryStreamStats(String stream) {
"committed_chunk_id", "No committed chunk ID for stream " + stream);
return new DefaultStreamStats(firstOffsetSupplier, committedOffsetSupplier);
} else {
- throw propagateException(response.getResponseCode(), stream);
+ throw convertCodeToException(
+ response.getResponseCode(),
+ stream,
+ () ->
+ "Error while querying stream info: "
+ + formatConstant(response.getResponseCode())
+ + ".");
}
}
@@ -508,14 +600,17 @@ public void close() {
this.consumersCoordinator.close();
this.offsetTrackingCoordinator.close();
- try {
- if (this.locator != null && this.locator.isOpen()) {
- this.locator.close();
- this.locator = null;
+ for (Locator locator : this.locators) {
+ try {
+ if (locator.isSet()) {
+ locator.client().close();
+ locator.client(null);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error while closing locator client", e);
}
- } catch (Exception e) {
- LOGGER.warn("Error while closing locator client", e);
}
+
this.clockRefreshFuture.cancel(false);
if (privateScheduleExecutorService) {
this.scheduledExecutorService.shutdownNow();
@@ -541,6 +636,10 @@ ScheduledExecutorService scheduledExecutorService() {
return this.scheduledExecutorService;
}
+ void execute(Runnable task, String description, Object... args) { // description and args are a format string and its arguments
+ this.scheduledExecutorService().execute(namedRunnable(task, description, args)); // wrapped with namedRunnable before being handed to the executor
+ }
+
BackOffDelayPolicy recoveryBackOffDelayPolicy() {
return this.recoveryBackOffDelayPolicy;
}
@@ -580,10 +679,11 @@ Runnable registerProducer(StreamProducer producer, String reference, String stre
}
Client locator() {
- if (this.locator == null) {
- throw new LocatorNotAvailableException();
- }
- return this.locator;
+ return this.locators.stream() // pick any locator that currently has a client
+ .filter(Locator::isSet)
+ .findAny()
+ .orElseThrow(() -> new LocatorNotAvailableException()) // no locator has a client right now
+ .client(); // can still throw LocatorNotAvailableException if the client is cleared after the isSet check
}
T locatorOperation(Function operation) {
@@ -599,9 +699,21 @@ static T locatorOperation(
boolean executed = false;
Exception lastException = null;
T result = null;
+ LOGGER.debug("Starting locator operation '{}'", operation);
+ long start = System.nanoTime();
while (attempt < maxAttempt) {
try {
- result = operation.apply(clientSupplier.get());
+ Client client = clientSupplier.get();
+ LOGGER.debug(
+ "Using locator on {}:{} to run operation '{}'",
+ client.getHost(),
+ client.getPort(),
+ operation);
+ result = operation.apply(client);
+ LOGGER.debug(
+ "Locator operation '{}' succeeded in {}",
+ operation,
+ Duration.ofNanos(System.nanoTime() - start));
executed = true;
break;
} catch (LocatorNotAvailableException e) {
@@ -613,6 +725,10 @@ static T locatorOperation(
Thread.currentThread().interrupt();
break;
}
+ } catch (Exception e) {
+ LOGGER.debug("Exception during locator operation '{}': {}", operation, exceptionMessage(e));
+ lastException = e;
+ break;
}
}
if (!executed) {
@@ -687,10 +803,19 @@ TrackingConsumerRegistration registerTrackingConsumer(
@Override
public String toString() {
- Client locator = this.locator;
- return "{ \"locator\" : "
- + (locator == null ? "null" : ("\"" + locator.connectionName() + "\""))
- + ", "
+ return "{ \"locators\" : ["
+ + this.locators.stream()
+ .map(
+ l -> {
+ Client c = l.nullableClient();
+ return c == null ? "null" : ("\"" + c.connectionName() + "\"");
+ })
+ .collect(Collectors.joining(","))
+ + "], "
+ + Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount())
+ + ","
+ + Utils.jsonField("consumer_client_count", this.consumersCoordinator.managerCount())
+ + ","
+ "\"producers\" : "
+ this.producersCoordinator
+ ", \"consumers\" : "
@@ -747,4 +872,40 @@ private void checkNotClosed() {
throw new IllegalStateException("This environment instance has been closed");
}
}
+
+ private static class Locator { // pairs one address with the client currently connected to it, if any
+
+ private final Address address;
+ private volatile Optional client; // empty while no client is set; replaced as a whole on every change
+
+ private Locator(Address address) {
+ this.address = address;
+ this.client = Optional.empty(); // starts without a client
+ }
+
+ Locator client(Client client) { // sets the client; passing null clears it
+ this.client = Optional.ofNullable(client);
+ return this;
+ }
+
+ private boolean isNotSet() {
+ return !this.isSet();
+ }
+
+ private boolean isSet() {
+ return this.client.isPresent();
+ }
+
+ private Client client() { // strict accessor: throws LocatorNotAvailableException when no client is set
+ return this.client.orElseThrow(() -> new LocatorNotAvailableException());
+ }
+
+ private Client nullableClient() { // lenient accessor: returns null when no client is set
+ return this.client.orElse(null);
+ }
+
+ private Address address() {
+ return this.address;
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 2993b9cf78..49919164eb 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -17,6 +17,7 @@
import static com.rabbitmq.stream.Constants.CODE_PRODUCER_CLOSED;
import static com.rabbitmq.stream.Constants.CODE_PRODUCER_NOT_AVAILABLE;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
+import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
@@ -166,13 +167,26 @@ public int fragmentLength(Object entity) {
environment
.scheduledExecutorService()
.schedule(
- taskReference.get(), batchPublishingDelay.toMillis(), TimeUnit.MILLISECONDS);
+ namedRunnable(
+ taskReference.get(),
+ "Background batch publishing task for publisher %d on stream '%s'",
+ this.id,
+ this.stream),
+ batchPublishingDelay.toMillis(),
+ TimeUnit.MILLISECONDS);
}
};
taskReference.set(task);
environment
.scheduledExecutorService()
- .schedule(task, batchPublishingDelay.toMillis(), TimeUnit.MILLISECONDS);
+ .schedule(
+ namedRunnable(
+ task,
+ "Background batch publishing task for publisher %d on stream '%s'",
+ this.id,
+ this.stream),
+ batchPublishingDelay.toMillis(),
+ TimeUnit.MILLISECONDS);
}
this.batchSize = batchSize;
this.codec = environment.codec();
@@ -192,14 +206,27 @@ public int fragmentLength(Object entity) {
this.environment
.scheduledExecutorService()
.schedule(
- taskReference.get(), confirmTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ namedRunnable(
+ taskReference.get(),
+ "Background confirm timeout task for producer %d on stream %s",
+ this.id,
+ this.stream),
+ confirmTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
}
};
taskReference.set(wrapperTask);
this.confirmTimeoutFuture =
this.environment
.scheduledExecutorService()
- .schedule(taskReference.get(), confirmTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ .schedule(
+ namedRunnable(
+ taskReference.get(),
+ "Background confirm timeout task for producer %d on stream %s",
+ this.id,
+ this.stream),
+ confirmTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
}
this.status = Status.RUNNING;
}
@@ -209,7 +236,6 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) {
long limit = this.environment.clock().time() - confirmTimeout.toNanos();
SortedMap unconfirmedSnapshot =
new TreeMap<>(this.unconfirmedMessages);
- LOGGER.debug("Starting confirm timeout check task");
int count = 0;
for (Entry unconfirmedEntry : unconfirmedSnapshot.entrySet()) {
if (unconfirmedEntry.getValue().time() < limit) {
@@ -223,7 +249,15 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) {
break;
}
}
- LOGGER.debug("Failed {} message(s) which had timed out (limit {})", count, limit);
+ if (count > 0) {
+ LOGGER.debug(
+ "{} outbound message(s) had reached the confirm timeout (limit {}) "
+ + "for producer {} on stream '{}', application notified with callback",
+ count,
+ limit,
+ this.id,
+ this.stream);
+ }
};
}
@@ -450,9 +484,10 @@ void running() {
}
}
publishBatch(false);
- if (unconfirmedMessagesSemaphore.availablePermits() != maxUnconfirmedMessages) {
- unconfirmedMessagesSemaphore.release(
- maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits());
+
+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
+ if (toRelease > 0) {
+ unconfirmedMessagesSemaphore.release(toRelease);
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
LOGGER.debug(
"Could not acquire {} permit(s) for message republishing",
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
index 5aa0f016bc..0b7f0fd559 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java
@@ -14,6 +14,7 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
+import static com.rabbitmq.stream.impl.Utils.namedFunction;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Constants;
@@ -73,7 +74,11 @@ public void create() {
}
this.environment.maybeInitializeLocator();
Client.Response response =
- environment.locatorOperation(c -> c.create(stream, streamParametersBuilder.build()));
+ environment.locatorOperation(
+ namedFunction(
+ c -> c.create(stream, streamParametersBuilder.build()),
+ "Creation of stream '%s'",
+ this.stream));
if (!response.isOk()
&& response.getResponseCode() != Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) {
throw new StreamException(
diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
index 117b13b566..c884da7e3c 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java
@@ -13,6 +13,8 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Utils.namedFunction;
+
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
@@ -38,7 +40,12 @@ class SuperStreamConsumer implements Consumer {
StreamEnvironment environment,
TrackingConfiguration trackingConfiguration) {
this.superStream = superStream;
- List partitions = environment.locatorOperation(c -> c.partitions(superStream));
+ List partitions =
+ environment.locatorOperation(
+ namedFunction(
+ c -> c.partitions(superStream),
+ "Partition lookup for super stream '%s'",
+ superStream));
// for manual offset tracking strategy only
ConsumerState[] states = new ConsumerState[partitions.size()];
diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
index ff232fb5b3..8a000d94b7 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java
@@ -13,6 +13,8 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static com.rabbitmq.stream.impl.Utils.namedFunction;
+
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
@@ -70,12 +72,23 @@ public MessageBuilder messageBuilder() {
@Override
public long getLastPublishingId() {
if (this.name != null && !this.name.isEmpty()) {
- List streams = this.environment.locatorOperation(c -> c.partitions(superStream));
+ List streams =
+ this.environment.locatorOperation(
+ namedFunction(
+ c -> c.partitions(superStream),
+ "Partition lookup for super stream '%s'",
+ this.superStream));
long publishingId = 0;
boolean first = true;
for (String partition : streams) {
long pubId =
- this.environment.locatorOperation(c -> c.queryPublisherSequence(this.name, partition));
+ this.environment.locatorOperation(
+ namedFunction(
+ c -> c.queryPublisherSequence(this.name, partition),
+ "Publisher sequence query for on partition '%s' of super stream '%s', publisher name '%s'",
+ partition,
+ this.superStream,
+ this.name));
if (first) {
publishingId = pubId;
first = false;
@@ -148,7 +161,12 @@ private static class DefaultSuperStreamMetadata implements Metadata {
private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environment) {
this.superStream = superStream;
this.environment = environment;
- List ps = environment.locatorOperation(c -> c.partitions(superStream));
+ List ps =
+ environment.locatorOperation(
+ namedFunction(
+ c -> c.partitions(superStream),
+ "Partition lookup for super stream '%s'",
+ superStream));
this.partitions = new CopyOnWriteArrayList<>(ps);
}
@@ -161,7 +179,13 @@ public List partitions() {
public List route(String routingKey) {
return routes.computeIfAbsent(
routingKey,
- routingKey1 -> environment.locatorOperation(c -> c.route(routingKey1, superStream)));
+ routingKey1 ->
+ environment.locatorOperation(
+ namedFunction(
+ c -> c.route(routingKey1, superStream),
+ "Route lookup on super stream '%s' for key '%s'",
+ this.superStream,
+ routingKey1)));
}
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/TimeoutStreamException.java b/src/main/java/com/rabbitmq/stream/impl/TimeoutStreamException.java
new file mode 100644
index 0000000000..7c0c787012
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/TimeoutStreamException.java
@@ -0,0 +1,25 @@
+// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+ class TimeoutStreamException extends ConnectionStreamException { // package-private subtype; presumably raised on timeouts (name-based, no throw site in this hunk)
+
+ public TimeoutStreamException(String message) { // message-only variant
+ super(message);
+ }
+
+ public TimeoutStreamException(String message, Throwable cause) { // variant that keeps the underlying cause
+ super(message, cause);
+ }
+ }
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index bff0dcbc86..141c42b432 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -13,7 +13,10 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
+import static java.lang.String.format;
+
import com.rabbitmq.stream.Address;
+import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.ConsumerUpdateListener;
import com.rabbitmq.stream.OffsetSpecification;
@@ -38,6 +41,7 @@
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import javax.net.ssl.X509TrustManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,6 +155,85 @@ static ClientFactory connectToAdvertisedNodeClientFactory(
retryInterval);
}
+ static Runnable namedRunnable(Runnable task, String format, Object... args) { // wraps task so that toString() returns the formatted name
+ return new NamedRunnable(String.format(format, args), task); // the name is formatted once, here, not on every toString() call
+ }
+
+ static Function namedFunction(Function task, String format, Object... args) { // wraps task so that toString() returns the formatted name
+ return new NamedFunction<>(String.format(format, args), task); // the name is formatted once, here, not on every toString() call
+ }
+
+ static T callAndMaybeRetry( // convenience overload using a built-in delay policy
+ Supplier operation, Predicate retryCondition, String format, Object... args) {
+ return callAndMaybeRetry(
+ operation,
+ retryCondition,
+ i -> i >= 3 ? BackOffDelayPolicy.TIMEOUT : Duration.ZERO, // no pause between attempts; TIMEOUT (stop) once the attempt number reaches 3
+ format,
+ args);
+ }
+
+ static <T> T callAndMaybeRetry( // runs operation, retrying per delayPolicy for as long as retryCondition accepts the failure
+ Supplier<T> operation,
+ Predicate<Exception> retryCondition,
+ BackOffDelayPolicy delayPolicy,
+ String format,
+ Object... args) {
+ String description = format(format, args); // human-readable name used in the logs and in the failure message
+ int attempt = 0;
+ Exception lastException = null;
+ boolean keepTrying = true;
+ while (keepTrying) {
+ try {
+ attempt++;
+ T result = operation.get();
+ LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt);
+ return result; // success: the only exit that does not throw
+ } catch (Exception e) {
+ lastException = e;
+ if (retryCondition.test(e)) {
+ LOGGER.debug("Operation '{}' failed, retrying...", description);
+ Duration delay = delayPolicy.delay(attempt); // attempt is 1-based at this point
+ if (BackOffDelayPolicy.TIMEOUT.equals(delay)) { // the policy signals "give up" with the TIMEOUT sentinel
+ keepTrying = false;
+ } else if (!delay.isZero()) { // a zero delay means retry immediately
+ try {
+ Thread.sleep(delay.toMillis());
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag (Thread.interrupted() would clear it)
+ lastException = ex;
+ keepTrying = false;
+ }
+ }
+ } else {
+ keepTrying = false; // not a retryable failure
+ }
+ }
+ }
+ String message =
+ format(
+ "Could not complete task '%s' after %d attempt(s) (reason: %s)",
+ description, attempt, exceptionMessage(lastException));
+ LOGGER.debug(message);
+ if (lastException == null) { // defensive: the loop only ends here after a failure was recorded
+ throw new StreamException(message);
+ } else if (lastException instanceof RuntimeException) {
+ throw (RuntimeException) lastException; // unchecked failures are rethrown as they are
+ } else {
+ throw new StreamException(message, lastException); // checked failures (including interruption) are wrapped
+ }
+ }
+
+ static String exceptionMessage(Exception e) { // short, log-friendly description of an exception; never returns null
+ if (e == null) {
+ return "unknown";
+ } else if (e.getMessage() == null) {
+ return e.getClass().getSimpleName(); // no message: fall back to the simple class name
+ } else {
+ return e.getMessage() + " [" + e.getClass().getSimpleName() + "]"; // e.g. "boom [IllegalStateException]"
+ }
+ }
+
interface ClientFactory {
Client client(ClientFactoryContext context);
@@ -331,14 +414,72 @@ static boolean is3_11_OrMore(String brokerVersion) {
return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0;
}
- static StreamException propagateException(short responseCode, String stream) {
+ /**
+ * Converts a response code into the matching exception.
+ *
+ * @param responseCode the response code to convert
+ * @param stream the stream the response is about
+ * @param fallbackMessage provides the message of the generic {@link StreamException}, called
+ * only when the code has no dedicated exception type
+ * @return the exception to throw, never thrown by this method itself
+ */
+ static StreamException convertCodeToException(
+ short responseCode, String stream, Supplier<String> fallbackMessage) {
if (responseCode == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
return new StreamDoesNotExistException(stream);
} else if (responseCode == Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE) {
return new StreamNotAvailableException(stream);
} else {
- String message = "Error while querying stream info: " + formatConstant(responseCode) + ".";
- return new StreamException(message, responseCode);
+ return new StreamException(fallbackMessage.get(), responseCode);
+ }
+ }
+
+ /** {@link Runnable} decorator whose {@link #toString()} returns a human-readable label. */
+ private static class NamedRunnable implements Runnable {
+
+ private final String label;
+ private final Runnable task;
+
+ private NamedRunnable(String name, Runnable delegate) {
+ this.label = name;
+ this.task = delegate;
+ }
+
+ /** Runs the wrapped task. */
+ @Override
+ public void run() {
+ task.run();
+ }
+
+ /** Returns the label given at creation time. */
+ @Override
+ public String toString() {
+ return label;
+ }
+ }
+
+ /**
+ * {@link Function} decorator whose {@link #toString()} returns a human-readable name.
+ *
+ * @param <T> the type of the function input
+ * @param <R> the type of the function result
+ */
+ private static class NamedFunction<T, R> implements Function<T, R> {
+
+ private final String name;
+ private final Function<T, R> delegate;
+
+ private NamedFunction(String name, Function<T, R> delegate) {
+ this.name = name;
+ this.delegate = delegate;
+ }
+
+ /** Applies the wrapped function. */
+ @Override
+ public R apply(T t) {
+ return this.delegate.apply(t);
+ }
+
+ /** Returns the name given at creation time. */
+ @Override
+ public String toString() {
+ return this.name;
+ }
+ }
+
+ /**
+ * Wraps a value in double quotes so that it can be used as a JSON string.
+ *
+ * Backslashes and double quotes in the value are escaped, otherwise the result would not be a
+ * valid JSON string.
+ *
+ * @param value the value to quote, can be {@code null}
+ * @return the quoted value, or the unquoted {@code null} literal for a {@code null} value
+ */
+ static String quote(String value) {
+ if (value == null) {
+ return "null";
+ } else {
+ // escape backslashes first, so that the ones added for the quotes are not escaped again
+ return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
}
}
+
+ /**
+ * Formats a JSON field with a numeric value, the value is rendered as a {@code long}.
+ *
+ * @param name the field name
+ * @param value the field value, must not be {@code null}
+ * @return the {@code "name" : value} fragment
+ */
+ static String jsonField(String name, Number value) {
+ String key = quote(name);
+ long number = value.longValue();
+ return key + " : " + number;
+ }
+
+ /**
+ * Formats a JSON field with a string value.
+ *
+ * @param name the field name
+ * @param value the field value
+ * @return the {@code "name" : "value"} fragment
+ */
+ static String jsonField(String name, String value) {
+ String key = quote(name);
+ String text = quote(value);
+ return key + " : " + text;
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java
index a83e05ff4a..7fe4c1b97d 100644
--- a/src/test/java/com/rabbitmq/stream/Host.java
+++ b/src/test/java/com/rabbitmq/stream/Host.java
@@ -98,24 +98,32 @@ public static Process rabbitmqctl(String command) throws IOException {
return executeCommand(rabbitmqctlCommand() + " " + command);
}
- public static Process killConnection(String connectionName) throws IOException {
- List<ConnectionInfo> cs = listConnections();
- if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
- throw new IllegalArgumentException(
- String.format(
- "Could not find 1 connection '%s' in stream connections: %s",
- connectionName,
- cs.stream()
- .map(ConnectionInfo::clientProvidedName)
- .collect(Collectors.joining(", "))));
+ /**
+ * Kills the stream connection with the given client-provided name.
+ *
+ * @param connectionName the client-provided name of the connection to kill
+ * @return the process of the command that kills the connection
+ * @throws IllegalArgumentException if there is not exactly 1 connection with this name
+ * @throws RuntimeException wrapping the {@link IOException} if a command fails
+ */
+ public static Process killConnection(String connectionName) {
+ try {
+ List<ConnectionInfo> cs = listConnections();
+ // the name must identify exactly one connection
+ if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not find 1 connection '%s' in stream connections: %s",
+ connectionName,
+ cs.stream()
+ .map(ConnectionInfo::clientProvidedName)
+ .collect(Collectors.joining(", "))));
+ }
+ return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
}
- public static List<ConnectionInfo> listConnections() throws IOException {
- Process process =
- rabbitmqctl("list_stream_connections --formatter json conn_name,client_properties");
- return toConnectionInfoList(capture(process.getInputStream()));
+ /**
+ * Lists the stream connections, with their name and client properties.
+ *
+ * @return the connections parsed from the JSON output of the command
+ * @throws RuntimeException wrapping the {@link IOException} if the command fails
+ */
+ public static List<ConnectionInfo> listConnections() {
+ try {
+ Process process =
+ rabbitmqctl("list_stream_connections --formatter json conn_name,client_properties");
+ return toConnectionInfoList(capture(process.getInputStream()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
static List toConnectionInfoList(String json) {
diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
index 215b525a44..38076b247a 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
@@ -18,6 +18,8 @@
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.metadata;
import static com.rabbitmq.stream.impl.TestUtils.namedConsumer;
+import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
+import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -40,7 +42,8 @@
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client.MessageListener;
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
-import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumersPoolInfo;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo;
import com.rabbitmq.stream.impl.Utils.ClientFactory;
import java.time.Duration;
import java.util.ArrayList;
@@ -143,6 +146,8 @@ public Client.ClientParameters shutdownListener(
when(environment.locatorOperation(any())).thenCallRealMethod();
when(environment.clientParametersCopy()).thenReturn(clientParameters);
when(environment.addressResolver()).thenReturn(address -> address);
+ when(client.brokerVersion()).thenReturn("3.11.0");
+ when(client.isOpen()).thenReturn(true);
coordinator =
new ConsumersCoordinator(
@@ -335,7 +340,7 @@ void subscribePropagateExceptionWhenClientSubscriptionFails() {
Collections.emptyMap()))
.isInstanceOf(StreamException.class)
.hasMessage(exceptionMessage);
- assertThat(MonitoringTestUtils.extract(coordinator)).isEmpty();
+ assertThat(MonitoringTestUtils.extract(coordinator).isEmpty()).isTrue();
}
@Test
@@ -493,7 +498,7 @@ void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() {
@Test
void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -506,13 +511,18 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
.thenReturn(metadata(null, replica()));
when(clientFactory.client(any())).thenReturn(client);
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
when(client.subscribe(
subscriptionIdCaptor.capture(),
anyString(),
any(OffsetSpecification.class),
anyInt(),
anyMap()))
- .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return new Client.Response(Constants.RESPONSE_CODE_OK);
+ });
StreamConsumer consumerClosedAfterConnectionLost = mock(StreamConsumer.class);
when(consumerClosedAfterConnectionLost.isOpen()).thenReturn(false);
@@ -553,12 +563,12 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
shutdownListener.handle(
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
- Thread.sleep(retryDelay.toMillis() * 5);
+ // the second consumer does not re-subscribe because it returns it is not open
+ waitAtMost(() -> subscriptionCount.get() == 2 + 1);
// the consumer connection should be reset after the connection disruption
verify(consumer, times(1)).setSubscriptionClient(isNull());
- // the second consumer does not re-subscribe because it returns it is not open
verify(client, times(2 + 1))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
@@ -578,11 +588,74 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
assertThat(messageHandlerCalls.get()).isEqualTo(2);
}
+ /**
+ * A shutdown notification received while a recovery is already running must not lead to an
+ * extra subscription: exactly one re-subscription is expected after the disruption.
+ */
+ @Test
+ void shouldSkipRecoveryIfRecoveryIsAlreadyInProgress() throws Exception {
+ scheduledExecutorService = createScheduledExecutorService(2);
+ when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+ Duration retryDelay = Duration.ofMillis(100);
+ when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(consumer.isOpen()).thenReturn(true);
+ when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
+
+ when(clientFactory.client(any())).thenReturn(client);
+ // counts the subscribe calls, so that the test can wait for the re-subscription
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
+ when(client.subscribe(
+ subscriptionIdCaptor.capture(),
+ anyString(),
+ any(OffsetSpecification.class),
+ anyInt(),
+ anyMap()))
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return new Client.Response(Constants.RESPONSE_CODE_OK);
+ });
+
+ String trackingReference = "reference";
+
+ // offset query: OK for the first subscription, then a slow failure that also simulates a
+ // second disconnection, then OK again
+ when(client.queryOffset(trackingReference, "stream"))
+ .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L)) // first subscription
+ .thenAnswer(
+ invocation -> {
+ // during recovery, we trigger another disconnection
+ shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+ Thread.sleep(retryDelay.multipliedBy(3).toMillis());
+ throw new TimeoutStreamException("");
+ })
+ .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L));
+
+ AtomicInteger messageHandlerCalls = new AtomicInteger();
+ coordinator.subscribe(
+ consumer,
+ "stream",
+ OffsetSpecification.first(),
+ trackingReference,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> messageHandlerCalls.incrementAndGet(),
+ Collections.emptyMap());
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ // first disconnection, it starts the recovery
+ shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+ // initial subscription + 1 re-subscription
+ waitAtMost(
+ () -> subscriptionCount.get() == 1 + 1,
+ () -> format("Subscription count is %s", subscriptionCount.get()));
+
+ verify(consumer, times(1)).setSubscriptionClient(isNull());
+ verify(client, times(1 + 1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+ }
+
@Test
void shouldRedistributeConsumerOnMetadataUpdate() throws Exception {
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(100), ms(100));
when(environment.topologyUpdateBackOffDelayPolicy()).thenReturn(delayPolicy);
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
@@ -592,13 +665,18 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception {
StreamConsumer consumerClosedAfterMetadataUpdate = mock(StreamConsumer.class);
when(consumerClosedAfterMetadataUpdate.isOpen()).thenReturn(false);
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
when(client.subscribe(
subscriptionIdCaptor.capture(),
anyString(),
any(OffsetSpecification.class),
anyInt(),
anyMap()))
- .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ });
AtomicInteger messageHandlerCalls = new AtomicInteger();
Runnable closingRunnable =
@@ -644,24 +722,24 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception {
// the consumer connection should be reset after the metadata update
verify(consumer, times(1)).setSubscriptionClient(isNull());
- Thread.sleep(delayPolicy.delay(0).toMillis() * 5);
-
// the second consumer does not re-subscribe because it returns it is not open
+ waitAtMost(() -> subscriptionCount.get() == 2 + 1);
verify(client, times(2 + 1))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
assertThat(messageHandlerCalls.get()).isEqualTo(1);
- lastMessageListener()
- .handle(
- subscriptionIdCaptor.getAllValues().get(0),
- 0,
- 0,
- 0,
- new WrapperMessageBuilder().build());
+ // listener is per manager (connection), so it can have been disposed of,
+ // depending on which replica is chosen
+ // we dispatch to all of them, we should have only one subscription active
+ // we use the latest subscription ID though
+ this.messageListeners.forEach(
+ l -> {
+ l.handle(subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build());
+ });
+
assertThat(messageHandlerCalls.get()).isEqualTo(2);
- when(client.unsubscribe(subscriptionIdCaptor.getValue()))
- .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+ when(client.unsubscribe(subscriptionIdCaptor.getValue())).thenReturn(responseOk());
closingRunnable.run();
verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue());
@@ -670,14 +748,14 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception {
.handle(subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build());
assertThat(messageHandlerCalls.get()).isEqualTo(2);
- assertThat(coordinator.poolSize()).isZero();
+ assertThat(coordinator.managerCount()).isZero();
}
@Test
void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Exception {
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(100), ms(100));
when(environment.topologyUpdateBackOffDelayPolicy()).thenReturn(delayPolicy);
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
@@ -687,13 +765,18 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti
.thenReturn(metadata(null, replicas()));
when(clientFactory.client(any())).thenReturn(client);
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
when(client.subscribe(
subscriptionIdCaptor.capture(),
anyString(),
any(OffsetSpecification.class),
anyInt(),
anyMap()))
- .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+ .then(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ });
AtomicInteger messageHandlerCalls = new AtomicInteger();
Runnable closingRunnable =
@@ -717,7 +800,7 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti
metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
- Thread.sleep(delayPolicy.delay(0).toMillis() + delayPolicy.delay(1).toMillis() * 5);
+ waitAtMost(() -> subscriptionCount.get() == 2);
verify(client, times(2))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
@@ -737,14 +820,14 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti
subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build());
assertThat(messageHandlerCalls.get()).isEqualTo(2);
- assertThat(coordinator.poolSize()).isZero();
+ assertThat(coordinator.managerCount()).isZero();
}
@Test
void metadataUpdate_shouldCloseConsumerIfStreamIsDeleted() throws Exception {
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(50), ms(50));
when(environment.topologyUpdateBackOffDelayPolicy()).thenReturn(delayPolicy);
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
@@ -788,7 +871,7 @@ void metadataUpdate_shouldCloseConsumerIfStreamIsDeleted() throws Exception {
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
verify(client, times(0)).unsubscribe(anyByte());
- assertThat(coordinator.poolSize()).isZero();
+ assertThat(coordinator.managerCount()).isZero();
}
@Test
@@ -796,7 +879,7 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio
Duration retryTimeout = Duration.ofMillis(200);
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(50), ms(50), ms(200));
when(environment.topologyUpdateBackOffDelayPolicy()).thenReturn(delayPolicy);
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
@@ -840,7 +923,7 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
verify(client, times(0)).unsubscribe(anyByte());
- assertThat(coordinator.poolSize()).isZero();
+ assertThat(coordinator.managerCount()).isZero();
}
@Test
@@ -904,7 +987,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip
@Test
void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -969,7 +1052,7 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E
void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() throws Exception {
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(50), ms(50));
when(environment.topologyUpdateBackOffDelayPolicy()).thenReturn(delayPolicy);
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream")).thenReturn(metadata(null, replicas().subList(0, 1)));
@@ -1004,27 +1087,21 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t
verify(client, times(subscriptionCount))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
- List info = MonitoringTestUtils.extract(coordinator);
- assertThat(info)
- .hasSize(1)
- .element(0)
- .extracting(pool -> pool.consumerCount())
- .isEqualTo(subscriptionCount);
+ ConsumerCoordinatorInfo info = MonitoringTestUtils.extract(coordinator);
+ // the locator returns only 1 replica, so all the clients are connected to the same node
+ assertThat(info.nodesConnected()).hasSize(1);
+ assertThat(info.consumerCount()).isEqualTo(subscriptionCount);
- // let's kill the first client connection
+ // let's make the stream unavailable on the first manager
metadataListeners.get(0).handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
Thread.sleep(delayPolicy.delay(0).toMillis() * 5);
info = MonitoringTestUtils.extract(coordinator);
- assertThat(info)
- .hasSize(1)
- .element(0)
- .extracting(pool -> pool.consumerCount())
- .isEqualTo(subscriptionCount);
+ assertThat(info.nodesConnected()).hasSize(1);
+ assertThat(info.consumerCount()).isEqualTo(subscriptionCount);
- // the MAX consumers must have been re-allocated to the existing client and a new one
- // let's add a new subscription to make sure we are still using the same pool
+ // the MAX consumers must have been re-allocated to the initial client because it's not closed
+ // let's add a new subscription to make sure we are still using the second client
coordinator.subscribe(
consumer,
"stream",
@@ -1035,23 +1112,21 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t
(offset, message) -> {},
Collections.emptyMap());
- verify(clientFactory, times(2 + 1)).client(any());
+ // no more client creation
+ verify(clientFactory, times(2)).client(any());
verify(client, times(subscriptionCount + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + 1))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
info = MonitoringTestUtils.extract(coordinator);
- assertThat(info)
- .hasSize(1)
- .element(0)
- .extracting(pool -> pool.consumerCount())
- .isEqualTo(subscriptionCount + 1);
+ assertThat(info.nodesConnected()).hasSize(1);
+ assertThat(info.consumerCount()).isEqualTo(subscriptionCount + 1);
}
@ParameterizedTest
@MethodSource("disruptionArguments")
void shouldRestartWhereItLeftOffAfterDisruption(Consumer configurator)
throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -1122,7 +1197,7 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer configurator) throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -1186,7 +1261,7 @@ void shouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived(
@SuppressWarnings("unchecked")
void shouldUseStoredOffsetOnRecovery(Consumer configurator)
throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -1268,7 +1343,207 @@ void shouldUseStoredOffsetOnRecovery(Consumer configur
}
@Test
- void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() throws Exception {
+ @SuppressWarnings("unchecked")
+ void shouldRetryAssignmentOnRecoveryTimeout() throws Exception {
+ // 2 subscriptions (stream-1 and stream-2) share the same client; after a connection loss,
+ // the offset query for stream-2 times out several times before the recovery succeeds
+ scheduledExecutorService = createScheduledExecutorService(2);
+ when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+ Duration retryDelay = Duration.ofMillis(100);
+ when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(environment.topologyUpdateBackOffDelayPolicy())
+ .thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(consumer.isOpen()).thenReturn(true);
+ when(locator.metadata("stream-1")).thenReturn(metadata("stream-1", null, replica()));
+ when(locator.metadata("stream-2")).thenReturn(metadata("stream-2", null, replica()));
+
+ when(clientFactory.client(any())).thenReturn(client);
+
+ String consumerName = "consumer-name";
+ // the offset query always succeeds for stream-1
+ when(client.queryOffset(consumerName, "stream-1"))
+ .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L));
+ when(client.queryOffset(consumerName, "stream-2"))
+ .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L)) // first subscription
+ .thenThrow(new TimeoutStreamException("")) // on recovery
+ .thenThrow(new TimeoutStreamException("")) // on recovery, retry
+ .thenThrow(new TimeoutStreamException("")) // on recovery, retry
+ .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L));
+
+ // counts the subscribe calls, so that the test can wait for the re-subscriptions
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
+ when(client.subscribe(
+ subscriptionIdCaptor.capture(),
+ anyString(),
+ any(OffsetSpecification.class),
+ anyInt(),
+ anyMap()))
+ .thenAnswer(
+ a -> {
+ subscriptionCount.incrementAndGet();
+ return new Client.Response(Constants.RESPONSE_CODE_OK);
+ });
+
+ coordinator.subscribe(
+ consumer,
+ "stream-1",
+ null,
+ consumerName,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> {},
+ Collections.emptyMap());
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ coordinator.subscribe(
+ consumer,
+ "stream-2",
+ null,
+ consumerName,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> {},
+ Collections.emptyMap());
+ // still only 1 client created for the 2 subscriptions
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1 + 1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ // connection loss, it triggers the recovery of the 2 subscriptions
+ this.shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+ // for each of the 2 streams: initial subscription + re-subscription
+ waitAtMost(() -> subscriptionCount.get() == (1 + 1) * 2);
+
+ verify(locator, times(2)).metadata("stream-1");
+ verify(client, times(2)).queryOffset(consumerName, "stream-1");
+ // for stream-2, the offset query on recovery timed out, so more calls...
+ verify(locator, times(3)).metadata("stream-2");
+ verify(client, times(1 + 3 + 1))
+ .queryOffset(
+ consumerName, "stream-2"); // subscription call, times out 3 times, retry that succeeds
+ }
+
+ /**
+ * The re-subscription after a connection loss gets a "stream not available" response, the
+ * coordinator is expected to try again until the subscription succeeds.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ void shouldRetryAssignmentOnRecoveryStreamNotAvailableFailure() throws Exception {
+ scheduledExecutorService = createScheduledExecutorService(2);
+ when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+ Duration retryDelay = Duration.ofMillis(100);
+ when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(environment.topologyUpdateBackOffDelayPolicy())
+ .thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(consumer.isOpen()).thenReturn(true);
+ when(locator.metadata("stream")).thenReturn(metadata("stream", null, replicas()));
+
+ when(clientFactory.client(any())).thenReturn(client);
+
+ // subscribe responses, in order: OK (initial subscription), stream not available (first
+ // attempt on recovery), OK (retry); each call is counted
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
+ when(client.subscribe(
+ subscriptionIdCaptor.capture(),
+ anyString(),
+ any(OffsetSpecification.class),
+ anyInt(),
+ anyMap()))
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ })
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return new Response(Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
+ })
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ });
+
+ coordinator.subscribe(
+ consumer,
+ "stream",
+ null,
+ null,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> {},
+ Collections.emptyMap());
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ // connection loss, it triggers the recovery
+ this.shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+ // initial subscription + failed re-subscription + successful re-subscription
+ waitAtMost(() -> subscriptionCount.get() == 1 + 1 + 1);
+
+ // 1 metadata lookup per subscription attempt
+ verify(locator, times(3)).metadata("stream");
+ }
+
+ /**
+ * During recovery, the re-subscription fails and the next metadata lookup returns no replica,
+ * the coordinator is expected to try again until the subscription succeeds.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ void shouldRetryAssignmentOnRecoveryCandidateLookupFailure() throws Exception {
+ scheduledExecutorService = createScheduledExecutorService();
+ when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+ Duration retryDelay = Duration.ofMillis(100);
+ when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(environment.topologyUpdateBackOffDelayPolicy())
+ .thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(consumer.isOpen()).thenReturn(true);
+ // the 3rd metadata lookup does not return any replica
+ when(locator.metadata("stream"))
+ .thenReturn(metadata("stream", null, replicas()))
+ .thenReturn(metadata("stream", null, replicas()))
+ .thenReturn(metadata("stream", null, null))
+ .thenReturn(metadata("stream", null, replicas()));
+
+ when(clientFactory.client(any())).thenReturn(client);
+
+ // counts the subscribe calls, so that the test can wait for the re-subscriptions
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
+ when(client.subscribe(
+ subscriptionIdCaptor.capture(),
+ anyString(),
+ any(OffsetSpecification.class),
+ anyInt(),
+ anyMap()))
+ .thenAnswer(
+ invocation -> {
+ // first subscription
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ })
+ .thenAnswer(
+ invocation -> {
+ // on recovery, subscription fails, to trigger candidate lookup
+ subscriptionCount.incrementAndGet();
+ return new Response(Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
+ })
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return responseOk();
+ });
+
+ coordinator.subscribe(
+ consumer,
+ "stream",
+ null,
+ null,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> {},
+ Collections.emptyMap());
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ // connection loss, it triggers the recovery
+ this.shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+ // initial subscription + failed re-subscription + successful re-subscription
+ waitAtMost(() -> subscriptionCount.get() == 1 + 1 + 1);
+
+ // the 4 stubbed metadata responses have been consumed
+ verify(locator, times(4)).metadata("stream");
+ }
+
+ @Test
+ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
when(clientFactory.client(any())).thenReturn(client);
@@ -1342,4 +1617,19 @@ private MessageListener firstMessageListener() {
private MessageListener lastMessageListener() {
return this.messageListeners.get(messageListeners.size() - 1);
}
+
+ /** Creates a wrapped scheduled executor service backed by a single thread. */
+ private static ScheduledExecutorService createScheduledExecutorService() {
+ final int singleThread = 1;
+ return createScheduledExecutorService(singleThread);
+ }
+
+ /**
+ * Creates a wrapped scheduled executor service.
+ *
+ * @param nbThreads the number of threads, a single-thread executor is used for 1
+ * @return the executor service, wrapped in a {@link ScheduledExecutorServiceWrapper}
+ */
+ private static ScheduledExecutorService createScheduledExecutorService(int nbThreads) {
+ ScheduledExecutorService delegate;
+ if (nbThreads == 1) {
+ delegate = Executors.newSingleThreadScheduledExecutor();
+ } else {
+ delegate = Executors.newScheduledThreadPool(nbThreads);
+ }
+ return new ScheduledExecutorServiceWrapper(delegate);
+ }
+
+ /** Creates a new response with the OK response code. */
+ private static Response responseOk() {
+ Response ok = new Response(Constants.RESPONSE_CODE_OK);
+ return ok;
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java b/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java
index 41be5cf5db..e09178c978 100644
--- a/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java
@@ -13,36 +13,25 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
-import java.lang.reflect.Type;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
class MonitoringTestUtils {
private static final Gson GSON = new Gson();
- private static List arrayToList(T[] array) {
- if (array == null || array.length == 0) {
- return Collections.emptyList();
- } else {
- return Arrays.asList(array);
- }
- }
-
- static List extract(ProducersCoordinator coordinator) {
- Type type = new TypeToken>() {}.getType();
- return GSON.fromJson(coordinator.toString(), type);
+ /** Parses the JSON returned by the coordinator's {@code toString()} into an info object. */
+ static ProducersCoordinatorInfo extract(ProducersCoordinator coordinator) {
+ String json = coordinator.toString();
+ return GSON.fromJson(json, ProducersCoordinatorInfo.class);
}
- static List extract(ConsumersCoordinator coordinator) {
- Type type = new TypeToken>() {}.getType();
- return GSON.fromJson(coordinator.toString(), type);
+ /** Parses the JSON returned by the coordinator's {@code toString()} into an info object. */
+ static ConsumerCoordinatorInfo extract(ConsumersCoordinator coordinator) {
+ String json = coordinator.toString();
+ return GSON.fromJson(json, ConsumerCoordinatorInfo.class);
}
static EnvironmentInfo extract(Environment environment) {
@@ -63,85 +52,80 @@ static ConsumerInfo extract(Consumer consumer) {
public static class EnvironmentInfo {
- private final String locator;
- private final ProducersPoolInfo[] producers;
- private final ConsumersPoolInfo[] consumers;
+ private final String[] locators;
+ private final ProducersCoordinatorInfo producers;
+ private final ConsumerCoordinatorInfo consumers;
public EnvironmentInfo(
- String locator, ProducersPoolInfo[] producers, ConsumersPoolInfo[] consumers) {
- this.locator = locator;
+ String[] locators, ProducersCoordinatorInfo producers, ConsumerCoordinatorInfo consumers) {
+ this.locators = locators;
this.producers = producers;
this.consumers = consumers;
}
- public String getLocator() {
- return locator;
+ public String[] getLocators() {
+ return locators;
}
- public List getConsumers() {
- return arrayToList(this.consumers);
+ public ConsumerCoordinatorInfo getConsumers() {
+ return this.consumers;
}
- public List getProducers() {
- return arrayToList(this.producers);
+ public ProducersCoordinatorInfo getProducers() {
+ return this.producers;
}
@Override
public String toString() {
return "EnvironmentInfo{"
- + "locator='"
- + locator
+ + "locators='"
+ + Arrays.toString(locators)
+ '\''
+ ", producers="
- + Arrays.toString(producers)
+ + producers
+ ", consumers="
- + Arrays.toString(consumers)
+ + consumers
+ '}';
}
}
- public static class ConsumersPoolInfo {
+ public static class ConsumerCoordinatorInfo {
- private final String broker;
+ private final int subscription_count;
private final ConsumerManager[] clients;
- public ConsumersPoolInfo(String broker, ConsumerManager[] clients) {
- this.broker = broker;
+ public ConsumerCoordinatorInfo(int subscription_count, ConsumerManager[] clients) {
+ this.subscription_count = subscription_count;
this.clients = clients;
}
- public String getBroker() {
- return broker;
+ boolean isEmpty() {
+ return this.clients.length == 0;
}
- public List getClients() {
- return arrayToList(this.clients);
+ Set nodesConnected() {
+ return Arrays.stream(this.clients).map(m -> m.node).collect(Collectors.toSet());
}
- public int consumerCount() {
- return getClients().stream()
- .map(manager -> manager.getConsumerCount())
- .reduce(0, (acc, count) -> acc + count);
+ List clients() {
+ return Arrays.asList(this.clients);
}
- @Override
- public String toString() {
- return "ConsumerPoolInfo{"
- + "broker='"
- + broker
- + '\''
- + ", clients="
- + Arrays.toString(clients)
- + '}';
+ int consumerCount() {
+ return Arrays.stream(this.clients).mapToInt(ConsumerManager::getConsumerCount).sum();
}
}
public static class ConsumerManager {
+ private final long id;
+ private final String node;
private final int consumer_count;
- public ConsumerManager(int consumerCount) {
- this.consumer_count = consumerCount;
+ public ConsumerManager(long id, String node, int consumer_count) {
+ this.id = id;
+ this.node = node;
+ this.consumer_count = consumer_count;
}
public int getConsumerCount() {
@@ -150,46 +134,63 @@ public int getConsumerCount() {
@Override
public String toString() {
- return "ConsumerManager{" + "consumerCount=" + consumer_count + '}';
+ return "ConsumerManager{"
+ + "id="
+ + id
+ + ", node='"
+ + node
+ + '\''
+ + ", consumer_count="
+ + consumer_count
+ + '}';
}
}
- public static class ProducersPoolInfo {
+ public static class ProducersCoordinatorInfo {
- private final String broker;
+ private final int client_count;
+ private final int producer_count;
+ private final int tracking_consumer_count;
private final ProducerManager[] clients;
- public ProducersPoolInfo(String broker, ProducerManager[] clients) {
- this.broker = broker;
+ public ProducersCoordinatorInfo(
+ int client_count,
+ int producer_count,
+ int tracking_consumer_count,
+ ProducerManager[] clients) {
+ this.client_count = client_count;
+ this.producer_count = producer_count;
+ this.tracking_consumer_count = tracking_consumer_count;
this.clients = clients;
}
- public String getBroker() {
- return broker;
+ int clientCount() {
+ return this.client_count;
}
- public List getClients() {
- return arrayToList(this.clients);
+ int producerCount() {
+ return this.producer_count;
}
- @Override
- public String toString() {
- return "ProducersPoolInfo{"
- + "broker='"
- + broker
- + '\''
- + ", clients="
- + Arrays.toString(clients)
- + '}';
+ int trackingConsumerCount() {
+ return this.tracking_consumer_count;
+ }
+
+ Set nodesConnected() {
+ return Arrays.stream(this.clients).map(m -> m.node).collect(Collectors.toSet());
}
}
public static class ProducerManager {
+ private final long id;
+ private final String node;
private final int producer_count;
private final int tracking_consumer_count;
- public ProducerManager(int producerCount, int tracking_consumer_count) {
+ public ProducerManager(long id, String node, int producerCount, int tracking_consumer_count) {
+ this.id = id;
+ this.node = node;
this.producer_count = producerCount;
this.tracking_consumer_count = tracking_consumer_count;
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java
index a3a3ed8f57..8a7af3ece6 100644
--- a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java
@@ -50,7 +50,7 @@ public class OffsetTrackingCoordinatorTest {
@BeforeEach
void init() {
mocks = MockitoAnnotations.openMocks(this);
- executorService = Executors.newScheduledThreadPool(2);
+ executorService = new ScheduledExecutorServiceWrapper(Executors.newScheduledThreadPool(2));
when(env.scheduledExecutorService()).thenReturn(executorService);
when(consumer.isOpen()).thenReturn(true);
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
index 3faae0c8a1..74e3a0c562 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java
@@ -118,6 +118,8 @@ public Client.ClientParameters metadataListener(
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
type -> "producer-connection",
clientFactory);
+ when(client.isOpen()).thenReturn(true);
+ when(client.deletePublisher(anyByte())).thenReturn(new Response(Constants.RESPONSE_CODE_OK));
}
@AfterEach
@@ -229,7 +231,7 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod
@Test
void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(50);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
@@ -272,7 +274,7 @@ void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Ex
verify(producer, times(1)).setClient(client);
verify(trackingConsumer, times(1)).setTrackingClient(client);
verify(producerClosedAfterDisconnection, times(1)).setClient(client);
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
shutdownListener.handle(
@@ -289,13 +291,13 @@ void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Ex
verify(producerClosedAfterDisconnection, times(1)).unavailable();
verify(producerClosedAfterDisconnection, times(1)).setClient(client);
verify(producerClosedAfterDisconnection, never()).running();
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
}
@Test
void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(environment.recoveryBackOffDelayPolicy())
.thenReturn(BackOffDelayPolicy.fixedWithInitialDelay(ms(10), ms(10), ms(100)));
@@ -316,7 +318,7 @@ void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exce
verify(producer, times(1)).setClient(client);
verify(trackingConsumer, times(1)).setTrackingClient(client);
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
shutdownListener.handle(
@@ -330,13 +332,13 @@ void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exce
verify(trackingConsumer, times(1)).setTrackingClient(client);
verify(trackingConsumer, never()).running();
verify(trackingConsumer, never()).closeAfterStreamDeletion();
- assertThat(coordinator.poolSize()).isEqualTo(0);
+ assertThat(coordinator.nodesConnected()).isEqualTo(0);
assertThat(coordinator.clientCount()).isEqualTo(0);
}
@Test
void shouldRedistributeProducersAndTrackingConsumersOnMetadataUpdate() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(50);
when(environment.topologyUpdateBackOffDelayPolicy())
@@ -396,7 +398,6 @@ void shouldRedistributeProducersAndTrackingConsumersOnMetadataUpdate() throws Ex
verify(producerClosedAfterDisconnection, times(1)).setClient(client);
verify(movingTrackingConsumer, times(1)).setTrackingClient(client);
verify(fixedTrackingConsumer, times(1)).setTrackingClient(client);
- assertThat(coordinator.poolSize()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
metadataListener.handle(movingStream, Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
@@ -420,13 +421,13 @@ void shouldRedistributeProducersAndTrackingConsumersOnMetadataUpdate() throws Ex
verify(fixedTrackingConsumer, never()).unavailable();
verify(fixedTrackingConsumer, times(1)).setTrackingClient(client);
verify(fixedTrackingConsumer, never()).running();
- assertThat(coordinator.poolSize()).isEqualTo(2);
+ assertThat(coordinator.nodesConnected()).isEqualTo(2);
assertThat(coordinator.clientCount()).isEqualTo(2);
}
@Test
void shouldDisposeProducerIfStreamIsDeleted() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(environment.topologyUpdateBackOffDelayPolicy())
.thenReturn(BackOffDelayPolicy.fixedWithInitialDelay(ms(10), ms(10), ms(100)));
@@ -444,7 +445,6 @@ void shouldDisposeProducerIfStreamIsDeleted() throws Exception {
coordinator.registerProducer(producer, null, "stream");
verify(producer, times(1)).setClient(client);
- assertThat(coordinator.poolSize()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
@@ -454,13 +454,12 @@ void shouldDisposeProducerIfStreamIsDeleted() throws Exception {
verify(producer, times(1)).setClient(client);
verify(producer, never()).running();
- assertThat(coordinator.poolSize()).isEqualTo(0);
assertThat(coordinator.clientCount()).isEqualTo(0);
}
@Test
void shouldDisposeProducerAndNotTrackingConsumerIfMetadataUpdateTimesOut() throws Exception {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(environment.topologyUpdateBackOffDelayPolicy())
.thenReturn(BackOffDelayPolicy.fixedWithInitialDelay(ms(10), ms(10), ms(100)));
@@ -481,7 +480,7 @@ void shouldDisposeProducerAndNotTrackingConsumerIfMetadataUpdateTimesOut() throw
verify(producer, times(1)).setClient(client);
verify(trackingConsumer, times(1)).setTrackingClient(client);
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE);
@@ -494,13 +493,13 @@ void shouldDisposeProducerAndNotTrackingConsumerIfMetadataUpdateTimesOut() throw
verify(trackingConsumer, times(1)).setTrackingClient(client);
verify(trackingConsumer, never()).running();
verify(trackingConsumer, never()).closeAfterStreamDeletion();
- assertThat(coordinator.poolSize()).isEqualTo(0);
+ assertThat(coordinator.nodesConnected()).isEqualTo(0);
assertThat(coordinator.clientCount()).isEqualTo(0);
}
@Test
void growShrinkResourcesBasedOnProducersAndTrackingConsumersCount() {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
@@ -529,7 +528,7 @@ class ProducerInfo {
producerInfos.add(info);
});
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(2);
// let's add some tracking consumers
@@ -554,7 +553,7 @@ class TrackingConsumerInfo {
trackingConsumerInfos.add(info);
});
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount())
.as("new tracking consumers needs yet another client")
.isEqualTo(3);
@@ -588,7 +587,7 @@ class TrackingConsumerInfo {
verify(p, times(1)).setClient(client);
assertThat(publishingIdForNewProducer.get()).isEqualTo(info.publishingId);
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(2);
// close some of the last producers, this should free a whole producer manager and a bit of the
@@ -598,7 +597,11 @@ class TrackingConsumerInfo {
producerInfo.cleaningCallback.run();
}
- assertThat(coordinator.poolSize()).isEqualTo(1);
+ assertThat(coordinator.nodesConnected()).isEqualTo(1);
assertThat(coordinator.clientCount()).isEqualTo(1);
}
+
+ private static ScheduledExecutorService createScheduledExecutorService() {
+ return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor());
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
index 1b11d91bf1..78acda9623 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
@@ -482,6 +482,7 @@ void consumerShouldKeepConsumingAfterDisruption(
java.util.function.Consumer