From cc3a5c3bd3b035f53d3fcb8a15d1785bb3e4828a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 27 Oct 2023 17:02:40 +0200 Subject: [PATCH 1/8] Support super stream creation/deletion --- .../java/com/rabbitmq/stream/Constants.java | 2 + .../java/com/rabbitmq/stream/impl/Client.java | 212 ++++++++++++------ 2 files changed, 143 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index 99995a0dd1..f3530767dc 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -72,6 +72,8 @@ public final class Constants { public static final short COMMAND_CONSUMER_UPDATE = 26; public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27; public static final short COMMAND_STREAM_STATS = 28; + public static final short COMMAND_CREATE_SUPER_STREAM = 29; + public static final short COMMAND_DELETE_SUPER_STREAM = 30; public static final short VERSION_1 = 1; public static final short VERSION_2 = 2; diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index b443bddf51..86fa36a654 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -22,7 +22,9 @@ import static com.rabbitmq.stream.impl.Utils.noOpConsumer; import static java.lang.String.format; import static java.lang.String.join; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.StreamSupport.stream; import com.rabbitmq.stream.AuthenticationFailureException; import com.rabbitmq.stream.ByteCapacity; @@ -83,16 +85,10 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -127,6 +123,7 @@ */ public class Client implements AutoCloseable { + private static final Charset CHARSET = StandardCharsets.UTF_8; public static final int DEFAULT_PORT = 5552; public static final int DEFAULT_TLS_PORT = 5551; static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK = @@ -446,12 +443,7 @@ int maxFrameSize() { } private Map peerProperties() { - int clientPropertiesSize = 4; // size of the map, always there - if (!clientProperties.isEmpty()) { - for (Map.Entry entry : clientProperties.entrySet()) { - clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } - } + int clientPropertiesSize = mapSize(this.clientProperties); int length = 2 + 2 + 4 + clientPropertiesSize; int correlationId = correlationSequence.incrementAndGet(); try { @@ -460,13 +452,7 @@ private Map peerProperties() { bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(clientProperties.size()); - for (Map.Entry entry : clientProperties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, this.clientProperties); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -540,7 +526,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate( bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(saslMechanism.getName().length()); - bb.writeBytes(saslMechanism.getName().getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(saslMechanism.getName().getBytes(CHARSET)); if (challengeResponse == null) { bb.writeInt(-1); } else { @@ -570,7 +556,7 @@ private Map open(String virtualHost) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(virtualHost.length()); - bb.writeBytes(virtualHost.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(virtualHost.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -612,7 +598,7 @@ private void sendClose(short code, String reason) { bb.writeInt(correlationId); bb.writeShort(code); bb.writeShort(reason.length()); - bb.writeBytes(reason.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reason.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -662,10 +648,7 @@ public Response create(String stream) { } public Response create(String stream, Map arguments) { - int length = 2 + 2 + 4 + 2 + stream.length() + 4; - for (Map.Entry argument : arguments.entrySet()) { - length = length + 2 + argument.getKey().length() + 2 + argument.getValue().length(); - } + int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments); int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -674,14 +657,8 @@ public Response create(String stream, Map arguments) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - bb.writeInt(arguments.size()); - for (Map.Entry argument : arguments.entrySet()) { - bb.writeShort(argument.getKey().length()); - bb.writeBytes(argument.getKey().getBytes(StandardCharsets.UTF_8)); - bb.writeShort(argument.getValue().length()); - bb.writeBytes(argument.getValue().getBytes(StandardCharsets.UTF_8)); - } + bb.writeBytes(stream.getBytes(CHARSET)); + writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -696,6 +673,116 @@ public Response create(String stream, Map arguments) { } } + Response createSuperStream( + String superStream, + List partitions, + List routingKeys, + Map arguments) { + if (partitions.isEmpty() || routingKeys.isEmpty()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream cannot be empty"); + } + if (partitions.size() != routingKeys.size()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream must have " + + "the same number of elements"); + } + int length = + 2 + + 2 + + 4 + + 2 + + superStream.length() + + collectionSize(partitions) + + collectionSize(routingKeys) + + mapSize(arguments); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_CREATE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + writeCollection(bb, partitions); + writeCollection(bb, routingKeys); + writeMap(bb, arguments); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while creating super stream '%s'", superStream), e); + } + } + + Response deleteSuperStream(String superStream) { + int length = 2 + 2 + 4 + 2 + superStream.length(); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_DELETE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while deleting stream '%s'", superStream), e); + } + } + + private static int collectionSize(Collection elements) { + return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum(); + } + + private static int arraySize(String... elements) { + return 4 + collectionSize(asList(elements)); + } + + private static int mapSize(Map elements) { + return 4 + + elements.entrySet().stream() + .mapToInt(e -> 2 + e.getKey().length() + 2 + e.getValue().length()) + .sum(); + } + + private static ByteBuf writeCollection(ByteBuf bb, Collection elements) { + bb.writeInt(elements.size()); + elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET))); + return bb; + } + + private static ByteBuf writeArray(ByteBuf bb, String... elements) { + return writeCollection(bb, asList(elements)); + } + + private static ByteBuf writeMap(ByteBuf bb, Map elements) { + bb.writeInt(elements.size()); + elements.forEach( + (key, value) -> + bb.writeShort(key.length()) + .writeBytes(key.getBytes(CHARSET)) + .writeShort(value.length()) + .writeBytes(value.getBytes(CHARSET))); + return bb; + } + ByteBuf allocate(ByteBufAllocator allocator, int capacity) { if (frameSizeCopped && capacity > this.maxFrameSize()) { throw new IllegalArgumentException( @@ -729,7 +816,7 @@ public Response delete(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -748,11 +835,7 @@ public Map metadata(String... streams) { if (streams == null || streams.length == 0) { throw new IllegalArgumentException("At least one stream must be specified"); } - int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, size of array - for (String stream : streams) { - length += 2; - length += stream.length(); - } + int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -760,11 +843,7 @@ public Map metadata(String... streams) { bb.writeShort(encodeRequestCode(COMMAND_METADATA)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(streams.length); - for (String stream : streams) { - bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - } + writeArray(bb, streams); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -800,10 +879,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St bb.writeByte(publisherId); bb.writeShort(publisherReferenceSize); if (publisherReferenceSize > 0) { - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); } bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1142,10 +1221,7 @@ public Response subscribe( } int propertiesSize = 0; if (properties != null && !properties.isEmpty()) { - propertiesSize = 4; // size of the map - for (Map.Entry entry : properties.entrySet()) { - propertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } + propertiesSize = mapSize(properties); } length += propertiesSize; int correlationId = correlationSequence.getAndIncrement(); @@ -1157,20 +1233,14 @@ public Response subscribe( bb.writeInt(correlationId); bb.writeByte(subscriptionId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeShort(offsetSpecification.getType()); if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) { bb.writeLong(offsetSpecification.getOffset()); } bb.writeShort(initialCredits); if (properties != null && !properties.isEmpty()) { - bb.writeInt(properties.size()); - for (Map.Entry entry : properties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, properties); } OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); @@ -1205,9 +1275,9 @@ public void storeOffset(String reference, String stream, long offset) { bb.writeShort(encodeRequestCode(COMMAND_STORE_OFFSET)); bb.writeShort(VERSION_1); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeLong(offset); channel.writeAndFlush(bb); } @@ -1230,9 +1300,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1271,9 +1341,9 @@ public long queryPublisherSequence(String publisherReference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(publisherReference.length()); - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1436,9 +1506,9 @@ public List route(String routingKey, String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(routingKey.length()); - bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(routingKey.getBytes(CHARSET)); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1471,7 +1541,7 @@ public List partitions(String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1532,7 +1602,7 @@ StreamStatsResponse streamStats(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); From 7f8afbe2d40013739b896f51d49bee8c6cdad3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 27 Oct 2023 17:40:37 +0200 Subject: [PATCH 2/8] Fix array size calculation --- src/main/java/com/rabbitmq/stream/impl/Client.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 86fa36a654..83e678ebc7 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -752,7 +752,7 @@ private static int collectionSize(Collection elements) { } private static int arraySize(String... elements) { - return 4 + collectionSize(asList(elements)); + return collectionSize(asList(elements)); } private static int mapSize(Map elements) { From b84fc52acd46eb384099e402fb22f2f0848b9f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Nov 2023 17:15:36 +0100 Subject: [PATCH 3/8] Add test for super stream creation/deletion References rabbitmq/rabbitmq-server#9813 --- .github/workflows/test-pr.yml | 2 + .../java/com/rabbitmq/stream/impl/Client.java | 5 + .../stream/impl/ServerFrameHandler.java | 34 +--- src/test/java/com/rabbitmq/stream/Host.java | 18 +- .../rabbitmq/stream/impl/FilteringTest.java | 8 +- .../stream/impl/RoutePartitionsTest.java | 45 ++--- .../rabbitmq/stream/impl/SacClientTest.java | 22 +-- .../impl/SacSuperStreamConsumerTest.java | 25 ++- .../stream/impl/SuperStreamConsumerTest.java | 31 ++- .../impl/SuperStreamManagementTest.java | 176 ++++++++++++++++++ .../stream/impl/SuperStreamProducerTest.java | 32 ++-- .../rabbitmq/stream/impl/SuperStreamTest.java | 30 ++- .../com/rabbitmq/stream/impl/TestUtils.java | 71 +------ 13 files changed, 295 insertions(+), 204 deletions(-) create mode 100644 src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 8885aec068..d7533241b6 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,6 +24,8 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh + env: + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel' - name: Test run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 83e678ebc7..c989304dcc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -687,6 +687,7 @@ Response createSuperStream( "Partitions and routing keys of a super stream must have " + "the same number of elements"); } + arguments = arguments == null ? Collections.emptyMap() : arguments; int length = 2 + 2 @@ -831,6 +832,10 @@ public Response delete(String stream) { } } + Map metadata(List streams) { + return this.metadata(streams.toArray(new String[] {})); + } + public Map metadata(String... streams) { if (streams == null || streams.length == 0) { throw new IllegalArgumentException("At least one stream must be specified"); diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index 2b02a475ff..04b45446ea 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -13,37 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.COMMAND_CLOSE; -import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_CREDIT; -import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_DELIVER; -import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS; -import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_OPEN; -import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS; -import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_CONFIRM; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_ERROR; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE; -import static com.rabbitmq.stream.Constants.COMMAND_ROUTE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE; -import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS; -import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE; -import static com.rabbitmq.stream.Constants.COMMAND_TUNE; -import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE; -import static com.rabbitmq.stream.Constants.VERSION_1; -import static com.rabbitmq.stream.Constants.VERSION_2; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.encodeResponseCode; import com.rabbitmq.stream.ChunkChecksum; @@ -128,6 +98,8 @@ class ServerFrameHandler { handlers.put(COMMAND_CONSUMER_UPDATE, new ConsumerUpdateFrameHandler()); handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler()); handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler()); + handlers.put(COMMAND_CREATE_SUPER_STREAM, RESPONSE_FRAME_HANDLER); + handlers.put(COMMAND_DELETE_SUPER_STREAM, RESPONSE_FRAME_HANDLER); HANDLERS = new FrameHandler[maxCommandKey + 1][]; handlers .entrySet() diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index b5bd0a6934..433685a9bb 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import static java.lang.String.format; +import static java.util.Arrays.asList; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -151,12 +152,24 @@ public static void addUser(String username, String password) throws IOException rabbitmqctl(format("add_user %s %s", username, password)); } + public static void setPermissions(String username, List permissions) throws IOException { + setPermissions(username, "/", permissions); + } + public static void setPermissions(String username, String vhost, String permission) throws IOException { + setPermissions(username, vhost, asList(permission, permission, permission)); + } + + public static void setPermissions(String username, String vhost, List permissions) + throws IOException { + if (permissions.size() != 3) { + throw new IllegalArgumentException(); + } rabbitmqctl( format( "set_permissions --vhost %s %s '%s' '%s' '%s'", - vhost, username, permission, permission, permission)); + vhost, username, permissions.get(0), permissions.get(1), permissions.get(2))); } public static void changePassword(String username, String newPassword) throws IOException { @@ -180,7 +193,8 @@ public static void setEnv(String parameter, String value) throws IOException { } public static String rabbitmqctlCommand() { - String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); + String rabbitmqCtl = "/home/acogoluegnes/prog/rabbitmq/rabbitmq-server/sbin/rabbitmqctl"; + // String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); if (rabbitmqCtl == null) { throw new IllegalStateException("Please define the rabbitmqctl.bin system property"); } diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index fd477345c7..f9c9d9ab97 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -58,6 +58,7 @@ public class FilteringTest { Environment environment; String stream; + ClientFactory cf; @BeforeEach void init() throws Exception { @@ -278,9 +279,9 @@ void superStream(TestInfo info) throws Exception { () -> { String superStream = TestUtils.streamName(info); int partitionCount = 3; - Connection connection = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); try { - TestUtils.declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Producer producer = environment @@ -362,8 +363,7 @@ void superStream(TestInfo info) throws Exception { CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); } finally { - TestUtils.deleteSuperStreamTopology(connection, superStream, partitionCount); - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } }); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java index 5da95d97de..7ca0193fee 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java @@ -36,21 +36,20 @@ public class RoutePartitionsTest { TestUtils.ClientFactory cf; - Connection connection; + Client configurationClient; int partitions = 3; String superStream; @BeforeEach - void init(TestInfo info) throws Exception { - connection = new ConnectionFactory().newConnection(); + void init(TestInfo info) { + configurationClient = cf.get(); superStream = TestUtils.streamName(info); } @AfterEach - void tearDown() throws Exception { - deleteSuperStreamTopology(connection, superStream, partitions); - connection.close(); + void tearDown() { + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -64,8 +63,8 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() { } @Test - void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { - declareSuperStreamTopology(connection, superStream, partitions); + void routeShouldReturnNullWhenNoStreamForRoutingKey() { + declareSuperStreamTopology(configurationClient, superStream, partitions); Client client = cf.get(); assertThat(client.route("0", superStream)).hasSize(1).contains(superStream + "-0"); @@ -73,16 +72,14 @@ void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { } @Test - void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception { - declareSuperStreamTopology(connection, superStream, 0); - + void partitionsShouldReturnEmptyListWhenSuperStreamDoesNotExist() { Client client = cf.get(); assertThat(client.partitions(superStream)).isEmpty(); } @Test - void routeTopologyWithPartitionCount() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); + void routeTopologyWithPartitionCount() { + declareSuperStreamTopology(configurationClient, superStream, 3); Client client = cf.get(); List streams = client.partitions(superStream); @@ -97,8 +94,10 @@ void routeTopologyWithPartitionCount() throws Exception { @Test void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); - connection.createChannel().queueBind(superStream + "-1", superStream, "0"); + declareSuperStreamTopology(configurationClient, superStream, 3); + try (Connection connection = new ConnectionFactory().newConnection()) { + connection.createChannel().queueBind(superStream + "-1", superStream, "0"); + } Client client = cf.get(); List streams = client.partitions(superStream); assertThat(streams) @@ -114,10 +113,12 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception @Test void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); - Channel channel = connection.createChannel(); - String nonStreamQueue = channel.queueDeclare().getQueue(); - connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream"); + declareSuperStreamTopology(configurationClient, superStream, 3); + try (Connection connection = new ConnectionFactory().newConnection()) { + Channel channel = connection.createChannel(); + String nonStreamQueue = channel.queueDeclare().getQueue(); + connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream"); + } Client client = cf.get(); List streams = client.partitions(superStream); assertThat(streams) @@ -131,9 +132,9 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { } @Test - void partitionsReturnsCorrectOrder() throws Exception { + void partitionsReturnsCorrectOrder() { String[] partitionNames = {"z", "y", "x"}; - declareSuperStreamTopology(connection, superStream, partitionNames); + declareSuperStreamTopology(configurationClient, superStream, partitionNames); try { Client client = cf.get(); List streams = client.partitions(superStream); @@ -142,7 +143,7 @@ void partitionsReturnsCorrectOrder() throws Exception { .containsSequence( Arrays.stream(partitionNames).map(p -> superStream + "-" + p).toArray(String[]::new)); } finally { - deleteSuperStreamTopology(connection, superStream, partitionNames); + deleteSuperStreamTopology(configurationClient, superStream); } } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java index 42125fc8dd..a3879d0e13 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java @@ -26,8 +26,6 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Host; import com.rabbitmq.stream.OffsetSpecification; @@ -230,9 +228,9 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info) Map receivedMessages = receivedMessages(2); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); try { - TestUtils.declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); // working with the second partition String partition = superStream + "-1"; @@ -330,8 +328,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info) .isEqualTo(messageCount * 2); } finally { - TestUtils.deleteSuperStreamTopology(c, superStream, 3); - c.close(); + deleteSuperStreamTopology(configurationClient, superStream); } } @@ -340,11 +337,11 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr Map consumerStates = consumerStates(3 * 3); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); // subscription distribution // client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8 try { - declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); List partitions = IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(toList()); ConsumerUpdateListener consumerUpdateListener = @@ -422,7 +419,7 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr client.set(client3); partitions.forEach(unsubscriptionCallback); } finally { - deleteSuperStreamTopology(c, superStream, 3); + deleteSuperStreamTopology(configurationClient, superStream); } } @@ -485,10 +482,10 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep Map consumerStates = consumerStates(3 * 3); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); AtomicBoolean keepPublishing = new AtomicBoolean(true); try { - declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); // we use the second partition because a rebalancing occurs // when the second consumer joins String partitionInUse = superStream + "-1"; @@ -584,8 +581,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep assertThat(response).is(ok()); } finally { keepPublishing.set(false); - deleteSuperStreamTopology(c, superStream, 3); - c.close(); + deleteSuperStreamTopology(configurationClient, superStream); } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index c37c5857b9..d00f59bef5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -19,8 +19,6 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.ConsumerUpdateListener; import com.rabbitmq.stream.Environment; @@ -56,26 +54,25 @@ public class SacSuperStreamConsumerTest { Environment environment; - Connection connection; + Client configurationClient; int partitionCount = 3; String superStream; TestUtils.ClientFactory cf; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); superStream = TestUtils.streamName(info); - connection = new ConnectionFactory().newConnection(); - declareSuperStreamTopology(connection, superStream, partitionCount); + configurationClient = cf.get(); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); } @AfterEach - void tearDown() throws Exception { + void tearDown() { environment.close(); - deleteSuperStreamTopology(connection, superStream, partitionCount); - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -171,7 +168,7 @@ void sacShouldSpreadAcrossPartitions() throws Exception { @Test void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; AtomicInteger messageWaveCount = new AtomicInteger(); int superConsumerCount = 3; @@ -357,7 +354,7 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { @Test void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; int storeEvery = messageCount / 100; AtomicInteger messageWaveCount = new AtomicInteger(); @@ -561,7 +558,7 @@ void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { @Test void sacExternalOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; AtomicInteger messageWaveCount = new AtomicInteger(); int superConsumerCount = 3; @@ -731,8 +728,8 @@ void sacExternalOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { } @Test - void consumerGroupsOnSameSuperStreamShouldBeIndependent() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void consumerGroupsOnSameSuperStreamShouldBeIndependent() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 20_000; int consumerGroupsCount = 3; List consumerNames = diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index 55a92dbabd..319d664feb 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -23,8 +23,6 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; @@ -53,7 +51,7 @@ public class SuperStreamConsumerTest { Environment environment; - Connection connection; + Client configurationClient; int partitionCount = 3; String superStream; String[] routingKeys = null; @@ -65,18 +63,13 @@ void init(TestInfo info) throws Exception { Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); superStream = TestUtils.streamName(info); - connection = new ConnectionFactory().newConnection(); + configurationClient = cf.get(); } @AfterEach void tearDown() throws Exception { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitionCount); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } private static void publishToPartitions( @@ -101,8 +94,8 @@ private static void publishToPartitions( } @Test - void consumeAllMessagesFromAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void consumeAllMessagesFromAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -133,8 +126,8 @@ void consumeAllMessagesFromAllPartitions() throws Exception { } @Test - void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void manualOffsetTrackingShouldStoreOnAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -191,8 +184,8 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { } @Test - void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void autoOffsetTrackingShouldStoreOnAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -247,8 +240,8 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { } @Test - void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void autoOffsetTrackingShouldStoreOffsetZero() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = partitionCount; @@ -305,7 +298,7 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { @BrokerVersionAtLeast(RABBITMQ_3_11_11) void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10_000; diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java new file mode 100644 index 0000000000..3a7f94a2ee --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java @@ -0,0 +1,176 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.Constants.*; +import static com.rabbitmq.stream.Host.*; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; +import static com.rabbitmq.stream.impl.TestUtils.streamName; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.OffsetSpecification; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@TestUtils.DisabledIfRabbitMqCtlNotSet +public class SuperStreamManagementTest { + + TestUtils.ClientFactory cf; + static final int partitionCount = 3; + String s; + List partitions; + List routingKeys; + + @BeforeEach + void init(TestInfo info) { + s = streamName(info); + partitions = partitions(s); + routingKeys = routingKeys(); + } + + @Test + void createDelete() { + Client c = cf.get(); + Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); + assertThat(response).is(ok()); + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); + assertThat(c.partitions(s)).isEqualTo(partitions); + routingKeys.forEach(rk -> assertThat(c.route(rk, s)).hasSize(1).contains(s + "-" + rk)); + + response = c.createSuperStream(s, partitions, routingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_STREAM_ALREADY_EXISTS)); + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy( + (s, streamMetadata) -> + assertThat(streamMetadata.getResponseCode()) + .isEqualTo(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); + assertThat(c.partitions(s)).isEmpty(); + routingKeys.forEach(rk -> assertThat(c.route(rk, s)).isEmpty()); + + response = c.deleteSuperStream(s); + assertThat(response).is(responseCode(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); + } + + @Test + void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception { + Client c = cf.get(); + Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); + assertThat(response).is(ok()); + Map notifications = new ConcurrentHashMap<>(partitions.size()); + AtomicInteger notificationCount = new AtomicInteger(); + partitions.forEach( + p -> { + Client client = + cf.get( + new Client.ClientParameters() + .metadataListener( + (stream, code) -> { + notifications.put(stream, code); + notificationCount.incrementAndGet(); + })); + Client.Response r = client.subscribe((byte) 0, p, OffsetSpecification.first(), 1); + assertThat(r).is(ok()); + }); + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + waitAtMost(() -> notificationCount.get() == partitionCount); + assertThat(notifications).hasSize(partitionCount).containsOnlyKeys(partitions); + assertThat(notifications.values()).containsOnly(RESPONSE_CODE_STREAM_NOT_AVAILABLE); + } + + @Test + void authorisation() throws Exception { + String user = "stream"; + // routing keys do not matter for authorisation + routingKeys = asList("1", "2", "3"); + try { + addUser(user, user); + setPermissions(user, asList("stream|partition.*$", "partition.*$", "stream.*$")); + Client c = cf.get(new Client.ClientParameters().username(user).password(user)); + Client.Response response = c.createSuperStream("not-allowed", partitions, routingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + s = name("stream"); + response = c.createSuperStream(s, asList("1", "2", "3"), routingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + partitions = range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList()); + // we can create the queues, but can't bind them, as it requires write permission + response = c.createSuperStream(s, partitions, routingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + String partitionName = name("partition"); + partitions = + range(0, partitionCount).mapToObj(i -> partitionName + "-" + i).collect(toList()); + response = c.createSuperStream(s, partitions, routingKeys, null); + assertThat(response).is(ok()); + + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); + assertThat(c.partitions(s)).isEqualTo(partitions); + for (int i = 0; i < routingKeys.size(); i++) { + String rk = routingKeys.get(i); + assertThat(c.route(rk, s)).hasSize(1).contains(partitions.get(i)); + } + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + } finally { + deleteUser(user); + } + } + + private static List routingKeys() { + return routingKeys(partitionCount); + } + + private static List routingKeys(int partitions) { + return range(0, partitions).mapToObj(String::valueOf).collect(toList()); + } + + private static List partitions(String superStream) { + return partitions(superStream, partitionCount); + } + + private static List partitions(String superStream, int partitions) { + return range(0, partitions).mapToObj(i -> superStream + "-" + i).collect(toList()); + } + + private static String name(String value) { + String uuid = UUID.randomUUID().toString(); + return String.format( + "%s-%s%s", + SuperStreamManagementTest.class.getSimpleName(), value, uuid.substring(uuid.length() / 2)); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index 75ff916bd2..e8fc7250ee 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -20,8 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; @@ -50,30 +48,25 @@ public class SuperStreamProducerTest { Environment environment; - Connection connection; + Client configurationClient; int partitions = 3; String superStream; String[] routingKeys = null; TestUtils.ClientFactory cf; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); - connection = new ConnectionFactory().newConnection(); superStream = TestUtils.streamName(info); + configurationClient = cf.get(); } @AfterEach void tearDown() throws Exception { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitions); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -85,7 +78,7 @@ void exceptionShouldBeThrownWhenSuperStreamIsSetAndRoutingIsNotConfigured() { @Test void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); Producer producer = environment .producerBuilder() @@ -140,7 +133,7 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -190,9 +183,9 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr } @Test - void messageIsNackedIfNoRouteFound() throws Exception { + void messageIsNackedIfNoRouteFound() { routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -226,7 +219,7 @@ void messageIsNackedIfNoRouteFound() throws Exception { @Test void getLastPublishingIdShouldReturnLowestValue() throws Exception { int messageCount = 10_000; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); String producerName = "super-stream-application"; Producer producer = environment @@ -292,9 +285,9 @@ void getLastPublishingIdShouldReturnLowestValue() throws Exception { } @Test - void producerShouldNotPublishMessagesOnceClosed() throws Exception { + void producerShouldNotPublishMessagesOnceClosed() { int messageCount = 100; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); String producerName = "super-stream-application"; Producer producer = environment @@ -331,8 +324,7 @@ void producerShouldNotPublishMessagesOnceClosed() throws Exception { } @Test - void producerCreationShouldFailIfNoPartition() throws Exception { - declareSuperStreamTopology(connection, superStream, 0); + void producerCreationShouldFailIfNoPartition() { String producerName = "super-stream-application"; assertThatThrownBy( () -> { diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 04b15df47e..20395539b2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -18,8 +18,6 @@ import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; @@ -53,35 +51,31 @@ public class SuperStreamTest { Environment environment; - Connection connection; + TestUtils.ClientFactory cf; + Client configurationClient; int partitions = 3; String superStream; String[] routingKeys = null; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); - connection = new ConnectionFactory().newConnection(); + configurationClient = cf.get(); superStream = TestUtils.streamName(info); } @AfterEach - void tearDown() throws Exception { + void tearDown() { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitions); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test - void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { + void allMessagesSentWithHashRoutingShouldBeThenConsumed() { int messageCount = 10_000 * partitions; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); Producer producer = environment .producerBuilder() @@ -123,10 +117,10 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { } @Test - void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { + void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() { int messageCount = 10_000 * partitions; routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -170,10 +164,10 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception @Test @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) - void allMessagesForSameUserShouldEndUpInSamePartition() throws Exception { + void allMessagesForSameUserShouldEndUpInSamePartition() { int messageCount = 10_000 * partitions; int userCount = 10; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); AtomicInteger totalReceivedCount = new AtomicInteger(0); // . => count diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 5dafd5bf94..29704ca6b2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -15,13 +15,11 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import ch.qos.logback.classic.Level; -import com.rabbitmq.client.BuiltinExchangeType; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Host; @@ -34,8 +32,6 @@ import com.rabbitmq.stream.impl.Client.StreamMetadata; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.vavr.Tuple; -import io.vavr.Tuple2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -49,13 +45,7 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -281,62 +271,21 @@ static void doIfNotNull(T obj, Consumer action) { } } - static void declareSuperStreamTopology(Connection connection, String superStream, int partitions) - throws Exception { + static void declareSuperStreamTopology(Client client, String superStream, int partitions) { declareSuperStreamTopology( - connection, + client, superStream, IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new)); } - static void declareSuperStreamTopology(Connection connection, String superStream, String... rks) - throws Exception { - try (Channel ch = connection.createChannel()) { - ch.exchangeDeclare( - superStream, - BuiltinExchangeType.DIRECT, - true, - false, - Collections.singletonMap("x-super-stream", true)); - - List> bindings = new ArrayList<>(rks.length); - for (int i = 0; i < rks.length; i++) { - bindings.add(Tuple.of(rks[i], i)); - } - // shuffle the order to make sure we get in the correct order from the server - Collections.shuffle(bindings); - - for (Tuple2 binding : bindings) { - String routingKey = binding._1(); - String partitionName = superStream + "-" + routingKey; - ch.queueDeclare( - partitionName, true, false, false, Collections.singletonMap("x-queue-type", "stream")); - ch.queueBind( - partitionName, - superStream, - routingKey, - Collections.singletonMap("x-stream-partition-order", binding._2())); - } - } - } - - static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions) - throws Exception { - deleteSuperStreamTopology( - connection, - superStream, - IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new)); + static void declareSuperStreamTopology(Client client, String superStream, String... rks) { + List partitions = + Arrays.stream(rks).map(rk -> superStream + "-" + rk).collect(toList()); + client.createSuperStream(superStream, partitions, Arrays.asList(rks), null); } - static void deleteSuperStreamTopology( - Connection connection, String superStream, String... routingKeys) throws Exception { - try (Channel ch = connection.createChannel()) { - ch.exchangeDelete(superStream); - for (String routingKey : routingKeys) { - String partitionName = superStream + "-" + routingKey; - ch.queueDelete(partitionName); - } - } + static void deleteSuperStreamTopology(Client client, String superStream) { + client.deleteSuperStream(superStream); } public static String streamName(TestInfo info) { From a2f6b57873b34a19dc6ca260de4d4b71e5c6ce30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Nov 2023 17:36:04 +0100 Subject: [PATCH 4/8] Fix rabbitmqctl command --- src/test/java/com/rabbitmq/stream/Host.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index 433685a9bb..4f0564c091 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -193,8 +193,7 @@ public static void setEnv(String parameter, String value) throws IOException { } public static String rabbitmqctlCommand() { - String rabbitmqCtl = "/home/acogoluegnes/prog/rabbitmq/rabbitmq-server/sbin/rabbitmqctl"; - // String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); + String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); if (rabbitmqCtl == null) { throw new IllegalStateException("Please define the rabbitmqctl.bin system property"); } From 9308d71135e8010a88b6df27f6023686af4ddf0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 8 Nov 2023 15:35:19 +0100 Subject: [PATCH 5/8] Fall back to AMQP to create super stream topology If RabbitMQ < 3.13. For tests. --- .../impl/SuperStreamManagementTest.java | 3 + .../com/rabbitmq/stream/impl/TestUtils.java | 71 +++++++++++++++++-- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java index 3a7f94a2ee..6bd9b33ae7 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java @@ -52,6 +52,7 @@ void init(TestInfo info) { } @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void createDelete() { Client c = cf.get(); Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); @@ -81,6 +82,7 @@ void createDelete() { } @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception { Client c = cf.get(); Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); @@ -109,6 +111,7 @@ void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exceptio } @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void authorisation() throws Exception { String user = "stream"; // routing keys do not matter for authorisation diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 29704ca6b2..b65638d63a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -13,13 +13,19 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static io.vavr.Tuple.*; import static java.lang.String.format; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import ch.qos.logback.classic.Level; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Host; @@ -32,6 +38,7 @@ import com.rabbitmq.stream.impl.Client.StreamMetadata; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.vavr.Tuple2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -46,11 +53,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.*; @@ -81,6 +84,8 @@ public final class TestUtils { private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); + private static final ConnectionFactory AMQP_CF = new ConnectionFactory(); + private TestUtils() {} public static Duration waitAtMost(CallableBooleanSupplier condition) throws Exception { @@ -281,11 +286,59 @@ static void declareSuperStreamTopology(Client client, String superStream, int pa static void declareSuperStreamTopology(Client client, String superStream, String... rks) { List partitions = Arrays.stream(rks).map(rk -> superStream + "-" + rk).collect(toList()); - client.createSuperStream(superStream, partitions, Arrays.asList(rks), null); + if (atLeastVersion("3.13.0", client.brokerVersion())) { + client.createSuperStream(superStream, partitions, asList(rks), null); + } else { + try (Connection connection = connection(); + Channel ch = connection.createChannel()) { + ch.exchangeDeclare( + superStream, + BuiltinExchangeType.DIRECT, + true, + false, + Collections.singletonMap("x-super-stream", true)); + List> bindings = new ArrayList<>(rks.length); + for (int i = 0; i < rks.length; i++) { + bindings.add(of(rks[i], i)); + } + // shuffle the order to make sure we get in the correct order from the server + Collections.shuffle(bindings); + + for (Tuple2 binding : bindings) { + String routingKey = binding._1(); + String partitionName = superStream + "-" + routingKey; + ch.queueDeclare( + partitionName, + true, + false, + false, + Collections.singletonMap("x-queue-type", "stream")); + ch.queueBind( + partitionName, + superStream, + routingKey, + Collections.singletonMap("x-stream-partition-order", binding._2())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } } static void deleteSuperStreamTopology(Client client, String superStream) { - client.deleteSuperStream(superStream); + if (atLeastVersion("3.13.0", client.brokerVersion())) { + client.deleteSuperStream(superStream); + } else { + try (Connection connection = connection(); + Channel ch = connection.createChannel()) { + ch.exchangeDelete(superStream); + for (String partition : client.partitions(superStream)) { + ch.queueDelete(partition); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } } public static String streamName(TestInfo info) { @@ -1034,4 +1087,8 @@ static void repeatIfFailure(RunnableWithException test) throws Exception { throw (Exception) lastException; } } + + private static Connection connection() throws IOException, TimeoutException { + return AMQP_CF.newConnection(); + } } From fa1d13349374cfbc55785a0827a58d50d9b47dc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 9 Nov 2023 15:50:16 +0100 Subject: [PATCH 6/8] Support super streams in StreamCreator --- src/docs/asciidoc/super-streams.adoc | 45 ++++++- .../java/com/rabbitmq/stream/Constants.java | 2 +- .../java/com/rabbitmq/stream/Environment.java | 12 +- .../com/rabbitmq/stream/StreamCreator.java | 60 ++++++++- .../stream/impl/StreamEnvironment.java | 16 +++ .../stream/impl/StreamStreamCreator.java | 115 ++++++++++++++++-- src/test/java/com/rabbitmq/stream/Host.java | 2 +- .../stream/docs/SuperStreamUsage.java | 16 +++ .../com/rabbitmq/stream/impl/ClientTest.java | 12 +- .../stream/impl/RoutePartitionsTest.java | 2 +- .../rabbitmq/stream/impl/SacClientTest.java | 2 +- .../stream/impl/StreamConsumerTest.java | 2 +- .../stream/impl/StreamEnvironmentTest.java | 64 +++++++++- .../rabbitmq/stream/impl/SuperStreamTest.java | 2 +- .../com/rabbitmq/stream/impl/TestUtils.java | 2 +- 15 files changed, 317 insertions(+), 37 deletions(-) diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc index c7cede605a..9d073abc94 100644 --- a/src/docs/asciidoc/super-streams.adoc +++ b/src/docs/asciidoc/super-streams.adoc @@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages. -==== Super Stream Creation +==== Super Stream Creation and Deletion -It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut. -Here is how to create an invoices super stream with 3 partitions: +It is possible to manage super streams with + +* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)` +* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI) +* any AMQP 0.9.1 client library +* the https://www.rabbitmq.com/management.html[management plugin] + +The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings). + +===== With the Client Library + +Here is how to create an `invoices` super stream with 5 partitions: + +.Creating a super stream by specifying the number of partitions +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions] +-------- + +The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`. +We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to. +This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order. + +It is also possible to specify routing keys when creating a super stream: + +.Creating a super stream by specifying the routing keys +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=creation-routing-keys] +-------- + +The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case. + +Using one type of topology or the other depends on the use cases, especially how messages are processed. +See the next sections on publishing and consuming to find out more. + +===== With the CLI + +Here is how to create an `invoices` super stream with 5 partitions: .Creating a super stream from the CLI ---- -rabbitmq-streams add_super_stream invoices --partitions 3 +rabbitmq-streams add_super_stream invoices --partitions 5 ---- Use `rabbitmq-streams add_super_stream --help` to learn more about the command. diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index f3530767dc..dd1f448b8e 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/main/java/com/rabbitmq/stream/Environment.java b/src/main/java/com/rabbitmq/stream/Environment.java index eb8185a119..7963c835ec 100644 --- a/src/main/java/com/rabbitmq/stream/Environment.java +++ b/src/main/java/com/rabbitmq/stream/Environment.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -58,9 +58,19 @@ static EnvironmentBuilder builder() { * Delete a stream * * @param stream the stream to delete + * @since 0.15.0 */ void deleteStream(String stream); + /** + * Delete a super stream. + * + *

Requires RabbitMQ 3.13.0 or more. + * + * @param superStream the super stream to delete + */ + void deleteSuperStream(String superStream); + /** * Query statistics on a stream. * diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index ad205b5998..2e5bf73351 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -23,13 +23,24 @@ public interface StreamCreator { ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB"); /** - * The name of the stream + * The name of the stream. + * + *

Alias for {@link #name(String)}. * * @param stream * @return this creator instance */ StreamCreator stream(String stream); + /** + * The name of the (super) stream. + * + * @param name + * @return this creator instance + * @since 0.15.0 + */ + StreamCreator name(String name); + /** * The maximum size of the stream before it gets truncated. * @@ -80,6 +91,16 @@ public interface StreamCreator { */ StreamCreator filterSize(int size); + /** + * Configure the super stream to create. + * + *

Requires RabbitMQ 3.13.0 or more. + * + * @return the super stream configuration + * @since 0.15.0 + */ + SuperStreamConfiguration superStream(); + /** * Create the stream. * @@ -142,4 +163,39 @@ public String value() { return this.value; } } + + /** + * Super stream configuration. + * + * @since 0.15.0 + */ + interface SuperStreamConfiguration { + + /** + * The number of partitions of the super stream. + * + *

Mutually exclusive with {@link #routingKeys(String...)}. Default is 3. + * + * @param partitions + * @return this super stream configuration instance + */ + SuperStreamConfiguration partitions(int partitions); + + /** + * The routing keys to use when declaring the super stream partitions. + * + *

Mutually exclusive with {@link #partitions(int)}. Default is null. + * + * @param routingKeys + * @return this super stream configuration instance + */ + SuperStreamConfiguration routingKeys(String... routingKeys); + + /** + * Go back to the creator. + * + * @return the stream creator + */ + StreamCreator creator(); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 1c190546d2..476da5781d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -460,6 +460,22 @@ public void deleteStream(String stream) { } } + @Override + public void deleteSuperStream(String superStream) { + checkNotClosed(); + this.maybeInitializeLocator(); + Client.Response response = this.locator().deleteSuperStream(superStream); + if (!response.isOk()) { + throw new StreamException( + "Error while deleting super stream " + + superStream + + " (" + + formatConstant(response.getResponseCode()) + + ")", + response.getResponseCode()); + } + } + @Override public StreamStats queryStreamStats(String stream) { checkNotClosed(); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index ed48565ec1..7e1146f3ca 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -15,19 +15,25 @@ import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedFunction; +import static java.util.stream.Collectors.toList; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.IntStream; class StreamStreamCreator implements StreamCreator { private final StreamEnvironment environment; private final Client.StreamParametersBuilder streamParametersBuilder = new Client.StreamParametersBuilder().leaderLocator(LeaderLocator.LEAST_LEADERS); - private String stream; + private String name; + private DefaultSuperStreamConfiguration superStreamConfiguration; StreamStreamCreator(StreamEnvironment environment) { this.environment = environment; @@ -35,7 +41,13 @@ class StreamStreamCreator implements StreamCreator { @Override public StreamCreator stream(String stream) { - this.stream = stream; + this.name = stream; + return this; + } + + @Override + public StreamCreator name(String name) { + this.name = name; return this; } @@ -73,27 +85,104 @@ public StreamCreator filterSize(int size) { return this; } + @Override + public SuperStreamConfiguration superStream() { + if (this.superStreamConfiguration == null) { + this.superStreamConfiguration = new DefaultSuperStreamConfiguration(this); + } + return this.superStreamConfiguration; + } + @Override public void create() { - if (stream == null) { - throw new IllegalArgumentException("Stream cannot be null"); + if (name == null) { + throw new IllegalArgumentException("Stream name cannot be null"); + } + Function function; + boolean superStream = this.superStreamConfiguration != null; + if (superStream) { + List partitions, routingKeys; + if (this.superStreamConfiguration.routingKeys == null) { + partitions = + IntStream.range(0, this.superStreamConfiguration.partitions) + .mapToObj(i -> this.name + "-" + i) + .collect(toList()); + routingKeys = + IntStream.range(0, this.superStreamConfiguration.partitions) + .mapToObj(String::valueOf) + .collect(toList()); + } else { + partitions = + this.superStreamConfiguration.routingKeys.stream() + .map(rk -> this.name + "-" + rk) + .collect(toList()); + routingKeys = this.superStreamConfiguration.routingKeys; + } + function = + namedFunction( + c -> + c.createSuperStream( + this.name, partitions, routingKeys, streamParametersBuilder.build()), + "Creation of super stream '%s'", + this.name); + } else { + function = + namedFunction( + c -> c.create(name, streamParametersBuilder.build()), + "Creation of stream '%s'", + this.name); } this.environment.maybeInitializeLocator(); - Client.Response response = - environment.locatorOperation( - namedFunction( - c -> c.create(stream, streamParametersBuilder.build()), - "Creation of stream '%s'", - this.stream)); + Client.Response response = environment.locatorOperation(function); if (!response.isOk() && response.getResponseCode() != Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) { + String label = superStream ? "super stream" : "stream"; throw new StreamException( - "Error while creating stream '" - + stream + "Error while creating " + + label + + " '" + + name + "' (" + formatConstant(response.getResponseCode()) + ")", response.getResponseCode()); } } + + private static class DefaultSuperStreamConfiguration implements SuperStreamConfiguration { + + private final StreamCreator creator; + + private int partitions = 3; + private List routingKeys = null; + + private DefaultSuperStreamConfiguration(StreamCreator creator) { + this.creator = creator; + } + + @Override + public SuperStreamConfiguration partitions(int partitions) { + if (partitions <= 0) { + throw new IllegalArgumentException("The number of partitions must be greater than 0"); + } + this.partitions = partitions; + this.routingKeys = null; + return this; + } + + @Override + public SuperStreamConfiguration routingKeys(String... routingKeys) { + if (routingKeys == null || routingKeys.length == 0) { + throw new IllegalArgumentException("There must be at least 1 routing key"); + } + this.routingKeys = Arrays.asList(routingKeys); + this.partitions = -1; + return this; + } + + @Override + public StreamCreator creator() { + return this.creator; + } + } } diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index 4f0564c091..80fdd05ae8 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java index ca2765fdbb..25da030e7a 100644 --- a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -26,6 +26,22 @@ public class SuperStreamUsage { + void creation() { + Environment environment = Environment.builder().build(); + // tag::creation-partitions[] + environment.streamCreator().name("invoices") + .superStream() + .partitions(5).creator() + .create(); + // end::creation-partitions[] + // tag::creation-routing-keys[] + environment.streamCreator().name("invoices") + .superStream() + .routingKeys("amer", "emea", "apac").creator() + .create(); + // end::creation-routing-keys[] + } + void producerSimple() { Environment environment = Environment.builder().build(); // tag::producer-simple[] diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 4b068e5cb7..24736a8612 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -844,7 +844,7 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void exchangeCommandVersions() { Client client = cf.get(); List infos = client.exchangeCommandVersions(); @@ -853,7 +853,7 @@ void exchangeCommandVersions() { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { int publishCount = 20_000; byte correlationId = 42; @@ -887,7 +887,7 @@ void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { int publishCount = 20_000; CountDownLatch latch = new CountDownLatch(publishCount); @@ -923,14 +923,14 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsShouldReturnErrorWhenStreamDoesNotExist() { assertThat(cf.get().streamStats("does not exist").getResponseCode()) .isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST); } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) throws Exception { // this test is flaky in some CI environments, so we have to retry it repeatIfFailure( diff --git a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java index 7ca0193fee..863e9019e1 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java index a3879d0e13..1ef940a89c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2022-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 3bb03d6810..0781b6a01b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -132,7 +132,7 @@ void nameShouldBeSetIfTrackingStrategyIsSet() { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void committedOffsetShouldBeSet() throws Exception { int messageCount = 20_000; publishAndWaitForConfirms(cf, messageCount, this.stream); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 0203c8f536..01f884f579 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -495,6 +495,62 @@ void streamCreationShouldBeIdempotent(TestInfo info) { } } + @ParameterizedTest + @ValueSource(ints = {3, 5}) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void superStreamCreationSetPartitions(int partitionCount, TestInfo info) { + int defaultPartitionCount = 3; + String s = streamName(info); + Client client = cf.get(); + Environment env = environmentBuilder.build(); + try { + StreamCreator.SuperStreamConfiguration configuration = + env.streamCreator().name(s).superStream(); + if (partitionCount != defaultPartitionCount) { + configuration.partitions(partitionCount); + } + configuration.creator().create(); + + assertThat(client.partitions(s)) + .hasSize(partitionCount) + .containsAll( + IntStream.range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList())); + IntStream.range(0, partitionCount) + .forEach( + i -> assertThat(client.route(String.valueOf(i), s)).hasSize(1).contains(s + "-" + i)); + } finally { + env.deleteSuperStream(s); + env.close(); + assertThat(client.partitions(s)).isEmpty(); + } + } + + @Test + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void superStreamCreationSetRoutingKeys(TestInfo info) { + List routingKeys = Arrays.asList("a", "b", "c", "d", "e"); + String s = streamName(info); + Client client = cf.get(); + Environment env = environmentBuilder.build(); + try { + env.streamCreator() + .name(s) + .superStream() + .routingKeys(routingKeys.toArray(new String[] {})) + .creator() + .create(); + + assertThat(client.partitions(s)) + .hasSize(routingKeys.size()) + .containsAll(routingKeys.stream().map(rk -> s + "-" + rk).collect(toList())); + routingKeys.forEach(rk -> assertThat(client.route(rk, s)).hasSize(1).contains(s + "-" + rk)); + } finally { + env.deleteSuperStream(s); + env.close(); + assertThat(client.partitions(s)).isEmpty(); + } + } + @Test void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() { String dummyHost = UUID.randomUUID().toString(); @@ -556,7 +612,7 @@ void createPublishConsumeDelete(boolean lazyInit, TestInfo info) { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) throws Exception { try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) { @@ -589,7 +645,7 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() { try (Environment env = environmentBuilder.build()) { assertThatThrownBy(() -> env.queryStreamStats("does not exist")) @@ -599,7 +655,7 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamExists(boolean lazyInit) { AtomicBoolean metadataCalled = new AtomicBoolean(false); Function clientFactory = @@ -623,7 +679,7 @@ public Map metadata(String... streams) { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamExistsMetadataDataFallback(boolean lazyInit) { AtomicInteger metadataCallCount = new AtomicInteger(0); Function clientFactory = diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 20395539b2..8d50a4aa89 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -163,7 +163,7 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void allMessagesForSameUserShouldEndUpInSamePartition() { int messageCount = 10_000 * partitions; int userCount = 10; diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index b65638d63a..3130745921 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1018,7 +1018,7 @@ static void waitMs(long waitTime) { public @interface SingleActiveConsumer {} public enum BrokerVersion { - RABBITMQ_3_11("3.11.0"), + RABBITMQ_3_11_0("3.11.0"), RABBITMQ_3_11_7("3.11.7"), RABBITMQ_3_11_9("3.11.9"), RABBITMQ_3_11_11("3.11.11"), From 6868fb3a03480a43cb2074ebd468a822f1135606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 9 Nov 2023 16:11:14 +0100 Subject: [PATCH 7/8] Check super stream management commands are supported --- .../java/com/rabbitmq/stream/impl/Client.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index c989304dcc..7d760b5ce8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -189,8 +189,9 @@ public long applyAsLong(Object value) { private final Duration rpcTimeout; private final List saslMechanisms; private volatile ShutdownReason shutdownReason = null; - private final Runnable exchangeCommandVersionsCheck; + private final Runnable streamStatsCommandVersionsCheck; private final boolean filteringSupported; + private final Runnable superStreamManagementCommandVersionsCheck; public Client() { this(new ClientParameters()); @@ -376,25 +377,36 @@ public void initChannel(SocketChannel ch) { tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); Set supportedCommands = maybeExchangeCommandVersions(); - AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>(); + AtomicBoolean streamStatsSupported = new AtomicBoolean(false); AtomicBoolean filteringSupportedReference = new AtomicBoolean(false); + AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false); supportedCommands.forEach( c -> { if (c.getKey() == COMMAND_STREAM_STATS) { - exchangeCommandVersionsCheckReference.set(() -> {}); + streamStatsSupported.set(true); } if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) { filteringSupportedReference.set(true); } + if (c.getKey() == COMMAND_CREATE_SUPER_STREAM) { + superStreamManagementSupported.set(true); + } }); - this.exchangeCommandVersionsCheck = - exchangeCommandVersionsCheckReference.get() == null - ? () -> { + this.streamStatsCommandVersionsCheck = + streamStatsSupported.get() + ? () -> {} + : () -> { throw new UnsupportedOperationException( "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); - } - : exchangeCommandVersionsCheckReference.get(); + }; this.filteringSupported = filteringSupportedReference.get(); + this.superStreamManagementCommandVersionsCheck = + superStreamManagementSupported.get() + ? () -> {} + : () -> { + throw new UnsupportedOperationException( + "Super stream management is available only on RabbitMQ 3.13 or more."); + }; started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -678,6 +690,7 @@ Response createSuperStream( List partitions, List routingKeys, Map arguments) { + this.superStreamManagementCommandVersionsCheck.run(); if (partitions.isEmpty() || routingKeys.isEmpty()) { throw new IllegalArgumentException( "Partitions and routing keys of a super stream cannot be empty"); @@ -724,6 +737,7 @@ Response createSuperStream( } Response deleteSuperStream(String superStream) { + this.superStreamManagementCommandVersionsCheck.run(); int length = 2 + 2 + 4 + 2 + superStream.length(); int correlationId = correlationSequence.incrementAndGet(); try { @@ -1594,7 +1608,7 @@ List exchangeCommandVersions() { } StreamStatsResponse streamStats(String stream) { - this.exchangeCommandVersionsCheck.run(); + this.streamStatsCommandVersionsCheck.run(); if (stream == null) { throw new IllegalArgumentException("stream must not be null"); } From 57376dac7109f5bc086420cf44bdce6a6c5d3538 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 14:51:55 +0100 Subject: [PATCH 8/8] Use "binding key" lingo for super stream creation Instead of "routing key". --- src/docs/asciidoc/super-streams.adoc | 6 +-- .../com/rabbitmq/stream/StreamCreator.java | 8 ++-- .../java/com/rabbitmq/stream/impl/Client.java | 10 ++--- .../stream/impl/StreamStreamCreator.java | 24 ++++++------ .../stream/docs/SuperStreamUsage.java | 6 +-- .../stream/impl/StreamEnvironmentTest.java | 12 +++--- .../impl/SuperStreamManagementTest.java | 38 +++++++++---------- 7 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc index 9d073abc94..e866d50562 100644 --- a/src/docs/asciidoc/super-streams.adoc +++ b/src/docs/asciidoc/super-streams.adoc @@ -83,12 +83,12 @@ The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5 We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to. This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order. -It is also possible to specify routing keys when creating a super stream: +It is also possible to specify binding keys when creating a super stream: -.Creating a super stream by specifying the routing keys +.Creating a super stream by specifying the binding keys [source,java,indent=0] -------- -include::{test-examples}/SuperStreamUsage.java[tag=creation-routing-keys] +include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys] -------- The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case. diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index 2e5bf73351..5d6a753d8d 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -174,7 +174,7 @@ interface SuperStreamConfiguration { /** * The number of partitions of the super stream. * - *

Mutually exclusive with {@link #routingKeys(String...)}. Default is 3. + *

Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3. * * @param partitions * @return this super stream configuration instance @@ -182,14 +182,14 @@ interface SuperStreamConfiguration { SuperStreamConfiguration partitions(int partitions); /** - * The routing keys to use when declaring the super stream partitions. + * The binding keys to use when declaring the super stream partitions. * *

Mutually exclusive with {@link #partitions(int)}. Default is null. * - * @param routingKeys + * @param bindingKeys * @return this super stream configuration instance */ - SuperStreamConfiguration routingKeys(String... routingKeys); + SuperStreamConfiguration bindingKeys(String... bindingKeys); /** * Go back to the creator. diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 7d760b5ce8..5d83866bb9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -688,14 +688,14 @@ public Response create(String stream, Map arguments) { Response createSuperStream( String superStream, List partitions, - List routingKeys, + List bindingKeys, Map arguments) { this.superStreamManagementCommandVersionsCheck.run(); - if (partitions.isEmpty() || routingKeys.isEmpty()) { + if (partitions.isEmpty() || bindingKeys.isEmpty()) { throw new IllegalArgumentException( "Partitions and routing keys of a super stream cannot be empty"); } - if (partitions.size() != routingKeys.size()) { + if (partitions.size() != bindingKeys.size()) { throw new IllegalArgumentException( "Partitions and routing keys of a super stream must have " + "the same number of elements"); @@ -708,7 +708,7 @@ Response createSuperStream( + 2 + superStream.length() + collectionSize(partitions) - + collectionSize(routingKeys) + + collectionSize(bindingKeys) + mapSize(arguments); int correlationId = correlationSequence.incrementAndGet(); try { @@ -720,7 +720,7 @@ Response createSuperStream( bb.writeShort(superStream.length()); bb.writeBytes(superStream.getBytes(CHARSET)); writeCollection(bb, partitions); - writeCollection(bb, routingKeys); + writeCollection(bb, bindingKeys); writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index 7e1146f3ca..264e858d44 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -101,28 +101,28 @@ public void create() { Function function; boolean superStream = this.superStreamConfiguration != null; if (superStream) { - List partitions, routingKeys; - if (this.superStreamConfiguration.routingKeys == null) { + List partitions, bindingKeys; + if (this.superStreamConfiguration.bindingKeys == null) { partitions = IntStream.range(0, this.superStreamConfiguration.partitions) .mapToObj(i -> this.name + "-" + i) .collect(toList()); - routingKeys = + bindingKeys = IntStream.range(0, this.superStreamConfiguration.partitions) .mapToObj(String::valueOf) .collect(toList()); } else { partitions = - this.superStreamConfiguration.routingKeys.stream() + this.superStreamConfiguration.bindingKeys.stream() .map(rk -> this.name + "-" + rk) .collect(toList()); - routingKeys = this.superStreamConfiguration.routingKeys; + bindingKeys = this.superStreamConfiguration.bindingKeys; } function = namedFunction( c -> c.createSuperStream( - this.name, partitions, routingKeys, streamParametersBuilder.build()), + this.name, partitions, bindingKeys, streamParametersBuilder.build()), "Creation of super stream '%s'", this.name); } else { @@ -154,7 +154,7 @@ private static class DefaultSuperStreamConfiguration implements SuperStreamConfi private final StreamCreator creator; private int partitions = 3; - private List routingKeys = null; + private List bindingKeys = null; private DefaultSuperStreamConfiguration(StreamCreator creator) { this.creator = creator; @@ -166,16 +166,16 @@ public SuperStreamConfiguration partitions(int partitions) { throw new IllegalArgumentException("The number of partitions must be greater than 0"); } this.partitions = partitions; - this.routingKeys = null; + this.bindingKeys = null; return this; } @Override - public SuperStreamConfiguration routingKeys(String... routingKeys) { - if (routingKeys == null || routingKeys.length == 0) { - throw new IllegalArgumentException("There must be at least 1 routing key"); + public SuperStreamConfiguration bindingKeys(String... bindingKeys) { + if (bindingKeys == null || bindingKeys.length == 0) { + throw new IllegalArgumentException("There must be at least 1 binding key"); } - this.routingKeys = Arrays.asList(routingKeys); + this.bindingKeys = Arrays.asList(bindingKeys); this.partitions = -1; return this; } diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java index 25da030e7a..f221893a90 100644 --- a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -34,12 +34,12 @@ void creation() { .partitions(5).creator() .create(); // end::creation-partitions[] - // tag::creation-routing-keys[] + // tag::creation-binding-keys[] environment.streamCreator().name("invoices") .superStream() - .routingKeys("amer", "emea", "apac").creator() + .bindingKeys("amer", "emea", "apac").creator() .create(); - // end::creation-routing-keys[] + // end::creation-binding-keys[] } void producerSimple() { diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 01f884f579..fb5e848616 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -527,8 +527,8 @@ void superStreamCreationSetPartitions(int partitionCount, TestInfo info) { @Test @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) - void superStreamCreationSetRoutingKeys(TestInfo info) { - List routingKeys = Arrays.asList("a", "b", "c", "d", "e"); + void superStreamCreationSetBindingKeys(TestInfo info) { + List bindingKeys = Arrays.asList("a", "b", "c", "d", "e"); String s = streamName(info); Client client = cf.get(); Environment env = environmentBuilder.build(); @@ -536,14 +536,14 @@ void superStreamCreationSetRoutingKeys(TestInfo info) { env.streamCreator() .name(s) .superStream() - .routingKeys(routingKeys.toArray(new String[] {})) + .bindingKeys(bindingKeys.toArray(new String[] {})) .creator() .create(); assertThat(client.partitions(s)) - .hasSize(routingKeys.size()) - .containsAll(routingKeys.stream().map(rk -> s + "-" + rk).collect(toList())); - routingKeys.forEach(rk -> assertThat(client.route(rk, s)).hasSize(1).contains(s + "-" + rk)); + .hasSize(bindingKeys.size()) + .containsAll(bindingKeys.stream().map(rk -> s + "-" + rk).collect(toList())); + bindingKeys.forEach(bk -> assertThat(client.route(bk, s)).hasSize(1).contains(s + "-" + bk)); } finally { env.deleteSuperStream(s); env.close(); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java index 6bd9b33ae7..cba1234b48 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java @@ -42,28 +42,28 @@ public class SuperStreamManagementTest { static final int partitionCount = 3; String s; List partitions; - List routingKeys; + List bindingKeys; @BeforeEach void init(TestInfo info) { s = streamName(info); partitions = partitions(s); - routingKeys = routingKeys(); + bindingKeys = bindingKeys(); } @Test @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void createDelete() { Client c = cf.get(); - Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); + Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null); assertThat(response).is(ok()); assertThat(c.metadata(partitions)) .hasSameSizeAs(partitions) .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); assertThat(c.partitions(s)).isEqualTo(partitions); - routingKeys.forEach(rk -> assertThat(c.route(rk, s)).hasSize(1).contains(s + "-" + rk)); + bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).hasSize(1).contains(s + "-" + bk)); - response = c.createSuperStream(s, partitions, routingKeys, null); + response = c.createSuperStream(s, partitions, bindingKeys, null); assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_STREAM_ALREADY_EXISTS)); response = c.deleteSuperStream(s); @@ -75,7 +75,7 @@ void createDelete() { assertThat(streamMetadata.getResponseCode()) .isEqualTo(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); assertThat(c.partitions(s)).isEmpty(); - routingKeys.forEach(rk -> assertThat(c.route(rk, s)).isEmpty()); + bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).isEmpty()); response = c.deleteSuperStream(s); assertThat(response).is(responseCode(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); @@ -85,7 +85,7 @@ void createDelete() { @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception { Client c = cf.get(); - Client.Response response = c.createSuperStream(s, partitions, routingKeys, null); + Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null); assertThat(response).is(ok()); Map notifications = new ConcurrentHashMap<>(partitions.size()); AtomicInteger notificationCount = new AtomicInteger(); @@ -114,37 +114,37 @@ void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exceptio @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) void authorisation() throws Exception { String user = "stream"; - // routing keys do not matter for authorisation - routingKeys = asList("1", "2", "3"); + // binding keys do not matter for authorisation + bindingKeys = asList("1", "2", "3"); try { addUser(user, user); setPermissions(user, asList("stream|partition.*$", "partition.*$", "stream.*$")); Client c = cf.get(new Client.ClientParameters().username(user).password(user)); - Client.Response response = c.createSuperStream("not-allowed", partitions, routingKeys, null); + Client.Response response = c.createSuperStream("not-allowed", partitions, bindingKeys, null); assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); s = name("stream"); - response = c.createSuperStream(s, asList("1", "2", "3"), routingKeys, null); + response = c.createSuperStream(s, asList("1", "2", "3"), bindingKeys, null); assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); partitions = range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList()); // we can create the queues, but can't bind them, as it requires write permission - response = c.createSuperStream(s, partitions, routingKeys, null); + response = c.createSuperStream(s, partitions, bindingKeys, null); assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); String partitionName = name("partition"); partitions = range(0, partitionCount).mapToObj(i -> partitionName + "-" + i).collect(toList()); - response = c.createSuperStream(s, partitions, routingKeys, null); + response = c.createSuperStream(s, partitions, bindingKeys, null); assertThat(response).is(ok()); assertThat(c.metadata(partitions)) .hasSameSizeAs(partitions) .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); assertThat(c.partitions(s)).isEqualTo(partitions); - for (int i = 0; i < routingKeys.size(); i++) { - String rk = routingKeys.get(i); - assertThat(c.route(rk, s)).hasSize(1).contains(partitions.get(i)); + for (int i = 0; i < bindingKeys.size(); i++) { + String bk = bindingKeys.get(i); + assertThat(c.route(bk, s)).hasSize(1).contains(partitions.get(i)); } response = c.deleteSuperStream(s); @@ -154,11 +154,11 @@ void authorisation() throws Exception { } } - private static List routingKeys() { - return routingKeys(partitionCount); + private static List bindingKeys() { + return bindingKeys(partitionCount); } - private static List routingKeys(int partitions) { + private static List bindingKeys(int partitions) { return range(0, partitions).mapToObj(String::valueOf).collect(toList()); }