Skip to content

Commit 13f4f46

Browse files
committed
Exchange command versions in connection
Command availability (only StreamStats for now) was based on broker version. This commit makes the exchange for each open connection and make sure the users of Client won't call a command not supported by the broker.
1 parent 2616345 commit 13f4f46

File tree

4 files changed

+52
-21
lines changed

4 files changed

+52
-21
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+33
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@
112112
import java.util.ArrayList;
113113
import java.util.Collections;
114114
import java.util.HashMap;
115+
import java.util.HashSet;
115116
import java.util.List;
116117
import java.util.Map;
117118
import java.util.Objects;
119+
import java.util.Set;
118120
import java.util.concurrent.ConcurrentHashMap;
119121
import java.util.concurrent.ConcurrentMap;
120122
import java.util.concurrent.CopyOnWriteArrayList;
@@ -210,6 +212,7 @@ public long applyAsLong(Object value) {
210212
private final Map<String, String> connectionProperties;
211213
private final Duration rpcTimeout;
212214
private volatile ShutdownReason shutdownReason = null;
215+
private final Runnable exchangeCommandVersionsCheck;
213216

214217
public Client() {
215218
this(new ClientParameters());
@@ -373,6 +376,19 @@ public void write(
373376
this.maxFrameSize(),
374377
tuneState.getHeartbeat());
375378
this.connectionProperties = open(parameters.virtualHost);
379+
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
380+
if (supportedCommands.stream()
381+
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
382+
.findAny()
383+
.isPresent()) {
384+
this.exchangeCommandVersionsCheck = () -> {};
385+
} else {
386+
this.exchangeCommandVersionsCheck =
387+
() -> {
388+
throw new UnsupportedOperationException(
389+
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
390+
};
391+
}
376392
started.set(true);
377393
this.metricsCollector.openConnection();
378394
} catch (RuntimeException e) {
@@ -1455,6 +1471,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
14551471
}
14561472

14571473
StreamStatsResponse streamStats(String stream) {
1474+
this.exchangeCommandVersionsCheck.run();
14581475
if (stream == null) {
14591476
throw new IllegalArgumentException("stream must not be null");
14601477
}
@@ -1548,6 +1565,22 @@ private EncodedMessageBatch createEncodedMessageBatch(Compression compression, i
15481565
channel.alloc(), compression.code(), compressionCodecFactory.get(compression), batchSize);
15491566
}
15501567

1568+
private Set<FrameHandlerInfo> maybeExchangeCommandVersions() {
1569+
Set<FrameHandlerInfo> supported = new HashSet<>();
1570+
try {
1571+
if (Utils.is3_11_OrMore(brokerVersion())) {
1572+
for (FrameHandlerInfo info : exchangeCommandVersions()) {
1573+
if (info.getKey() == COMMAND_STREAM_STATS) {
1574+
supported.add(info);
1575+
}
1576+
}
1577+
}
1578+
} catch (Exception e) {
1579+
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
1580+
}
1581+
return supported;
1582+
}
1583+
15511584
public interface OutboundEntityMappingCallback {
15521585

15531586
void handle(long publishingId, Object originalMessageOrBatch);

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

-11
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
729729
.key(name);
730730
this.client = clientFactory.client(clientFactoryContext);
731731
LOGGER.debug("Created consumer connection '{}'", connectionName);
732-
maybeExchangeCommandVersions(client);
733732
clientInitializedInManager.set(true);
734733
}
735734

@@ -1149,16 +1148,6 @@ public String toString() {
11491148
}
11501149
}
11511150

1152-
private static void maybeExchangeCommandVersions(Client client) {
1153-
try {
1154-
if (Utils.is3_11_OrMore(client.brokerVersion())) {
1155-
client.exchangeCommandVersions();
1156-
}
1157-
} catch (Exception e) {
1158-
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
1159-
}
1160-
}
1161-
11621151
private static final Predicate<Exception> RETRY_ON_TIMEOUT =
11631152
e -> e instanceof TimeoutStreamException;
11641153

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+18
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.LinkedHashMap;
8484
import java.util.List;
8585
import java.util.Map;
86+
import java.util.Objects;
8687
import java.util.concurrent.ConcurrentMap;
8788
import java.util.concurrent.atomic.AtomicBoolean;
8889
import org.slf4j.Logger;
@@ -184,6 +185,23 @@ public String toString() {
184185
+ maxVersion
185186
+ '}';
186187
}
188+
189+
@Override
190+
public boolean equals(Object o) {
191+
if (this == o) {
192+
return true;
193+
}
194+
if (o == null || getClass() != o.getClass()) {
195+
return false;
196+
}
197+
FrameHandlerInfo that = (FrameHandlerInfo) o;
198+
return key == that.key && minVersion == that.minVersion && maxVersion == that.maxVersion;
199+
}
200+
201+
@Override
202+
public int hashCode() {
203+
return Objects.hash(key, minVersion, maxVersion);
204+
}
187205
}
188206

189207
static List<FrameHandlerInfo> commandVersions() {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+1-10
Original file line numberDiff line numberDiff line change
@@ -481,16 +481,7 @@ public StreamStats queryStreamStats(String stream) {
481481
StreamStatsResponse response =
482482
locatorOperation(
483483
Utils.namedFunction(
484-
client -> {
485-
if (Utils.is3_11_OrMore(client.brokerVersion())) {
486-
return client.streamStats(stream);
487-
} else {
488-
throw new UnsupportedOperationException(
489-
"QueryStringInfo is available only for RabbitMQ 3.11 or more.");
490-
}
491-
},
492-
"Query stream stats on stream '%s'",
493-
stream));
484+
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
494485
if (response.isOk()) {
495486
Map<String, Long> info = response.getInfo();
496487
BiFunction<String, String, LongSupplier> offsetSupplierLogic =

0 commit comments

Comments
 (0)