diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 8885aec068..d7533241b6 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,6 +24,8 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh + env: + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel' - name: Test run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc index c7cede605a..e866d50562 100644 --- a/src/docs/asciidoc/super-streams.adoc +++ b/src/docs/asciidoc/super-streams.adoc @@ -58,14 +58,51 @@ When a super stream is in use, the stream Java client queries this information t From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages. -==== Super Stream Creation +==== Super Stream Creation and Deletion -It is possible to create the topology of a super stream with any AMQP 0.9.1 library or with the https://www.rabbitmq.com/management.html[management plugin], but the `rabbitmq-streams add_super_stream` command is a handy shortcut. -Here is how to create an invoices super stream with 3 partitions: +It is possible to manage super streams with + +* the stream Java client, by using `Environment#streamCreator()` and `Environment#deleteSuperStream(String)` +* the `add_super_stream` and `delete_super_stream` commands in `rabbitmq-streams` (CLI) +* any AMQP 0.9.1 client library +* the https://www.rabbitmq.com/management.html[management plugin] + +The stream Java client and the dedicated CLI commands are easier to use as they take care of the topology details (exchange, streams, and bindings). + +===== With the Client Library + +Here is how to create an `invoices` super stream with 5 partitions: + +.Creating a super stream by specifying the number of partitions +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=creation-partitions] +-------- + +The super stream partitions will be `invoices-0`, `invoices-1`, ..., `invoices-5`. +We use this kind of topology when routing keys of outbound messages are hashed to pick the partition to publish them to. +This way, if the routing key is the customer ID of the invoice, all the invoices for a given customer end up in the same partition, and they can be processed in the publishing order. + +It is also possible to specify binding keys when creating a super stream: + +.Creating a super stream by specifying the binding keys +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=creation-binding-keys] +-------- + +The super stream partitions will be `invoices-amer`, `invoices-emea` and `invoices-apac` in this case. + +Using one type of topology or the other depends on the use cases, especially how messages are processed. +See the next sections on publishing and consuming to find out more. + +===== With the CLI + +Here is how to create an `invoices` super stream with 5 partitions: .Creating a super stream from the CLI ---- -rabbitmq-streams add_super_stream invoices --partitions 3 +rabbitmq-streams add_super_stream invoices --partitions 5 ---- Use `rabbitmq-streams add_super_stream --help` to learn more about the command. diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index 99995a0dd1..dd1f448b8e 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -72,6 +72,8 @@ public final class Constants { public static final short COMMAND_CONSUMER_UPDATE = 26; public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27; public static final short COMMAND_STREAM_STATS = 28; + public static final short COMMAND_CREATE_SUPER_STREAM = 29; + public static final short COMMAND_DELETE_SUPER_STREAM = 30; public static final short VERSION_1 = 1; public static final short VERSION_2 = 2; diff --git a/src/main/java/com/rabbitmq/stream/Environment.java b/src/main/java/com/rabbitmq/stream/Environment.java index eb8185a119..7963c835ec 100644 --- a/src/main/java/com/rabbitmq/stream/Environment.java +++ b/src/main/java/com/rabbitmq/stream/Environment.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -58,9 +58,19 @@ static EnvironmentBuilder builder() { * Delete a stream * * @param stream the stream to delete + * @since 0.15.0 */ void deleteStream(String stream); + /** + * Delete a super stream. + * + *

