Skip to content

Commit 26fc974

Browse files
authored
Merge pull request #276 from rabbitmq/exchange-command-versions-in-connection
Exchange command versions in connection
2 parents 2616345 + 13f4f46 commit 26fc974

File tree

4 files changed

+52
-21
lines changed

4 files changed

+52
-21
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+33
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@
112112
import java.util.ArrayList;
113113
import java.util.Collections;
114114
import java.util.HashMap;
115+
import java.util.HashSet;
115116
import java.util.List;
116117
import java.util.Map;
117118
import java.util.Objects;
119+
import java.util.Set;
118120
import java.util.concurrent.ConcurrentHashMap;
119121
import java.util.concurrent.ConcurrentMap;
120122
import java.util.concurrent.CopyOnWriteArrayList;
@@ -210,6 +212,7 @@ public long applyAsLong(Object value) {
210212
private final Map<String, String> connectionProperties;
211213
private final Duration rpcTimeout;
212214
private volatile ShutdownReason shutdownReason = null;
215+
private final Runnable exchangeCommandVersionsCheck;
213216

214217
public Client() {
215218
this(new ClientParameters());
@@ -373,6 +376,19 @@ public void write(
373376
this.maxFrameSize(),
374377
tuneState.getHeartbeat());
375378
this.connectionProperties = open(parameters.virtualHost);
379+
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
380+
if (supportedCommands.stream()
381+
.filter(i -> i.getKey() == COMMAND_STREAM_STATS)
382+
.findAny()
383+
.isPresent()) {
384+
this.exchangeCommandVersionsCheck = () -> {};
385+
} else {
386+
this.exchangeCommandVersionsCheck =
387+
() -> {
388+
throw new UnsupportedOperationException(
389+
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
390+
};
391+
}
376392
started.set(true);
377393
this.metricsCollector.openConnection();
378394
} catch (RuntimeException e) {
@@ -1455,6 +1471,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
14551471
}
14561472

14571473
StreamStatsResponse streamStats(String stream) {
1474+
this.exchangeCommandVersionsCheck.run();
14581475
if (stream == null) {
14591476
throw new IllegalArgumentException("stream must not be null");
14601477
}
@@ -1548,6 +1565,22 @@ private EncodedMessageBatch createEncodedMessageBatch(Compression compression, i
15481565
channel.alloc(), compression.code(), compressionCodecFactory.get(compression), batchSize);
15491566
}
15501567

1568+
private Set<FrameHandlerInfo> maybeExchangeCommandVersions() {
1569+
Set<FrameHandlerInfo> supported = new HashSet<>();
1570+
try {
1571+
if (Utils.is3_11_OrMore(brokerVersion())) {
1572+
for (FrameHandlerInfo info : exchangeCommandVersions()) {
1573+
if (info.getKey() == COMMAND_STREAM_STATS) {
1574+
supported.add(info);
1575+
}
1576+
}
1577+
}
1578+
} catch (Exception e) {
1579+
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
1580+
}
1581+
return supported;
1582+
}
1583+
15511584
public interface OutboundEntityMappingCallback {
15521585

15531586
void handle(long publishingId, Object originalMessageOrBatch);

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

-11
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
729729
.key(name);
730730
this.client = clientFactory.client(clientFactoryContext);
731731
LOGGER.debug("Created consumer connection '{}'", connectionName);
732-
maybeExchangeCommandVersions(client);
733732
clientInitializedInManager.set(true);
734733
}
735734

@@ -1149,16 +1148,6 @@ public String toString() {
11491148
}
11501149
}
11511150

1152-
private static void maybeExchangeCommandVersions(Client client) {
1153-
try {
1154-
if (Utils.is3_11_OrMore(client.brokerVersion())) {
1155-
client.exchangeCommandVersions();
1156-
}
1157-
} catch (Exception e) {
1158-
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
1159-
}
1160-
}
1161-
11621151
private static final Predicate<Exception> RETRY_ON_TIMEOUT =
11631152
e -> e instanceof TimeoutStreamException;
11641153

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+18
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.LinkedHashMap;
8484
import java.util.List;
8585
import java.util.Map;
86+
import java.util.Objects;
8687
import java.util.concurrent.ConcurrentMap;
8788
import java.util.concurrent.atomic.AtomicBoolean;
8889
import org.slf4j.Logger;
@@ -184,6 +185,23 @@ public String toString() {
184185
+ maxVersion
185186
+ '}';
186187
}
188+
189+
@Override
190+
public boolean equals(Object o) {
191+
if (this == o) {
192+
return true;
193+
}
194+
if (o == null || getClass() != o.getClass()) {
195+
return false;
196+
}
197+
FrameHandlerInfo that = (FrameHandlerInfo) o;
198+
return key == that.key && minVersion == that.minVersion && maxVersion == that.maxVersion;
199+
}
200+
201+
@Override
202+
public int hashCode() {
203+
return Objects.hash(key, minVersion, maxVersion);
204+
}
187205
}
188206

189207
static List<FrameHandlerInfo> commandVersions() {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+1-10
Original file line numberDiff line numberDiff line change
@@ -481,16 +481,7 @@ public StreamStats queryStreamStats(String stream) {
481481
StreamStatsResponse response =
482482
locatorOperation(
483483
Utils.namedFunction(
484-
client -> {
485-
if (Utils.is3_11_OrMore(client.brokerVersion())) {
486-
return client.streamStats(stream);
487-
} else {
488-
throw new UnsupportedOperationException(
489-
"QueryStringInfo is available only for RabbitMQ 3.11 or more.");
490-
}
491-
},
492-
"Query stream stats on stream '%s'",
493-
stream));
484+
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
494485
if (response.isOk()) {
495486
Map<String, Long> info = response.getInfo();
496487
BiFunction<String, String, LongSupplier> offsetSupplierLogic =

0 commit comments

Comments
 (0)