diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml
index 8885aec068..d7533241b6 100644
--- a/.github/workflows/test-pr.yml
+++ b/.github/workflows/test-pr.yml
@@ -24,6 +24,8 @@ jobs:
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
+ env:
+ RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel'
- name: Test
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc
index c7cede605a..e866d50562 100644
--- a/src/docs/asciidoc/super-streams.adoc
+++ b/src/docs/asciidoc/super-streams.adoc
@@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t
From the application code point of view, using a super stream is mostly configuration-based.
Some logic must also be provided to extract routing information from messages.
-==== Super Stream Creation
+==== Super Stream Creation and Deletion
-It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut.
-Here is how to create an invoices super stream with 3 partitions:
+It is possible to manage super streams with
+
+* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)`
+* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI)
+* any AMQP 0.9.1 client library
+* the https://www.rabbitmq.com/management.html[management plugin]
+
+The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings).
+
+===== With the Client Library
+
+Here is how to create an `invoices` super stream with 5 partitions:
+
+.Creating a super stream by specifying the number of partitions
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions]
+--------
+
+The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`.
+We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to.
+This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order.
+
+It is also possible to specify binding keys when creating a super stream:
+
+.Creating a super stream by specifying the binding keys
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys]
+--------
+
+The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case.
+
+Using one type of topology or the other depends on the use cases, especially how messages are processed.
+See the next sections on publishing and consuming to find out more.
+
+===== With the CLI
+
+Here is how to create an `invoices` super stream with 5 partitions:
.Creating a super stream from the CLI
----
-rabbitmq-streams add_super_stream invoices --partitions 3
+rabbitmq-streams add_super_stream invoices --partitions 5
----
Use `rabbitmq-streams add_super_stream --help` to learn more about the command.
diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java
index 99995a0dd1..dd1f448b8e 100644
--- a/src/main/java/com/rabbitmq/stream/Constants.java
+++ b/src/main/java/com/rabbitmq/stream/Constants.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -72,6 +72,8 @@ public final class Constants {
public static final short COMMAND_CONSUMER_UPDATE = 26;
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
public static final short COMMAND_STREAM_STATS = 28;
+ public static final short COMMAND_CREATE_SUPER_STREAM = 29;
+ public static final short COMMAND_DELETE_SUPER_STREAM = 30;
public static final short VERSION_1 = 1;
public static final short VERSION_2 = 2;
diff --git a/src/main/java/com/rabbitmq/stream/Environment.java b/src/main/java/com/rabbitmq/stream/Environment.java
index eb8185a119..7963c835ec 100644
--- a/src/main/java/com/rabbitmq/stream/Environment.java
+++ b/src/main/java/com/rabbitmq/stream/Environment.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -58,9 +58,19 @@ static EnvironmentBuilder builder() {
* Delete a stream
*
* @param stream the stream to delete
+ * @since 0.15.0
*/
void deleteStream(String stream);
+ /**
+ * Delete a super stream.
+ *
+ *
Requires RabbitMQ 3.13.0 or more.
+ *
+ * @param superStream the super stream to delete
+ */
+ void deleteSuperStream(String superStream);
+
/**
* Query statistics on a stream.
*
diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java
index ad205b5998..5d6a753d8d 100644
--- a/src/main/java/com/rabbitmq/stream/StreamCreator.java
+++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -23,13 +23,24 @@ public interface StreamCreator {
ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB");
/**
- * The name of the stream
+ * The name of the stream.
+ *
+ *
Alias for {@link #name(String)}.
*
* @param stream
* @return this creator instance
*/
StreamCreator stream(String stream);
+ /**
+ * The name of the (super) stream.
+ *
+ * @param name
+ * @return this creator instance
+ * @since 0.15.0
+ */
+ StreamCreator name(String name);
+
/**
* The maximum size of the stream before it gets truncated.
*
@@ -80,6 +91,16 @@ public interface StreamCreator {
*/
StreamCreator filterSize(int size);
+ /**
+ * Configure the super stream to create.
+ *
+ *
Requires RabbitMQ 3.13.0 or more.
+ *
+ * @return the super stream configuration
+ * @since 0.15.0
+ */
+ SuperStreamConfiguration superStream();
+
/**
* Create the stream.
*
@@ -142,4 +163,39 @@ public String value() {
return this.value;
}
}
+
+ /**
+ * Super stream configuration.
+ *
+ * @since 0.15.0
+ */
+ interface SuperStreamConfiguration {
+
+ /**
+ * The number of partitions of the super stream.
+ *
+ *
Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3.
+ *
+ * @param partitions
+ * @return this super stream configuration instance
+ */
+ SuperStreamConfiguration partitions(int partitions);
+
+ /**
+ * The binding keys to use when declaring the super stream partitions.
+ *
+ *
Mutually exclusive with {@link #partitions(int)}. Default is null.
+ *
+ * @param bindingKeys
+ * @return this super stream configuration instance
+ */
+ SuperStreamConfiguration bindingKeys(String... bindingKeys);
+
+ /**
+ * Go back to the creator.
+ *
+ * @return the stream creator
+ */
+ StreamCreator creator();
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index b443bddf51..5d83866bb9 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -22,7 +22,9 @@
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
import static java.lang.String.format;
import static java.lang.String.join;
+import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.StreamSupport.stream;
import com.rabbitmq.stream.AuthenticationFailureException;
import com.rabbitmq.stream.ByteCapacity;
@@ -83,16 +85,10 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -127,6 +123,7 @@
*/
public class Client implements AutoCloseable {
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
public static final int DEFAULT_PORT = 5552;
public static final int DEFAULT_TLS_PORT = 5551;
static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK =
@@ -192,8 +189,9 @@ public long applyAsLong(Object value) {
private final Duration rpcTimeout;
private final List saslMechanisms;
private volatile ShutdownReason shutdownReason = null;
- private final Runnable exchangeCommandVersionsCheck;
+ private final Runnable streamStatsCommandVersionsCheck;
private final boolean filteringSupported;
+ private final Runnable superStreamManagementCommandVersionsCheck;
public Client() {
this(new ClientParameters());
@@ -379,25 +377,36 @@ public void initChannel(SocketChannel ch) {
tuneState.getHeartbeat());
this.connectionProperties = open(parameters.virtualHost);
Set supportedCommands = maybeExchangeCommandVersions();
- AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>();
+ AtomicBoolean streamStatsSupported = new AtomicBoolean(false);
AtomicBoolean filteringSupportedReference = new AtomicBoolean(false);
+ AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false);
supportedCommands.forEach(
c -> {
if (c.getKey() == COMMAND_STREAM_STATS) {
- exchangeCommandVersionsCheckReference.set(() -> {});
+ streamStatsSupported.set(true);
}
if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) {
filteringSupportedReference.set(true);
}
+ if (c.getKey() == COMMAND_CREATE_SUPER_STREAM) {
+ superStreamManagementSupported.set(true);
+ }
});
- this.exchangeCommandVersionsCheck =
- exchangeCommandVersionsCheckReference.get() == null
- ? () -> {
+ this.streamStatsCommandVersionsCheck =
+ streamStatsSupported.get()
+ ? () -> {}
+ : () -> {
throw new UnsupportedOperationException(
"QueryStreamInfo is available only on RabbitMQ 3.11 or more.");
- }
- : exchangeCommandVersionsCheckReference.get();
+ };
this.filteringSupported = filteringSupportedReference.get();
+ this.superStreamManagementCommandVersionsCheck =
+ superStreamManagementSupported.get()
+ ? () -> {}
+ : () -> {
+ throw new UnsupportedOperationException(
+ "Super stream management is available only on RabbitMQ 3.13 or more.");
+ };
started.set(true);
this.metricsCollector.openConnection();
} catch (RuntimeException e) {
@@ -446,12 +455,7 @@ int maxFrameSize() {
}
private Map peerProperties() {
- int clientPropertiesSize = 4; // size of the map, always there
- if (!clientProperties.isEmpty()) {
- for (Map.Entry entry : clientProperties.entrySet()) {
- clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length();
- }
- }
+ int clientPropertiesSize = mapSize(this.clientProperties);
int length = 2 + 2 + 4 + clientPropertiesSize;
int correlationId = correlationSequence.incrementAndGet();
try {
@@ -460,13 +464,7 @@ private Map peerProperties() {
bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES));
bb.writeShort(VERSION_1);
bb.writeInt(correlationId);
- bb.writeInt(clientProperties.size());
- for (Map.Entry entry : clientProperties.entrySet()) {
- bb.writeShort(entry.getKey().length())
- .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8))
- .writeShort(entry.getValue().length())
- .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8));
- }
+ writeMap(bb, this.clientProperties);
OutstandingRequest