Requires RabbitMQ 3.13.0 or more. + * + * @param superStream the super stream to delete + */ + void deleteSuperStream(String superStream); + /** * Query statistics on a stream. * diff --git a/src/main/java/com/rabbitmq/stream/StreamCreator.java b/src/main/java/com/rabbitmq/stream/StreamCreator.java index ad205b5998..5d6a753d8d 100644 --- a/src/main/java/com/rabbitmq/stream/StreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/StreamCreator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -23,13 +23,24 @@ public interface StreamCreator { ByteCapacity MAX_SEGMENT_SIZE = ByteCapacity.from("3GB"); /** - * The name of the stream + * The name of the stream. + * + *

Alias for {@link #name(String)}. * * @param stream * @return this creator instance */ StreamCreator stream(String stream); + /** + * The name of the (super) stream. + * + * @param name + * @return this creator instance + * @since 0.15.0 + */ + StreamCreator name(String name); + /** * The maximum size of the stream before it gets truncated. * @@ -80,6 +91,16 @@ public interface StreamCreator { */ StreamCreator filterSize(int size); + /** + * Configure the super stream to create. + * + *

Requires RabbitMQ 3.13.0 or more. + * + * @return the super stream configuration + * @since 0.15.0 + */ + SuperStreamConfiguration superStream(); + /** * Create the stream. * @@ -142,4 +163,39 @@ public String value() { return this.value; } } + + /** + * Super stream configuration. + * + * @since 0.15.0 + */ + interface SuperStreamConfiguration { + + /** + * The number of partitions of the super stream. + * + *

Mutually exclusive with {@link #bindingKeys(String...)}. Default is 3. + * + * @param partitions + * @return this super stream configuration instance + */ + SuperStreamConfiguration partitions(int partitions); + + /** + * The binding keys to use when declaring the super stream partitions. + * + *

Mutually exclusive with {@link #partitions(int)}. Default is null. + * + * @param bindingKeys + * @return this super stream configuration instance + */ + SuperStreamConfiguration bindingKeys(String... bindingKeys); + + /** + * Go back to the creator. + * + * @return the stream creator + */ + StreamCreator creator(); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index b443bddf51..5d83866bb9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -22,7 +22,9 @@ import static com.rabbitmq.stream.impl.Utils.noOpConsumer; import static java.lang.String.format; import static java.lang.String.join; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.StreamSupport.stream; import com.rabbitmq.stream.AuthenticationFailureException; import com.rabbitmq.stream.ByteCapacity; @@ -83,16 +85,10 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -127,6 +123,7 @@ */ public class Client implements AutoCloseable { + private static final Charset CHARSET = StandardCharsets.UTF_8; public static final int DEFAULT_PORT = 5552; public static final int DEFAULT_TLS_PORT = 5551; static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK = @@ -192,8 +189,9 @@ public long applyAsLong(Object value) { private final Duration rpcTimeout; private final List saslMechanisms; private volatile ShutdownReason shutdownReason = null; - private final Runnable exchangeCommandVersionsCheck; + private final Runnable streamStatsCommandVersionsCheck; private final boolean filteringSupported; + private final Runnable superStreamManagementCommandVersionsCheck; public Client() { this(new ClientParameters()); @@ -379,25 +377,36 @@ public void initChannel(SocketChannel ch) { tuneState.getHeartbeat()); this.connectionProperties = open(parameters.virtualHost); Set supportedCommands = maybeExchangeCommandVersions(); - AtomicReference exchangeCommandVersionsCheckReference = new AtomicReference<>(); + AtomicBoolean streamStatsSupported = new AtomicBoolean(false); AtomicBoolean filteringSupportedReference = new AtomicBoolean(false); + AtomicBoolean superStreamManagementSupported = new AtomicBoolean(false); supportedCommands.forEach( c -> { if (c.getKey() == COMMAND_STREAM_STATS) { - exchangeCommandVersionsCheckReference.set(() -> {}); + streamStatsSupported.set(true); } if (c.getKey() == COMMAND_PUBLISH && c.getMaxVersion() >= VERSION_2) { filteringSupportedReference.set(true); } + if (c.getKey() == COMMAND_CREATE_SUPER_STREAM) { + superStreamManagementSupported.set(true); + } }); - this.exchangeCommandVersionsCheck = - exchangeCommandVersionsCheckReference.get() == null - ? () -> { + this.streamStatsCommandVersionsCheck = + streamStatsSupported.get() + ? () -> {} + : () -> { throw new UnsupportedOperationException( "QueryStreamInfo is available only on RabbitMQ 3.11 or more."); - } - : exchangeCommandVersionsCheckReference.get(); + }; this.filteringSupported = filteringSupportedReference.get(); + this.superStreamManagementCommandVersionsCheck = + superStreamManagementSupported.get() + ? () -> {} + : () -> { + throw new UnsupportedOperationException( + "Super stream management is available only on RabbitMQ 3.13 or more."); + }; started.set(true); this.metricsCollector.openConnection(); } catch (RuntimeException e) { @@ -446,12 +455,7 @@ int maxFrameSize() { } private Map peerProperties() { - int clientPropertiesSize = 4; // size of the map, always there - if (!clientProperties.isEmpty()) { - for (Map.Entry entry : clientProperties.entrySet()) { - clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } - } + int clientPropertiesSize = mapSize(this.clientProperties); int length = 2 + 2 + 4 + clientPropertiesSize; int correlationId = correlationSequence.incrementAndGet(); try { @@ -460,13 +464,7 @@ private Map peerProperties() { bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(clientProperties.size()); - for (Map.Entry entry : clientProperties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, this.clientProperties); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -540,7 +538,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate( bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(saslMechanism.getName().length()); - bb.writeBytes(saslMechanism.getName().getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(saslMechanism.getName().getBytes(CHARSET)); if (challengeResponse == null) { bb.writeInt(-1); } else { @@ -570,7 +568,7 @@ private Map open(String virtualHost) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(virtualHost.length()); - bb.writeBytes(virtualHost.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(virtualHost.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -612,7 +610,7 @@ private void sendClose(short code, String reason) { bb.writeInt(correlationId); bb.writeShort(code); bb.writeShort(reason.length()); - bb.writeBytes(reason.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reason.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -662,10 +660,7 @@ public Response create(String stream) { } public Response create(String stream, Map arguments) { - int length = 2 + 2 + 4 + 2 + stream.length() + 4; - for (Map.Entry argument : arguments.entrySet()) { - length = length + 2 + argument.getKey().length() + 2 + argument.getValue().length(); - } + int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments); int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -674,14 +669,8 @@ public Response create(String stream, Map arguments) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - bb.writeInt(arguments.size()); - for (Map.Entry argument : arguments.entrySet()) { - bb.writeShort(argument.getKey().length()); - bb.writeBytes(argument.getKey().getBytes(StandardCharsets.UTF_8)); - bb.writeShort(argument.getValue().length()); - bb.writeBytes(argument.getValue().getBytes(StandardCharsets.UTF_8)); - } + bb.writeBytes(stream.getBytes(CHARSET)); + writeMap(bb, arguments); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -696,6 +685,119 @@ public Response create(String stream, Map arguments) { } } + Response createSuperStream( + String superStream, + List partitions, + List bindingKeys, + Map arguments) { + this.superStreamManagementCommandVersionsCheck.run(); + if (partitions.isEmpty() || bindingKeys.isEmpty()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream cannot be empty"); + } + if (partitions.size() != bindingKeys.size()) { + throw new IllegalArgumentException( + "Partitions and routing keys of a super stream must have " + + "the same number of elements"); + } + arguments = arguments == null ? Collections.emptyMap() : arguments; + int length = + 2 + + 2 + + 4 + + 2 + + superStream.length() + + collectionSize(partitions) + + collectionSize(bindingKeys) + + mapSize(arguments); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_CREATE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + writeCollection(bb, partitions); + writeCollection(bb, bindingKeys); + writeMap(bb, arguments); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while creating super stream '%s'", superStream), e); + } + } + + Response deleteSuperStream(String superStream) { + this.superStreamManagementCommandVersionsCheck.run(); + int length = 2 + 2 + 4 + 2 + superStream.length(); + int correlationId = correlationSequence.incrementAndGet(); + try { + ByteBuf bb = allocate(length + 4); + bb.writeInt(length); + bb.writeShort(encodeRequestCode(COMMAND_DELETE_SUPER_STREAM)); + bb.writeShort(VERSION_1); + bb.writeInt(correlationId); + bb.writeShort(superStream.length()); + bb.writeBytes(superStream.getBytes(CHARSET)); + OutstandingRequest request = outstandingRequest(); + outstandingRequests.put(correlationId, request); + channel.writeAndFlush(bb); + request.block(); + return request.response.get(); + } catch (StreamException e) { + outstandingRequests.remove(correlationId); + throw e; + } catch (RuntimeException e) { + outstandingRequests.remove(correlationId); + throw new StreamException(format("Error while deleting stream '%s'", superStream), e); + } + } + + private static int collectionSize(Collection elements) { + return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum(); + } + + private static int arraySize(String... elements) { + return collectionSize(asList(elements)); + } + + private static int mapSize(Map elements) { + return 4 + + elements.entrySet().stream() + .mapToInt(e -> 2 + e.getKey().length() + 2 + e.getValue().length()) + .sum(); + } + + private static ByteBuf writeCollection(ByteBuf bb, Collection elements) { + bb.writeInt(elements.size()); + elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET))); + return bb; + } + + private static ByteBuf writeArray(ByteBuf bb, String... elements) { + return writeCollection(bb, asList(elements)); + } + + private static ByteBuf writeMap(ByteBuf bb, Map elements) { + bb.writeInt(elements.size()); + elements.forEach( + (key, value) -> + bb.writeShort(key.length()) + .writeBytes(key.getBytes(CHARSET)) + .writeShort(value.length()) + .writeBytes(value.getBytes(CHARSET))); + return bb; + } + ByteBuf allocate(ByteBufAllocator allocator, int capacity) { if (frameSizeCopped && capacity > this.maxFrameSize()) { throw new IllegalArgumentException( @@ -729,7 +831,7 @@ public Response delete(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -744,15 +846,15 @@ public Response delete(String stream) { } } + Map metadata(List streams) { + return this.metadata(streams.toArray(new String[] {})); + } + public Map metadata(String... streams) { if (streams == null || streams.length == 0) { throw new IllegalArgumentException("At least one stream must be specified"); } - int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, size of array - for (String stream : streams) { - length += 2; - length += stream.length(); - } + int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size int correlationId = correlationSequence.incrementAndGet(); try { ByteBuf bb = allocate(length + 4); @@ -760,11 +862,7 @@ public Map metadata(String... streams) { bb.writeShort(encodeRequestCode(COMMAND_METADATA)); bb.writeShort(VERSION_1); bb.writeInt(correlationId); - bb.writeInt(streams.length); - for (String stream : streams) { - bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); - } + writeArray(bb, streams); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -800,10 +898,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St bb.writeByte(publisherId); bb.writeShort(publisherReferenceSize); if (publisherReferenceSize > 0) { - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); } bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1142,10 +1240,7 @@ public Response subscribe( } int propertiesSize = 0; if (properties != null && !properties.isEmpty()) { - propertiesSize = 4; // size of the map - for (Map.Entry entry : properties.entrySet()) { - propertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length(); - } + propertiesSize = mapSize(properties); } length += propertiesSize; int correlationId = correlationSequence.getAndIncrement(); @@ -1157,20 +1252,14 @@ public Response subscribe( bb.writeInt(correlationId); bb.writeByte(subscriptionId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeShort(offsetSpecification.getType()); if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) { bb.writeLong(offsetSpecification.getOffset()); } bb.writeShort(initialCredits); if (properties != null && !properties.isEmpty()) { - bb.writeInt(properties.size()); - for (Map.Entry entry : properties.entrySet()) { - bb.writeShort(entry.getKey().length()) - .writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8)) - .writeShort(entry.getValue().length()) - .writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8)); - } + writeMap(bb, properties); } OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); @@ -1205,9 +1294,9 @@ public void storeOffset(String reference, String stream, long offset) { bb.writeShort(encodeRequestCode(COMMAND_STORE_OFFSET)); bb.writeShort(VERSION_1); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); bb.writeLong(offset); channel.writeAndFlush(bb); } @@ -1230,9 +1319,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(reference.length()); - bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(reference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1271,9 +1360,9 @@ public long queryPublisherSequence(String publisherReference, String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(publisherReference.length()); - bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(publisherReference.getBytes(CHARSET)); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1436,9 +1525,9 @@ public List route(String routingKey, String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(routingKey.length()); - bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(routingKey.getBytes(CHARSET)); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1471,7 +1560,7 @@ public List partitions(String superStream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(superStream.length()); - bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(superStream.getBytes(CHARSET)); OutstandingRequest> request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); @@ -1519,7 +1608,7 @@ List exchangeCommandVersions() { } StreamStatsResponse streamStats(String stream) { - this.exchangeCommandVersionsCheck.run(); + this.streamStatsCommandVersionsCheck.run(); if (stream == null) { throw new IllegalArgumentException("stream must not be null"); } @@ -1532,7 +1621,7 @@ StreamStatsResponse streamStats(String stream) { bb.writeShort(VERSION_1); bb.writeInt(correlationId); bb.writeShort(stream.length()); - bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8)); + bb.writeBytes(stream.getBytes(CHARSET)); OutstandingRequest request = outstandingRequest(); outstandingRequests.put(correlationId, request); channel.writeAndFlush(bb); diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index 2b02a475ff..04b45446ea 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -13,37 +13,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.Constants.COMMAND_CLOSE; -import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_CREDIT; -import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER; -import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM; -import static com.rabbitmq.stream.Constants.COMMAND_DELIVER; -import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS; -import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA; -import static com.rabbitmq.stream.Constants.COMMAND_METADATA_UPDATE; -import static com.rabbitmq.stream.Constants.COMMAND_OPEN; -import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS; -import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_CONFIRM; -import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_ERROR; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET; -import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE; -import static com.rabbitmq.stream.Constants.COMMAND_ROUTE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE; -import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE; -import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS; -import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE; -import static com.rabbitmq.stream.Constants.COMMAND_TUNE; -import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE; -import static com.rabbitmq.stream.Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE; -import static com.rabbitmq.stream.Constants.VERSION_1; -import static com.rabbitmq.stream.Constants.VERSION_2; +import static com.rabbitmq.stream.Constants.*; import static com.rabbitmq.stream.impl.Utils.encodeResponseCode; import com.rabbitmq.stream.ChunkChecksum; @@ -128,6 +98,8 @@ class ServerFrameHandler { handlers.put(COMMAND_CONSUMER_UPDATE, new ConsumerUpdateFrameHandler()); handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler()); handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler()); + handlers.put(COMMAND_CREATE_SUPER_STREAM, RESPONSE_FRAME_HANDLER); + handlers.put(COMMAND_DELETE_SUPER_STREAM, RESPONSE_FRAME_HANDLER); HANDLERS = new FrameHandler[maxCommandKey + 1][]; handlers .entrySet() diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 1c190546d2..476da5781d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -460,6 +460,22 @@ public void deleteStream(String stream) { } } + @Override + public void deleteSuperStream(String superStream) { + checkNotClosed(); + this.maybeInitializeLocator(); + Client.Response response = this.locator().deleteSuperStream(superStream); + if (!response.isOk()) { + throw new StreamException( + "Error while deleting super stream " + + superStream + + " (" + + formatConstant(response.getResponseCode()) + + ")", + response.getResponseCode()); + } + } + @Override public StreamStats queryStreamStats(String stream) { checkNotClosed(); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java index ed48565ec1..264e858d44 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -15,19 +15,25 @@ import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedFunction; +import static java.util.stream.Collectors.toList; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.IntStream; class StreamStreamCreator implements StreamCreator { private final StreamEnvironment environment; private final Client.StreamParametersBuilder streamParametersBuilder = new Client.StreamParametersBuilder().leaderLocator(LeaderLocator.LEAST_LEADERS); - private String stream; + private String name; + private DefaultSuperStreamConfiguration superStreamConfiguration; StreamStreamCreator(StreamEnvironment environment) { this.environment = environment; @@ -35,7 +41,13 @@ class StreamStreamCreator implements StreamCreator { @Override public StreamCreator stream(String stream) { - this.stream = stream; + this.name = stream; + return this; + } + + @Override + public StreamCreator name(String name) { + this.name = name; return this; } @@ -73,27 +85,104 @@ public StreamCreator filterSize(int size) { return this; } + @Override + public SuperStreamConfiguration superStream() { + if (this.superStreamConfiguration == null) { + this.superStreamConfiguration = new DefaultSuperStreamConfiguration(this); + } + return this.superStreamConfiguration; + } + @Override public void create() { - if (stream == null) { - throw new IllegalArgumentException("Stream cannot be null"); + if (name == null) { + throw new IllegalArgumentException("Stream name cannot be null"); + } + Function function; + boolean superStream = this.superStreamConfiguration != null; + if (superStream) { + List partitions, bindingKeys; + if (this.superStreamConfiguration.bindingKeys == null) { + partitions = + IntStream.range(0, this.superStreamConfiguration.partitions) + .mapToObj(i -> this.name + "-" + i) + .collect(toList()); + bindingKeys = + IntStream.range(0, this.superStreamConfiguration.partitions) + .mapToObj(String::valueOf) + .collect(toList()); + } else { + partitions = + this.superStreamConfiguration.bindingKeys.stream() + .map(rk -> this.name + "-" + rk) + .collect(toList()); + bindingKeys = this.superStreamConfiguration.bindingKeys; + } + function = + namedFunction( + c -> + c.createSuperStream( + this.name, partitions, bindingKeys, streamParametersBuilder.build()), + "Creation of super stream '%s'", + this.name); + } else { + function = + namedFunction( + c -> c.create(name, streamParametersBuilder.build()), + "Creation of stream '%s'", + this.name); } this.environment.maybeInitializeLocator(); - Client.Response response = - environment.locatorOperation( - namedFunction( - c -> c.create(stream, streamParametersBuilder.build()), - "Creation of stream '%s'", - this.stream)); + Client.Response response = environment.locatorOperation(function); if (!response.isOk() && response.getResponseCode() != Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) { + String label = superStream ? "super stream" : "stream"; throw new StreamException( - "Error while creating stream '" - + stream + "Error while creating " + + label + + " '" + + name + "' (" + formatConstant(response.getResponseCode()) + ")", response.getResponseCode()); } } + + private static class DefaultSuperStreamConfiguration implements SuperStreamConfiguration { + + private final StreamCreator creator; + + private int partitions = 3; + private List bindingKeys = null; + + private DefaultSuperStreamConfiguration(StreamCreator creator) { + this.creator = creator; + } + + @Override + public SuperStreamConfiguration partitions(int partitions) { + if (partitions <= 0) { + throw new IllegalArgumentException("The number of partitions must be greater than 0"); + } + this.partitions = partitions; + this.bindingKeys = null; + return this; + } + + @Override + public SuperStreamConfiguration bindingKeys(String... bindingKeys) { + if (bindingKeys == null || bindingKeys.length == 0) { + throw new IllegalArgumentException("There must be at least 1 binding key"); + } + this.bindingKeys = Arrays.asList(bindingKeys); + this.partitions = -1; + return this; + } + + @Override + public StreamCreator creator() { + return this.creator; + } + } } diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index b5bd0a6934..80fdd05ae8 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -14,6 +14,7 @@ package com.rabbitmq.stream; import static java.lang.String.format; +import static java.util.Arrays.asList; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -151,12 +152,24 @@ public static void addUser(String username, String password) throws IOException rabbitmqctl(format("add_user %s %s", username, password)); } + public static void setPermissions(String username, List permissions) throws IOException { + setPermissions(username, "/", permissions); + } + public static void setPermissions(String username, String vhost, String permission) throws IOException { + setPermissions(username, vhost, asList(permission, permission, permission)); + } + + public static void setPermissions(String username, String vhost, List permissions) + throws IOException { + if (permissions.size() != 3) { + throw new IllegalArgumentException(); + } rabbitmqctl( format( "set_permissions --vhost %s %s '%s' '%s' '%s'", - vhost, username, permission, permission, permission)); + vhost, username, permissions.get(0), permissions.get(1), permissions.get(2))); } public static void changePassword(String username, String newPassword) throws IOException { diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java index ca2765fdbb..f221893a90 100644 --- a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -26,6 +26,22 @@ public class SuperStreamUsage { + void creation() { + Environment environment = Environment.builder().build(); + // tag::creation-partitions[] + environment.streamCreator().name("invoices") + .superStream() + .partitions(5).creator() + .create(); + // end::creation-partitions[] + // tag::creation-binding-keys[] + environment.streamCreator().name("invoices") + .superStream() + .bindingKeys("amer", "emea", "apac").creator() + .create(); + // end::creation-binding-keys[] + } + void producerSimple() { Environment environment = Environment.builder().build(); // tag::producer-simple[] diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 4b068e5cb7..24736a8612 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -844,7 +844,7 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void exchangeCommandVersions() { Client client = cf.get(); List infos = client.exchangeCommandVersions(); @@ -853,7 +853,7 @@ void exchangeCommandVersions() { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { int publishCount = 20_000; byte correlationId = 42; @@ -887,7 +887,7 @@ void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { int publishCount = 20_000; CountDownLatch latch = new CountDownLatch(publishCount); @@ -923,14 +923,14 @@ void streamStatsShouldReturnFirstOffsetAndCommittedOffset() throws Exception { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsShouldReturnErrorWhenStreamDoesNotExist() { assertThat(cf.get().streamStats("does not exist").getResponseCode()) .isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST); } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) throws Exception { // this test is flaky in some CI environments, so we have to retry it repeatIfFailure( diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index fd477345c7..f9c9d9ab97 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -58,6 +58,7 @@ public class FilteringTest { Environment environment; String stream; + ClientFactory cf; @BeforeEach void init() throws Exception { @@ -278,9 +279,9 @@ void superStream(TestInfo info) throws Exception { () -> { String superStream = TestUtils.streamName(info); int partitionCount = 3; - Connection connection = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); try { - TestUtils.declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Producer producer = environment @@ -362,8 +363,7 @@ void superStream(TestInfo info) throws Exception { CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); } finally { - TestUtils.deleteSuperStreamTopology(connection, superStream, partitionCount); - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } }); } diff --git a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java index 5da95d97de..863e9019e1 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -36,21 +36,20 @@ public class RoutePartitionsTest { TestUtils.ClientFactory cf; - Connection connection; + Client configurationClient; int partitions = 3; String superStream; @BeforeEach - void init(TestInfo info) throws Exception { - connection = new ConnectionFactory().newConnection(); + void init(TestInfo info) { + configurationClient = cf.get(); superStream = TestUtils.streamName(info); } @AfterEach - void tearDown() throws Exception { - deleteSuperStreamTopology(connection, superStream, partitions); - connection.close(); + void tearDown() { + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -64,8 +63,8 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() { } @Test - void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { - declareSuperStreamTopology(connection, superStream, partitions); + void routeShouldReturnNullWhenNoStreamForRoutingKey() { + declareSuperStreamTopology(configurationClient, superStream, partitions); Client client = cf.get(); assertThat(client.route("0", superStream)).hasSize(1).contains(superStream + "-0"); @@ -73,16 +72,14 @@ void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { } @Test - void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception { - declareSuperStreamTopology(connection, superStream, 0); - + void partitionsShouldReturnEmptyListWhenSuperStreamDoesNotExist() { Client client = cf.get(); assertThat(client.partitions(superStream)).isEmpty(); } @Test - void routeTopologyWithPartitionCount() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); + void routeTopologyWithPartitionCount() { + declareSuperStreamTopology(configurationClient, superStream, 3); Client client = cf.get(); List streams = client.partitions(superStream); @@ -97,8 +94,10 @@ void routeTopologyWithPartitionCount() throws Exception { @Test void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); - connection.createChannel().queueBind(superStream + "-1", superStream, "0"); + declareSuperStreamTopology(configurationClient, superStream, 3); + try (Connection connection = new ConnectionFactory().newConnection()) { + connection.createChannel().queueBind(superStream + "-1", superStream, "0"); + } Client client = cf.get(); List streams = client.partitions(superStream); assertThat(streams) @@ -114,10 +113,12 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception @Test void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { - declareSuperStreamTopology(connection, superStream, 3); - Channel channel = connection.createChannel(); - String nonStreamQueue = channel.queueDeclare().getQueue(); - connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream"); + declareSuperStreamTopology(configurationClient, superStream, 3); + try (Connection connection = new ConnectionFactory().newConnection()) { + Channel channel = connection.createChannel(); + String nonStreamQueue = channel.queueDeclare().getQueue(); + connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream"); + } Client client = cf.get(); List streams = client.partitions(superStream); assertThat(streams) @@ -131,9 +132,9 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { } @Test - void partitionsReturnsCorrectOrder() throws Exception { + void partitionsReturnsCorrectOrder() { String[] partitionNames = {"z", "y", "x"}; - declareSuperStreamTopology(connection, superStream, partitionNames); + declareSuperStreamTopology(configurationClient, superStream, partitionNames); try { Client client = cf.get(); List streams = client.partitions(superStream); @@ -142,7 +143,7 @@ void partitionsReturnsCorrectOrder() throws Exception { .containsSequence( Arrays.stream(partitionNames).map(p -> superStream + "-" + p).toArray(String[]::new)); } finally { - deleteSuperStreamTopology(connection, superStream, partitionNames); + deleteSuperStreamTopology(configurationClient, superStream); } } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java index 42125fc8dd..1ef940a89c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacClientTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2022-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). @@ -26,8 +26,6 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Host; import com.rabbitmq.stream.OffsetSpecification; @@ -230,9 +228,9 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info) Map receivedMessages = receivedMessages(2); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); try { - TestUtils.declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); // working with the second partition String partition = superStream + "-1"; @@ -330,8 +328,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info) .isEqualTo(messageCount * 2); } finally { - TestUtils.deleteSuperStreamTopology(c, superStream, 3); - c.close(); + deleteSuperStreamTopology(configurationClient, superStream); } } @@ -340,11 +337,11 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr Map consumerStates = consumerStates(3 * 3); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); // subscription distribution // client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8 try { - declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); List partitions = IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(toList()); ConsumerUpdateListener consumerUpdateListener = @@ -422,7 +419,7 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr client.set(client3); partitions.forEach(unsubscriptionCallback); } finally { - deleteSuperStreamTopology(c, superStream, 3); + deleteSuperStreamTopology(configurationClient, superStream); } } @@ -485,10 +482,10 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep Map consumerStates = consumerStates(3 * 3); String superStream = streamName(info); String consumerName = "foo"; - Connection c = new ConnectionFactory().newConnection(); + Client configurationClient = cf.get(); AtomicBoolean keepPublishing = new AtomicBoolean(true); try { - declareSuperStreamTopology(c, superStream, 3); + declareSuperStreamTopology(configurationClient, superStream, 3); // we use the second partition because a rebalancing occurs // when the second consumer joins String partitionInUse = superStream + "-1"; @@ -584,8 +581,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep assertThat(response).is(ok()); } finally { keepPublishing.set(false); - deleteSuperStreamTopology(c, superStream, 3); - c.close(); + deleteSuperStreamTopology(configurationClient, superStream); } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index c37c5857b9..d00f59bef5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -19,8 +19,6 @@ import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.ConsumerUpdateListener; import com.rabbitmq.stream.Environment; @@ -56,26 +54,25 @@ public class SacSuperStreamConsumerTest { Environment environment; - Connection connection; + Client configurationClient; int partitionCount = 3; String superStream; TestUtils.ClientFactory cf; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); superStream = TestUtils.streamName(info); - connection = new ConnectionFactory().newConnection(); - declareSuperStreamTopology(connection, superStream, partitionCount); + configurationClient = cf.get(); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); } @AfterEach - void tearDown() throws Exception { + void tearDown() { environment.close(); - deleteSuperStreamTopology(connection, superStream, partitionCount); - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -171,7 +168,7 @@ void sacShouldSpreadAcrossPartitions() throws Exception { @Test void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; AtomicInteger messageWaveCount = new AtomicInteger(); int superConsumerCount = 3; @@ -357,7 +354,7 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { @Test void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; int storeEvery = messageCount / 100; AtomicInteger messageWaveCount = new AtomicInteger(); @@ -561,7 +558,7 @@ void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { @Test void sacExternalOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 5_000; AtomicInteger messageWaveCount = new AtomicInteger(); int superConsumerCount = 3; @@ -731,8 +728,8 @@ void sacExternalOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception { } @Test - void consumerGroupsOnSameSuperStreamShouldBeIndependent() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void consumerGroupsOnSameSuperStreamShouldBeIndependent() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); int messageCount = 20_000; int consumerGroupsCount = 3; List consumerNames = diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 3bb03d6810..0781b6a01b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -132,7 +132,7 @@ void nameShouldBeSetIfTrackingStrategyIsSet() { } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void committedOffsetShouldBeSet() throws Exception { int messageCount = 20_000; publishAndWaitForConfirms(cf, messageCount, this.stream); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 0203c8f536..fb5e848616 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -495,6 +495,62 @@ void streamCreationShouldBeIdempotent(TestInfo info) { } } + @ParameterizedTest + @ValueSource(ints = {3, 5}) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void superStreamCreationSetPartitions(int partitionCount, TestInfo info) { + int defaultPartitionCount = 3; + String s = streamName(info); + Client client = cf.get(); + Environment env = environmentBuilder.build(); + try { + StreamCreator.SuperStreamConfiguration configuration = + env.streamCreator().name(s).superStream(); + if (partitionCount != defaultPartitionCount) { + configuration.partitions(partitionCount); + } + configuration.creator().create(); + + assertThat(client.partitions(s)) + .hasSize(partitionCount) + .containsAll( + IntStream.range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList())); + IntStream.range(0, partitionCount) + .forEach( + i -> assertThat(client.route(String.valueOf(i), s)).hasSize(1).contains(s + "-" + i)); + } finally { + env.deleteSuperStream(s); + env.close(); + assertThat(client.partitions(s)).isEmpty(); + } + } + + @Test + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) + void superStreamCreationSetBindingKeys(TestInfo info) { + List bindingKeys = Arrays.asList("a", "b", "c", "d", "e"); + String s = streamName(info); + Client client = cf.get(); + Environment env = environmentBuilder.build(); + try { + env.streamCreator() + .name(s) + .superStream() + .bindingKeys(bindingKeys.toArray(new String[] {})) + .creator() + .create(); + + assertThat(client.partitions(s)) + .hasSize(bindingKeys.size()) + .containsAll(bindingKeys.stream().map(rk -> s + "-" + rk).collect(toList())); + bindingKeys.forEach(bk -> assertThat(client.route(bk, s)).hasSize(1).contains(s + "-" + bk)); + } finally { + env.deleteSuperStream(s); + env.close(); + assertThat(client.partitions(s)).isEmpty(); + } + } + @Test void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() { String dummyHost = UUID.randomUUID().toString(); @@ -556,7 +612,7 @@ void createPublishConsumeDelete(boolean lazyInit, TestInfo info) { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) throws Exception { try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) { @@ -589,7 +645,7 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() { try (Environment env = environmentBuilder.build()) { assertThatThrownBy(() -> env.queryStreamStats("does not exist")) @@ -599,7 +655,7 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamExists(boolean lazyInit) { AtomicBoolean metadataCalled = new AtomicBoolean(false); Function clientFactory = @@ -623,7 +679,7 @@ public Map metadata(String... streams) { @ParameterizedTest @ValueSource(booleans = {true, false}) - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) void streamExistsMetadataDataFallback(boolean lazyInit) { AtomicInteger metadataCallCount = new AtomicInteger(0); Function clientFactory = diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index 55a92dbabd..319d664feb 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -23,8 +23,6 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; @@ -53,7 +51,7 @@ public class SuperStreamConsumerTest { Environment environment; - Connection connection; + Client configurationClient; int partitionCount = 3; String superStream; String[] routingKeys = null; @@ -65,18 +63,13 @@ void init(TestInfo info) throws Exception { Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); superStream = TestUtils.streamName(info); - connection = new ConnectionFactory().newConnection(); + configurationClient = cf.get(); } @AfterEach void tearDown() throws Exception { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitionCount); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } private static void publishToPartitions( @@ -101,8 +94,8 @@ private static void publishToPartitions( } @Test - void consumeAllMessagesFromAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void consumeAllMessagesFromAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -133,8 +126,8 @@ void consumeAllMessagesFromAllPartitions() throws Exception { } @Test - void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void manualOffsetTrackingShouldStoreOnAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -191,8 +184,8 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { } @Test - void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void autoOffsetTrackingShouldStoreOnAllPartitions() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10000 * partitionCount; @@ -247,8 +240,8 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { } @Test - void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + void autoOffsetTrackingShouldStoreOffsetZero() { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = partitionCount; @@ -305,7 +298,7 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { @BrokerVersionAtLeast(RABBITMQ_3_11_11) void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance() throws Exception { - declareSuperStreamTopology(connection, superStream, partitionCount); + declareSuperStreamTopology(configurationClient, superStream, partitionCount); Client client = cf.get(); List partitions = client.partitions(superStream); int messageCount = 10_000; diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java new file mode 100644 index 0000000000..cba1234b48 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamManagementTest.java @@ -0,0 +1,179 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.Constants.*; +import static com.rabbitmq.stream.Host.*; +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; +import static com.rabbitmq.stream.impl.TestUtils.streamName; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.OffsetSpecification; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@TestUtils.DisabledIfRabbitMqCtlNotSet +public class SuperStreamManagementTest { + + TestUtils.ClientFactory cf; + static final int partitionCount = 3; + String s; + List partitions; + List bindingKeys; + + @BeforeEach + void init(TestInfo info) { + s = streamName(info); + partitions = partitions(s); + bindingKeys = bindingKeys(); + } + + @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) + void createDelete() { + Client c = cf.get(); + Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null); + assertThat(response).is(ok()); + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); + assertThat(c.partitions(s)).isEqualTo(partitions); + bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).hasSize(1).contains(s + "-" + bk)); + + response = c.createSuperStream(s, partitions, bindingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_STREAM_ALREADY_EXISTS)); + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy( + (s, streamMetadata) -> + assertThat(streamMetadata.getResponseCode()) + .isEqualTo(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); + assertThat(c.partitions(s)).isEmpty(); + bindingKeys.forEach(bk -> assertThat(c.route(bk, s)).isEmpty()); + + response = c.deleteSuperStream(s); + assertThat(response).is(responseCode(RESPONSE_CODE_STREAM_DOES_NOT_EXIST)); + } + + @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) + void clientWithSubscriptionShouldReceiveNotificationOnDeletion() throws Exception { + Client c = cf.get(); + Client.Response response = c.createSuperStream(s, partitions, bindingKeys, null); + assertThat(response).is(ok()); + Map notifications = new ConcurrentHashMap<>(partitions.size()); + AtomicInteger notificationCount = new AtomicInteger(); + partitions.forEach( + p -> { + Client client = + cf.get( + new Client.ClientParameters() + .metadataListener( + (stream, code) -> { + notifications.put(stream, code); + notificationCount.incrementAndGet(); + })); + Client.Response r = client.subscribe((byte) 0, p, OffsetSpecification.first(), 1); + assertThat(r).is(ok()); + }); + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + waitAtMost(() -> notificationCount.get() == partitionCount); + assertThat(notifications).hasSize(partitionCount).containsOnlyKeys(partitions); + assertThat(notifications.values()).containsOnly(RESPONSE_CODE_STREAM_NOT_AVAILABLE); + } + + @Test + @TestUtils.BrokerVersionAtLeast(TestUtils.BrokerVersion.RABBITMQ_3_13_0) + void authorisation() throws Exception { + String user = "stream"; + // binding keys do not matter for authorisation + bindingKeys = asList("1", "2", "3"); + try { + addUser(user, user); + setPermissions(user, asList("stream|partition.*$", "partition.*$", "stream.*$")); + Client c = cf.get(new Client.ClientParameters().username(user).password(user)); + Client.Response response = c.createSuperStream("not-allowed", partitions, bindingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + s = name("stream"); + response = c.createSuperStream(s, asList("1", "2", "3"), bindingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + partitions = range(0, partitionCount).mapToObj(i -> s + "-" + i).collect(toList()); + // we can create the queues, but can't bind them, as it requires write permission + response = c.createSuperStream(s, partitions, bindingKeys, null); + assertThat(response).is(ko()).is(responseCode(RESPONSE_CODE_ACCESS_REFUSED)); + + String partitionName = name("partition"); + partitions = + range(0, partitionCount).mapToObj(i -> partitionName + "-" + i).collect(toList()); + response = c.createSuperStream(s, partitions, bindingKeys, null); + assertThat(response).is(ok()); + + assertThat(c.metadata(partitions)) + .hasSameSizeAs(partitions) + .allSatisfy((s, streamMetadata) -> assertThat(streamMetadata.isResponseOk()).isTrue()); + assertThat(c.partitions(s)).isEqualTo(partitions); + for (int i = 0; i < bindingKeys.size(); i++) { + String bk = bindingKeys.get(i); + assertThat(c.route(bk, s)).hasSize(1).contains(partitions.get(i)); + } + + response = c.deleteSuperStream(s); + assertThat(response).is(ok()); + } finally { + deleteUser(user); + } + } + + private static List bindingKeys() { + return bindingKeys(partitionCount); + } + + private static List bindingKeys(int partitions) { + return range(0, partitions).mapToObj(String::valueOf).collect(toList()); + } + + private static List partitions(String superStream) { + return partitions(superStream, partitionCount); + } + + private static List partitions(String superStream, int partitions) { + return range(0, partitions).mapToObj(i -> superStream + "-" + i).collect(toList()); + } + + private static String name(String value) { + String uuid = UUID.randomUUID().toString(); + return String.format( + "%s-%s%s", + SuperStreamManagementTest.class.getSimpleName(), value, uuid.substring(uuid.length() / 2)); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index 75ff916bd2..e8fc7250ee 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -20,8 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; @@ -50,30 +48,25 @@ public class SuperStreamProducerTest { Environment environment; - Connection connection; + Client configurationClient; int partitions = 3; String superStream; String[] routingKeys = null; TestUtils.ClientFactory cf; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); - connection = new ConnectionFactory().newConnection(); superStream = TestUtils.streamName(info); + configurationClient = cf.get(); } @AfterEach void tearDown() throws Exception { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitions); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test @@ -85,7 +78,7 @@ void exceptionShouldBeThrownWhenSuperStreamIsSetAndRoutingIsNotConfigured() { @Test void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); Producer producer = environment .producerBuilder() @@ -140,7 +133,7 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -190,9 +183,9 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr } @Test - void messageIsNackedIfNoRouteFound() throws Exception { + void messageIsNackedIfNoRouteFound() { routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -226,7 +219,7 @@ void messageIsNackedIfNoRouteFound() throws Exception { @Test void getLastPublishingIdShouldReturnLowestValue() throws Exception { int messageCount = 10_000; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); String producerName = "super-stream-application"; Producer producer = environment @@ -292,9 +285,9 @@ void getLastPublishingIdShouldReturnLowestValue() throws Exception { } @Test - void producerShouldNotPublishMessagesOnceClosed() throws Exception { + void producerShouldNotPublishMessagesOnceClosed() { int messageCount = 100; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); String producerName = "super-stream-application"; Producer producer = environment @@ -331,8 +324,7 @@ void producerShouldNotPublishMessagesOnceClosed() throws Exception { } @Test - void producerCreationShouldFailIfNoPartition() throws Exception { - declareSuperStreamTopology(connection, superStream, 0); + void producerCreationShouldFailIfNoPartition() { String producerName = "super-stream-application"; assertThatThrownBy( () -> { diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 04b15df47e..8d50a4aa89 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -18,8 +18,6 @@ import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; @@ -53,35 +51,31 @@ public class SuperStreamTest { Environment environment; - Connection connection; + TestUtils.ClientFactory cf; + Client configurationClient; int partitions = 3; String superStream; String[] routingKeys = null; @BeforeEach - void init(TestInfo info) throws Exception { + void init(TestInfo info) { EnvironmentBuilder environmentBuilder = Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); environment = environmentBuilder.build(); - connection = new ConnectionFactory().newConnection(); + configurationClient = cf.get(); superStream = TestUtils.streamName(info); } @AfterEach - void tearDown() throws Exception { + void tearDown() { environment.close(); - if (routingKeys == null) { - deleteSuperStreamTopology(connection, superStream, partitions); - } else { - deleteSuperStreamTopology(connection, superStream, routingKeys); - } - connection.close(); + deleteSuperStreamTopology(configurationClient, superStream); } @Test - void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { + void allMessagesSentWithHashRoutingShouldBeThenConsumed() { int messageCount = 10_000 * partitions; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); Producer producer = environment .producerBuilder() @@ -123,10 +117,10 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { } @Test - void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { + void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() { int messageCount = 10_000 * partitions; routingKeys = new String[] {"amer", "emea", "apac"}; - declareSuperStreamTopology(connection, superStream, routingKeys); + declareSuperStreamTopology(configurationClient, superStream, routingKeys); Producer producer = environment .producerBuilder() @@ -169,11 +163,11 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception } @Test - @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) - void allMessagesForSameUserShouldEndUpInSamePartition() throws Exception { + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_0) + void allMessagesForSameUserShouldEndUpInSamePartition() { int messageCount = 10_000 * partitions; int userCount = 10; - declareSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(configurationClient, superStream, partitions); AtomicInteger totalReceivedCount = new AtomicInteger(0); // . => count diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 5dafd5bf94..3130745921 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -13,8 +13,11 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static io.vavr.Tuple.*; import static java.lang.String.format; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; @@ -22,6 +25,7 @@ import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Host; @@ -34,7 +38,6 @@ import com.rabbitmq.stream.impl.Client.StreamMetadata; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.vavr.Tuple; import io.vavr.Tuple2; import java.io.BufferedReader; import java.io.IOException; @@ -49,18 +52,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.*; @@ -91,6 +84,8 @@ public final class TestUtils { private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); + private static final ConnectionFactory AMQP_CF = new ConnectionFactory(); + private TestUtils() {} public static Duration waitAtMost(CallableBooleanSupplier condition) throws Exception { @@ -281,60 +276,67 @@ static void doIfNotNull(T obj, Consumer action) { } } - static void declareSuperStreamTopology(Connection connection, String superStream, int partitions) - throws Exception { + static void declareSuperStreamTopology(Client client, String superStream, int partitions) { declareSuperStreamTopology( - connection, + client, superStream, IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new)); } - static void declareSuperStreamTopology(Connection connection, String superStream, String... rks) - throws Exception { - try (Channel ch = connection.createChannel()) { - ch.exchangeDeclare( - superStream, - BuiltinExchangeType.DIRECT, - true, - false, - Collections.singletonMap("x-super-stream", true)); - - List> bindings = new ArrayList<>(rks.length); - for (int i = 0; i < rks.length; i++) { - bindings.add(Tuple.of(rks[i], i)); - } - // shuffle the order to make sure we get in the correct order from the server - Collections.shuffle(bindings); - - for (Tuple2 binding : bindings) { - String routingKey = binding._1(); - String partitionName = superStream + "-" + routingKey; - ch.queueDeclare( - partitionName, true, false, false, Collections.singletonMap("x-queue-type", "stream")); - ch.queueBind( - partitionName, + static void declareSuperStreamTopology(Client client, String superStream, String... rks) { + List partitions = + Arrays.stream(rks).map(rk -> superStream + "-" + rk).collect(toList()); + if (atLeastVersion("3.13.0", client.brokerVersion())) { + client.createSuperStream(superStream, partitions, asList(rks), null); + } else { + try (Connection connection = connection(); + Channel ch = connection.createChannel()) { + ch.exchangeDeclare( superStream, - routingKey, - Collections.singletonMap("x-stream-partition-order", binding._2())); + BuiltinExchangeType.DIRECT, + true, + false, + Collections.singletonMap("x-super-stream", true)); + List> bindings = new ArrayList<>(rks.length); + for (int i = 0; i < rks.length; i++) { + bindings.add(of(rks[i], i)); + } + // shuffle the order to make sure we get in the correct order from the server + Collections.shuffle(bindings); + + for (Tuple2 binding : bindings) { + String routingKey = binding._1(); + String partitionName = superStream + "-" + routingKey; + ch.queueDeclare( + partitionName, + true, + false, + false, + Collections.singletonMap("x-queue-type", "stream")); + ch.queueBind( + partitionName, + superStream, + routingKey, + Collections.singletonMap("x-stream-partition-order", binding._2())); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } - static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions) - throws Exception { - deleteSuperStreamTopology( - connection, - superStream, - IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new)); - } - - static void deleteSuperStreamTopology( - Connection connection, String superStream, String... routingKeys) throws Exception { - try (Channel ch = connection.createChannel()) { - ch.exchangeDelete(superStream); - for (String routingKey : routingKeys) { - String partitionName = superStream + "-" + routingKey; - ch.queueDelete(partitionName); + static void deleteSuperStreamTopology(Client client, String superStream) { + if (atLeastVersion("3.13.0", client.brokerVersion())) { + client.deleteSuperStream(superStream); + } else { + try (Connection connection = connection(); + Channel ch = connection.createChannel()) { + ch.exchangeDelete(superStream); + for (String partition : client.partitions(superStream)) { + ch.queueDelete(partition); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } @@ -1016,7 +1018,7 @@ static void waitMs(long waitTime) { public @interface SingleActiveConsumer {} public enum BrokerVersion { - RABBITMQ_3_11("3.11.0"), + RABBITMQ_3_11_0("3.11.0"), RABBITMQ_3_11_7("3.11.7"), RABBITMQ_3_11_9("3.11.9"), RABBITMQ_3_11_11("3.11.11"), @@ -1085,4 +1087,8 @@ static void repeatIfFailure(RunnableWithException test) throws Exception { throw (Exception) lastException; } } + + private static Connection connection() throws IOException, TimeoutException { + return AMQP_CF.newConnection(); + } }