Skip to content

Commit a55f94f

Browse files
committed
Check super stream management commands are supported
1 parent 983080a commit a55f94f

File tree

1 file changed

+23
-9
lines changed

1 file changed

+23
-9
lines changed

Diff for: src/main/java/com/rabbitmq/stream/impl/Client.java

+23-9
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,9 @@ public long applyAsLong(Object value) {
189189
private final Duration rpcTimeout;
190190
private final List<String> saslMechanisms;
191191
private volatile ShutdownReason shutdownReason = null;
192-
private final Runnable exchangeCommandVersionsCheck;
192+
private final Runnable streamStatsCommandVersionsCheck;
193193
private final boolean filteringSupported;
194+
private final Runnable superStreamManagementCommandVersionsCheck;
194195

195196
public Client() {
196197
this(new ClientParameters());
@@ -376,25 +377,36 @@ public void initChannel(SocketChannel ch) {
376377
tuneState.getHeartbeat());
377378
this.connectionProperties = open(parameters.virtualHost);
378379
Set<FrameHandlerInfo> supportedCommands = maybeExchangeCommandVersions();
379-
AtomicReference<Runnable> exchangeCommandVersionsCheckReference = new AtomicReference<>();
380+
AtomicBoolean streamStatsSupported = new AtomicBoolean(false);
380381
AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
382+
AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false);
381383
supportedCommands.forEach(
382384
c -> {
383385
if (c.getKey() == COMMAND_STREAM_STATS) {
384-
exchangeCommandVersionsCheckReference.set(() -> {});
386+
streamStatsSupported.set(true);
385387
}
386388
if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) {
387389
filteringSupportedReference.set(true);
388390
}
391+
if (c.getKey() == COMMAND_CREATE_SUPER_STREAM) {
392+
superStreamManagementSupported.set(true);
393+
}
389394
});
390-
this.exchangeCommandVersionsCheck =
391-
exchangeCommandVersionsCheckReference.get() == null
392-
? () -> {
395+
this.streamStatsCommandVersionsCheck =
396+
streamStatsSupported.get()
397+
? () -> {}
398+
: () -> {
393399
throw new UnsupportedOperationException(
394400
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
395-
}
396-
: exchangeCommandVersionsCheckReference.get();
401+
};
397402
this.filteringSupported = filteringSupportedReference.get();
403+
this.superStreamManagementCommandVersionsCheck =
404+
superStreamManagementSupported.get()
405+
? () -> {}
406+
: () -> {
407+
throw new UnsupportedOperationException(
408+
"Super stream management is available only on RabbitMQ 3.13 or more.");
409+
};
398410
started.set(true);
399411
this.metricsCollector.openConnection();
400412
} catch (RuntimeException e) {
@@ -678,6 +690,7 @@ Response createSuperStream(
678690
List<String> partitions,
679691
List<String> routingKeys,
680692
Map<String, String> arguments) {
693+
this.superStreamManagementCommandVersionsCheck.run();
681694
if (partitions.isEmpty() || routingKeys.isEmpty()) {
682695
throw new IllegalArgumentException(
683696
"Partitions and routing keys of a super stream cannot be empty");
@@ -724,6 +737,7 @@ Response createSuperStream(
724737
}
725738

726739
Response deleteSuperStream(String superStream) {
740+
this.superStreamManagementCommandVersionsCheck.run();
727741
int length = 2 + 2 + 4 + 2 + superStream.length();
728742
int correlationId = correlationSequence.incrementAndGet();
729743
try {
@@ -1594,7 +1608,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
15941608
}
15951609

15961610
StreamStatsResponse streamStats(String stream) {
1597-
this.exchangeCommandVersionsCheck.run();
1611+
this.streamStatsCommandVersionsCheck.run();
15981612
if (stream == null) {
15991613
throw new IllegalArgumentException("stream must not be null");
16001614
}

0 commit comments

Comments
 (0)