diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 6f79e73f99..1cf3e9035a 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -880,4 +880,4 @@ entry, which has its own offset.
 This means one must be careful when basing
 some decision on offset values, like a modulo to perform an operation every X messages.
 As the message offsets have
-no guarantee to be contiguous, the operation may not happen exactly every X messages.
\ No newline at end of file
+no guarantee to be contiguous, the operation may not happen exactly every X messages.
diff --git a/src/docs/asciidoc/index.adoc b/src/docs/asciidoc/index.adoc
index 5b318d0f9e..cabc0626d3 100644
--- a/src/docs/asciidoc/index.adoc
+++ b/src/docs/asciidoc/index.adoc
@@ -22,6 +22,8 @@ include::sample-application.adoc[]
 
 include::api.adoc[]
 
+include::super-streams.adoc[]
+
 include::building.adoc[]
 
 include::performance-tool.adoc[]
\ No newline at end of file
diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc
new file mode 100644
index 0000000000..b0b51c3ad1
--- /dev/null
+++ b/src/docs/asciidoc/super-streams.adoc
@@ -0,0 +1,168 @@
+:test-examples: ../../test/java/com/rabbitmq/stream/docs
+
+[[super-streams]]
+==== Super Streams (Partitioned Streams)
+
+[WARNING]
+.Experimental
+====
+Super streams are an experimental feature and are subject to change.
+====
+
+A super stream is a logical stream made of several individual streams.
+In essence, a super stream is a partitioned stream that brings scalability compared to a single stream.
+
+The stream Java client uses the same programming model for super streams as with individual streams, that is, the `Producer`, `Consumer`, `Message`, etc. APIs are still valid when super streams are in use.
+Application code should not be impacted whether it uses individual or super streams.
+
+===== Topology
+
+A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity.
+The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is, an exchange, queues, and bindings between them.
+This does not mean AMQP resources are used to transport or store stream messages; it means they are used to _describe_ the super stream topology, that is, the streams it is made of.
+
+Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions):
+
+* an `invoices` exchange represents the super stream
+* the `invoices-0`, `invoices-1`, `invoices-2` streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ)
+* 3 bindings between the exchange and the streams link the super stream to its partitions and represent _routing rules_
+
+.The topology of a super stream is defined with bindings between an exchange and queues
+[ditaa]
+....
+                   0  +------------+
+               +----->+ invoices-0 |
+               |      +------------+
++----------+   |
+| invoices |   |   1  +------------+
+|          +---+----->+ invoices-1 |
+| exchange |   |      +------------+
++----------+   |
+               |   2  +------------+
+               +----->+ invoices-2 |
+                      +------------+
+....
+
+When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules.
+From the application code point of view, using a super stream is mostly configuration-based.
+Some logic must also be provided to extract routing information from messages.
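+
+The following snippet is a minimal sketch of how such a topology could be declared with the RabbitMQ AMQP Java client, much like the test suite of this library does.
+It is not part of the stream client API: the `direct` exchange type, the queue arguments, and the binding keys shown here are assumptions for illustration, and broker-provided tooling may be preferable to create the topology.
+
+.Declaring a super stream topology with the AMQP Java client (illustrative sketch)
+[source,java,indent=0]
+--------
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import java.util.Collections;
+
+public class SuperStreamTopologySketch {
+
+    public static void main(String[] args) throws Exception {
+        try (Connection connection = new ConnectionFactory().newConnection();
+             Channel channel = connection.createChannel()) {
+            channel.exchangeDeclare("invoices", "direct", true); // represents the super stream
+            for (int i = 0; i < 3; i++) {
+                String partition = "invoices-" + i;
+                // partitions are stream queues
+                channel.queueDeclare(partition, true, false, false,
+                    Collections.singletonMap("x-queue-type", "stream"));
+                // each binding materializes a routing rule
+                channel.queueBind(partition, "invoices", String.valueOf(i));
+            }
+        }
+    }
+}
+--------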
+
+===== Publishing to a Super Stream
+
+When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward:
+
+.Creating a Producer for a Super Stream
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=producer-simple]
+--------
+<1> Use the super stream name
+<2> Provide the logic to get the routing key from a message
+<3> Create the producer instance
+<4> Close the producer when it's no longer necessary
+
+Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer.
+Internally the client will figure out the streams that compose the super stream.
+The application code must provide the logic to extract a routing key from a message as a `Function<Message, String>`.
+The client will hash the routing key to determine the stream to send the message to (using the partition list and a modulo operation).
+
+The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key.
+This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function:
+
+.Specifying a custom hash function
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function]
+--------
+<1> Use `String#hashCode()` to hash the routing key
+
+Note that using Java's `hashCode()` method is a debatable choice: producers in other languages are unlikely to implement it, so routing would differ between producers written in different languages.
+
+====== Resolving Routes with Bindings
+
+Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams.
+The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams.
+
+This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for each region of the world, like in the diagram below:
+
+.A super stream with a partition for each region of the world
+[ditaa]
+....
+                 amer  +---------------+
+               +------>+ invoices-amer |
+               |       +---------------+
++----------+   |
+| invoices |   | emea  +---------------+
+|          +---+------>+ invoices-emea |
+| exchange |   |       +---------------+
++----------+   |
+               | apac  +---------------+
+               +------>+ invoices-apac |
+                       +---------------+
+....
+
+In such a case, the routing key will be a property of the message that represents the region:
+
+.Enabling the "key" routing strategy
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy]
+--------
+<1> Extract the routing key
+<2> Enable the "key" routing strategy
+
+Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams.
+Note the client caches results; it does not query the broker for every message.
+
+====== Using a Custom Routing Strategy
+
+The solution that provides the most control over routing is using a custom routing strategy.
+This should be needed only for specific cases.
+
+The following code sample shows how to implement a simplistic round-robin `RoutingStrategy` and use it in the producer.
+Note this implementation should not be used in production: for simplicity's sake, the modulo operation is not sign-safe.
+
+.Setting a round-robin routing strategy
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strategy]
+--------
+<1> No need to set the routing key extraction logic
+<2> Set the custom routing strategy
+
+====== Deduplication
+
+Deduplication for a super stream producer works the same way as with a <>.
+The publishing ID values are spread across the streams but this does not affect the mechanism.
+
+===== Consuming From a Super Stream
+
+A super stream consumer is not much different from a single stream consumer.
+The `ConsumerBuilder#superStream(String)` method must be used to set the super stream to consume from:
+
+.Declaring a super stream consumer
+[source,java,indent=0]
+--------
+include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple]
+--------
+<1> Set the super stream name
+<2> Close the consumer when it is no longer necessary
+
+A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each of them.
+
+====== Offset Tracking
+
+The semantics of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer.
+There are still some subtle differences, so a good understanding of <> in general and of the <> and <> offset tracking strategies is recommended.
+
+Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming:
+
+* *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_.
+Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition).
+In this case, the automatic offset tracking strategy will not kick in, because the expected message count has not been reached on any partition.
+Making the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate" if the messages are well balanced across the partitions.
+A good rule of thumb is then to multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often.
+So with the default of 10,000, it can be set to 30,000 for a 3-partition super stream.
+* *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used; `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in the other streams.
+A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream, as shown in the sketch after this list.
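+
+To illustrate the manual tracking behavior described above, here is a minimal sketch of a super stream consumer that stores offsets every 1,000 messages.
+It relies only on the consumer API shown earlier; the stream name, consumer name, and storage interval are arbitrary values chosen for the example.
+
+.Manual offset tracking with a super stream consumer (illustrative sketch)
+[source,java,indent=0]
+--------
+import com.rabbitmq.stream.Consumer;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.OffsetSpecification;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SuperStreamManualTrackingSketch {
+
+    public static void main(String[] args) {
+        Environment environment = Environment.builder().build();
+        AtomicLong received = new AtomicLong(0);
+        Consumer consumer = environment.consumerBuilder()
+            .superStream("invoices")           // set the super stream name
+            .name("invoicing-app")             // a name is required for offset tracking
+            .offset(OffsetSpecification.first())
+            .manualTrackingStrategy()          // enable manual offset tracking
+            .builder()
+            .messageHandler((context, message) -> {
+                // ... message processing ...
+                if (received.incrementAndGet() % 1_000 == 0) {
+                    // stores the offset of the current message in its own partition and
+                    // the offsets of the last dispatched messages in the other partitions
+                    context.storeOffset();
+                }
+            })
+            .build();
+        // ...
+        consumer.close();
+        environment.close();
+    }
+}
+--------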
+ + diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index a68a71327e..4655cb5a59 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -39,6 +39,7 @@ public final class Constants { public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002; public static final short CODE_PRODUCER_CLOSED = 10_003; public static final short CODE_PUBLISH_CONFIRM_TIMEOUT = 10_004; + public static final short CODE_NO_ROUTE_FOUND = 10_005; public static final short COMMAND_DECLARE_PUBLISHER = 1; public static final short COMMAND_PUBLISH = 2; diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index f6a6152f6a..61c3b2197a 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -27,7 +27,9 @@ public interface ConsumerBuilder { ConsumerBuilder stream(String stream); /** - * Set the consumer to consume from a super stream (partitioned stream). Experimental! + * Set the consumer to consume from a super stream (partitioned stream). + * + *

This is an experimental API, subject to change. * * @param superStream * @return this builder instance diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 5104fec5bb..ddbdb82d5a 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -108,37 +108,85 @@ public interface ProducerBuilder { ProducerBuilder enqueueTimeout(Duration timeout); /** - * Routing strategy for super streams. Experimental! + * Create the {@link Producer} instance. * - * @param routingKeyExtractor - * @param routingType - * @return this builder instance + * @return the configured producer */ - ProducerBuilder routing(Function routingKeyExtractor, RoutingType routingType); + Producer build(); /** - * Routing strategy for super streams. Experimental! + * Configure the routing for super streams (partitioned streams). * - * @param routingKeyExtractor - * @param routingType - * @param hash - * @return this builder instance + *

This is an experimental API, subject to change. + * + *

The to-be-created producer will be a composite producer when this method is called. It will + * use the routing configuration to find out where a message should be routed. The application + * developer must provide the logic to extract a "routing key" from a message, which will decide + * the destination(s) of the message. + * + *

The default routing strategy hashes the routing key to choose the stream (partition) to send + * the message to. + * + * Note the routing key extraction logic is required only when the built-in routing strategies + * are used. It can be set to null when a custom {@link RoutingStrategy} is set + * with {@link #routing(Function)}. + * + * @param routingKeyExtractor the logic to extract a routing key from a message + * @return the routing configuration instance + * @see RoutingConfiguration */ - ProducerBuilder routing( - Function<Message, String> routingKeyExtractor, - RoutingType routingType, - ToIntFunction<String> hash); + RoutingConfiguration routing(Function<Message, String> routingKeyExtractor); /** - * Create the {@link Producer} instance. + * Routing configuration for super streams (partitioned streams). * - * @return the configured producer + *

This is an experimental API, subject to change. */ - Producer build(); + interface RoutingConfiguration { + + /** + * Enable the "hash" routing strategy (the default). + * + *

The default hash algorithm is 32-bit MurmurHash3. + * + * @return the routing configuration instance + */ + RoutingConfiguration hash(); + + /** + * Enable the "hash" routing strategy with a specific hash algorithm. + * + * @param hash the hash algorithm to apply to the routing key + * @return the routing configuration instance + */ + RoutingConfiguration hash(ToIntFunction<String> hash); + + /** + * Enable the "key" routing strategy. + * + *

It consists in using the "route" command of the RabbitMQ Stream protocol to determine the + * streams to send a message to. + * + * @return the routing configuration instance + */ + RoutingConfiguration key(); + + /** + * Set the routing strategy to use. + * + *

Providing the routing strategy gives control over the streams a message is routed to + * (routing key extraction logic if relevant, and destination(s) decision). + * + * @param routingStrategy the routing strategy to use + * @return the routing configuration instance + */ + RoutingConfiguration strategy(RoutingStrategy routingStrategy); - /** Routing type when using super streams. Experimental! */ - enum RoutingType { - HASH, - KEY + /** + * Go back to the producer builder. + * + * @return the producer builder + */ + ProducerBuilder producerBuilder(); } }

This is an experimental API, subject to change. + * + *

Used for super streams (partitioned stream). + * + * @see ProducerBuilder#routing(Function) + */ +public interface RoutingStrategy { - List route(Message message); + /** + * Where to route a message. + * + * @param message + * @param metadata + * @return + */ + List route(Message message, Metadata metadata); + + /** Metadata on the super stream. */ + interface Metadata { + + List partitions(); + + List route(String routingKey); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java b/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java index c695b12295..9b559a5d00 100644 --- a/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java +++ b/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java @@ -14,41 +14,28 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.RoutingStrategy; import java.util.Collections; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.ToIntFunction; -import java.util.stream.Collectors; class HashRoutingStrategy implements RoutingStrategy { private final Function routingKeyExtractor; - private final StreamEnvironment env; - - private final List> partitions; - private final ToIntFunction hash; - HashRoutingStrategy( - String superStream, - Function routingKeyExtractor, - StreamEnvironment env, - ToIntFunction hash) { + HashRoutingStrategy(Function routingKeyExtractor, ToIntFunction hash) { this.routingKeyExtractor = routingKeyExtractor; - this.env = env; - List ps = this.env.locatorOperation(c -> c.partitions(superStream)); - this.partitions = - new CopyOnWriteArrayList<>( - ps.stream().map(Collections::singletonList).collect(Collectors.toList())); this.hash = hash; } @Override - public List route(Message message) { + public List route(Message message, Metadata metadata) { String routingKey = routingKeyExtractor.apply(message); int hashValue = hash.applyAsInt(routingKey); - return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size()); + List partitions = metadata.partitions(); + return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size())); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java b/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java index 71739aadd8..dc3585ce68 100644 --- a/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java +++ b/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.RoutingStrategy; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -25,27 +26,16 @@ class RoutingKeyRoutingStrategy implements RoutingStrategy { private final Map> routingKeysToStreams = new ConcurrentHashMap<>(); - private final StreamEnvironment env; - - private final String superStream; - - RoutingKeyRoutingStrategy( - String superStream, Function routingKeyExtractor, StreamEnvironment env) { + RoutingKeyRoutingStrategy(Function routingKeyExtractor) { this.routingKeyExtractor = routingKeyExtractor; - this.env = env; - this.superStream = superStream; } @Override - public List route(Message message) { + public List route(Message message, Metadata metadata) { String routingKey = this.routingKeyExtractor.apply(message); List streams = routingKeysToStreams.computeIfAbsent( - routingKey, - routingKey1 -> 
{ - // TODO retry on locator lookup - return env.locatorOperation(c -> c.route(routingKey1, superStream)); - }); + routingKey, routingKey1 -> metadata.route(routingKey1)); return streams; } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 90302b4917..996b6ef639 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -28,7 +28,7 @@ class StreamConsumer implements Consumer { private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class); - private final Runnable closingCallback; + private volatile Runnable closingCallback; private final Runnable closingTrackingCallback; @@ -44,15 +44,20 @@ class StreamConsumer implements Consumer { private volatile Status status; + private volatile long lastRequestedStoredOffset = 0; + private final LongConsumer trackingCallback; + private final Runnable initCallback; + StreamConsumer( String stream, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String name, StreamEnvironment environment, - TrackingConfiguration trackingConfiguration) { + TrackingConfiguration trackingConfiguration, + boolean lazyInit) { try { this.name = name; @@ -87,11 +92,33 @@ class StreamConsumer implements Consumer { messageHandlerWithOrWithoutTracking = messageHandler; } - this.closingCallback = - environment.registerConsumer( - this, stream, offsetSpecification, this.name, messageHandlerWithOrWithoutTracking); + Runnable init = + () -> { + this.closingCallback = + environment.registerConsumer( + this, + stream, + offsetSpecification, + this.name, + messageHandlerWithOrWithoutTracking); + + this.status = Status.RUNNING; + }; + if (lazyInit) { + this.initCallback = init; + } else { + this.initCallback = () -> {}; + init.run(); + } + } catch (RuntimeException e) { + this.closed.set(true); + throw e; + } + } - this.status = Status.RUNNING; + void start() { + try { + this.initCallback.run(); } catch (RuntimeException e) { this.closed.set(true); throw e; @@ -102,10 +129,13 @@ class StreamConsumer implements Consumer { public void store(long offset) { trackingCallback.accept(offset); if (canTrack()) { - try { - this.trackingClient.storeOffset(this.name, this.stream, offset); - } catch (Exception e) { - LOGGER.debug("Error while trying to store offset: {}", e.getMessage()); + if (Long.compareUnsigned(this.lastRequestedStoredOffset, offset) < 0) { + try { + this.trackingClient.storeOffset(this.name, this.stream, offset); + this.lastRequestedStoredOffset = offset; + } catch (Exception e) { + LOGGER.debug("Error while trying to store offset: {}", e.getMessage()); + } } } // nothing special to do if tracking is not possible or errors, e.g. 
because of a network @@ -114,7 +144,7 @@ public void store(long offset) { } private boolean canTrack() { - return this.status == Status.RUNNING; + return this.status == Status.RUNNING && this.name != null; } @Override diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index d67933689f..3e50878767 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -33,6 +33,7 @@ class StreamConsumerBuilder implements ConsumerBuilder { private String name; private DefaultAutoTrackingStrategy autoTrackingStrategy; private DefaultManualTrackingStrategy manualTrackingStrategy; + private boolean lazyInit = false; public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -62,6 +63,10 @@ public ConsumerBuilder messageHandler(MessageHandler messageHandler) { return this; } + MessageHandler messageHandler() { + return this.messageHandler; + } + @Override public ConsumerBuilder name(String name) { if (name == null || name.length() > NAME_MAX_SIZE) { @@ -86,6 +91,11 @@ public AutoTrackingStrategy autoTrackingStrategy() { return this.autoTrackingStrategy; } + StreamConsumerBuilder lazyInit(boolean lazyInit) { + this.lazyInit = lazyInit; + return this; + } + @Override public Consumer build() { if (this.stream == null && this.superStream == null) { @@ -131,10 +141,12 @@ public Consumer build() { this.messageHandler, this.name, this.environment, - trackingConfiguration); + trackingConfiguration, + this.lazyInit); environment.addConsumer((StreamConsumer) consumer); } else { - consumer = new SuperStreamConsumer(this, this.superStream, this.environment); + consumer = + new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration); } return consumer; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 6f56114323..e7da540cd0 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -16,6 +16,7 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.ProducerBuilder; +import com.rabbitmq.stream.RoutingStrategy; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.Compression; import java.lang.reflect.Field; @@ -45,11 +46,7 @@ class StreamProducerBuilder implements ProducerBuilder { private Duration enqueueTimeout = Duration.ofSeconds(10); - private Function routingKeyExtractor; - - private RoutingType routingType; - - private ToIntFunction hash = HashUtils.MURMUR3; + private DefaultRoutingConfiguration routingConfiguration; StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -126,32 +123,14 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { } @Override - public ProducerBuilder routing( - Function routingKeyExtractor, RoutingType routingType) { - return this.routing(routingKeyExtractor, routingType, HashUtils.MURMUR3); - } - - @Override - public ProducerBuilder routing( - Function routingKeyExtractor, - RoutingType routingType, - ToIntFunction hash) { - if (routingKeyExtractor == null || routingType == null) { - throw new IllegalArgumentException( - "both routing key extractor and routing type must be non-null"); - } - this.routingKeyExtractor = 
routingKeyExtractor; - this.routingType = routingType; - if (hash != null) { - this.hash = hash; - } - return this; + public RoutingConfiguration routing(Function routingKeyExtractor) { + this.routingConfiguration = new DefaultRoutingConfiguration(this); + this.routingConfiguration.routingKeyExtractor = routingKeyExtractor; + return this.routingConfiguration; } void resetRouting() { - this.routingKeyExtractor = null; - this.routingType = null; - this.hash = null; + this.routingConfiguration = null; } public Producer build() { @@ -164,7 +143,7 @@ public Producer build() { } this.environment.maybeInitializeLocator(); Producer producer; - if (this.routingKeyExtractor == null) { + if (this.routingConfiguration == null) { producer = new StreamProducer( name, @@ -179,14 +158,17 @@ public Producer build() { environment); this.environment.addProducer((StreamProducer) producer); } else { - // FIXME propagate compression to super stream producer - ToIntFunction hashFunction = this.hash == null ? HashUtils.MURMUR3 : this.hash; - RoutingStrategy routingStrategy = - this.routingType == RoutingType.HASH - ? new HashRoutingStrategy( - this.stream, this.routingKeyExtractor, this.environment, hashFunction) - : new RoutingKeyRoutingStrategy( - this.stream, this.routingKeyExtractor, this.environment); + RoutingStrategy routingStrategy = this.routingConfiguration.routingStrategy; + if (routingStrategy == null) { + if (this.routingConfiguration.hash == null) { + routingStrategy = + new RoutingKeyRoutingStrategy(this.routingConfiguration.routingKeyExtractor); + } else { + routingStrategy = + new HashRoutingStrategy( + this.routingConfiguration.routingKeyExtractor, this.routingConfiguration.hash); + } + } producer = new SuperStreamProducer(this, this.name, this.stream, routingStrategy, this.environment); } @@ -205,4 +187,53 @@ StreamProducerBuilder duplicate() { } return duplicate; } + + static final class DefaultRoutingConfiguration implements RoutingConfiguration { + + private final StreamProducerBuilder producerBuilder; + + private Function routingKeyExtractor; + + private RoutingStrategy routingStrategy; + + private ToIntFunction hash = HashUtils.MURMUR3; + + DefaultRoutingConfiguration(StreamProducerBuilder producerBuilder) { + this.producerBuilder = producerBuilder; + } + + @Override + public RoutingConfiguration hash() { + if (this.hash == null) { + this.hash = HashUtils.MURMUR3; + } + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration hash(ToIntFunction hash) { + this.hash = hash; + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration key() { + this.hash = null; + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration strategy(RoutingStrategy routingStrategy) { + this.routingStrategy = routingStrategy; + return this; + } + + @Override + public ProducerBuilder producerBuilder() { + return this.producerBuilder; + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index 7ea335a0fa..723c4f7972 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -14,6 +14,11 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; +import 
java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -27,13 +32,108 @@ class SuperStreamConsumer implements Consumer { private final Map consumers = new ConcurrentHashMap<>(); SuperStreamConsumer( - StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) { + StreamConsumerBuilder builder, + String superStream, + StreamEnvironment environment, + TrackingConfiguration trackingConfiguration) { this.superStream = superStream; - for (String partition : environment.locatorOperation(c -> c.partitions(superStream))) { - Consumer consumer = builder.duplicate().superStream(null).stream(partition).build(); + List partitions = environment.locatorOperation(c -> c.partitions(superStream)); + + // for manual offset tracking strategy only + ConsumerState[] states = new ConsumerState[partitions.size()]; + Map partitionToStates = new HashMap<>(partitions.size()); + for (int i = 0; i < partitions.size(); i++) { + ConsumerState state = new ConsumerState(); + states[i] = state; + partitionToStates.put(partitions.get(i), state); + } + // end of manual offset tracking strategy + + for (String partition : partitions) { + ConsumerState state = partitionToStates.get(partition); + MessageHandler messageHandler; + if (trackingConfiguration.enabled() && trackingConfiguration.manual()) { + messageHandler = + new ManualOffsetTrackingMessageHandler(builder.messageHandler(), states, state); + } else { + messageHandler = builder.messageHandler(); + } + StreamConsumerBuilder subConsumerBuilder = builder.duplicate(); + + if (trackingConfiguration.enabled() && trackingConfiguration.auto()) { + subConsumerBuilder = + (StreamConsumerBuilder) + subConsumerBuilder + .autoTrackingStrategy() + .messageCountBeforeStorage( + trackingConfiguration.autoMessageCountBeforeStorage() / partitions.size()) + .builder(); + } + + Consumer consumer = + subConsumerBuilder.lazyInit(true).superStream(null).messageHandler(messageHandler).stream( + partition) + .build(); consumers.put(partition, consumer); + state.consumer = consumer; LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream); } + + consumers.values().forEach(c -> ((StreamConsumer) c).start()); + } + + private static final class ConsumerState { + + private volatile long offset = 0; + private volatile Consumer consumer; + } + + private static final class ManualOffsetTrackingMessageHandler implements MessageHandler { + + private final MessageHandler delegate; + private final ConsumerState[] consumerStates; + private final ConsumerState consumerState; + + private ManualOffsetTrackingMessageHandler( + MessageHandler delegate, ConsumerState[] consumerStates, ConsumerState consumerState) { + this.delegate = delegate; + this.consumerStates = consumerStates; + this.consumerState = consumerState; + } + + @Override + public void handle(Context context, Message message) { + Context ctx = + new Context() { + @Override + public long offset() { + return context.offset(); + } + + @Override + public long timestamp() { + return context.timestamp(); + } + + @Override + public void storeOffset() { + for (ConsumerState state : consumerStates) { + if (ManualOffsetTrackingMessageHandler.this.consumerState == state) { + context.storeOffset(); + } else if (state.offset != 0) { + state.consumer.store(state.offset); + } + } + } + + @Override + public Consumer consumer() { + return context.consumer(); + } + }; + this.delegate.handle(ctx, message); + 
consumerState.offset = context.offset(); + } } @Override diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index e36c9305fe..3f88876d07 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -15,13 +15,18 @@ import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.ConfirmationStatus; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.RoutingStrategy; +import com.rabbitmq.stream.RoutingStrategy.Metadata; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +41,7 @@ class SuperStreamProducer implements Producer { private final StreamProducerBuilder producerBuilder; private final StreamEnvironment environment; private final String name; + private final Metadata superStreamMetadata; SuperStreamProducer( StreamProducerBuilder producerBuilder, @@ -48,6 +54,7 @@ class SuperStreamProducer implements Producer { this.name = name; this.superStream = superStream; this.environment = streamEnvironment; + this.superStreamMetadata = new DefaultSuperStreamMetadata(this.superStream, this.environment); this.producerBuilder = producerBuilder.duplicate(); this.producerBuilder.stream(null); this.producerBuilder.resetRouting(); @@ -86,16 +93,21 @@ public long getLastPublishingId() { public void send(Message message, ConfirmationHandler confirmationHandler) { // TODO handle when the stream is not found (no partition found for the message) // and call the confirmation handler with a failure - List streams = this.routingStrategy.route(message); - for (String stream : streams) { - Producer producer = - producers.computeIfAbsent( - stream, - stream1 -> { - Producer p = producerBuilder.duplicate().stream(stream1).build(); - return p; - }); - producer.send(message, confirmationHandler); + List streams = this.routingStrategy.route(message, superStreamMetadata); + if (streams.isEmpty()) { + confirmationHandler.handle( + new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND)); + } else { + for (String stream : streams) { + Producer producer = + producers.computeIfAbsent( + stream, + stream1 -> { + Producer p = producerBuilder.duplicate().stream(stream1).build(); + return p; + }); + producer.send(message, confirmationHandler); + } } } @@ -113,4 +125,31 @@ public void close() { } } } + + private static class DefaultSuperStreamMetadata implements Metadata { + + private final String superStream; + private final StreamEnvironment environment; + private final List partitions; + private final Map> routes = new ConcurrentHashMap<>(); + + private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environment) { + this.superStream = superStream; + this.environment = environment; + List ps = environment.locatorOperation(c -> c.partitions(superStream)); + this.partitions = new CopyOnWriteArrayList<>(ps); + } + + @Override + public List partitions() { + return partitions; + } + + @Override + public List route(String routingKey) { + return routes.computeIfAbsent( + routingKey, + routingKey1 -> environment.locatorOperation(c -> c.route(routingKey1, 
superStream))); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java new file mode 100644 index 0000000000..e6a34f8e40 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -0,0 +1,99 @@ +// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.stream.docs; + +import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.RoutingStrategy; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public class SuperStreamUsage { + + void producerSimple() { + Environment environment = Environment.builder().build(); + // tag::producer-simple[] + Producer producer = environment.producerBuilder() + .stream("invoices") // <1> + .routing(message -> message.getProperties().getMessageIdAsString()) // <2> + .producerBuilder() + .build(); // <3> + // ... + producer.close(); // <4> + // end::producer-simple[] + } + + void producerCustomHashFunction() { + Environment environment = Environment.builder().build(); + // tag::producer-custom-hash-function[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(message -> message.getProperties().getMessageIdAsString()) + .hash(rk -> rk.hashCode()) // <1> + .producerBuilder() + .build(); + // end::producer-custom-hash-function[] + } + + void producerKeyRoutingStrategy() { + Environment environment = Environment.builder().build(); + // tag::producer-key-routing-strategy[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(msg -> msg.getApplicationProperties().get("region").toString()) // <1> + .key() // <2> + .producerBuilder() + .build(); + // end::producer-key-routing-strategy[] + } + + void producerCustomRoutingStrategy() { + Environment environment = Environment.builder().build(); + // tag::producer-custom-routing-strategy[] + AtomicLong messageCount = new AtomicLong(0); + RoutingStrategy routingStrategy = (message, metadata) -> { + List partitions = metadata.partitions(); + String stream = partitions.get( + (int) messageCount.getAndIncrement() % partitions.size() + ); + return Collections.singletonList(stream); + }; + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(null) // <1> + .strategy(routingStrategy) // <2> + .producerBuilder() + .build(); + // end::producer-custom-routing-strategy[] + } + + void consumerSimple() { + Environment environment = Environment.builder().build(); + // tag::consumer-simple[] + Consumer consumer = environment.consumerBuilder() + .superStream("invoices") // <1> + .messageHandler((context, message) -> { + // message processing + }) + .build(); + // ... 
+ consumer.close(); // <2> + // end::consumer-simple[] + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java index 6daadb1d9b..5c7f0f5540 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java @@ -21,7 +21,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -55,7 +54,6 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeShouldReturnEmptyListWhenExchangeDoesNotExist() { assertThat(cf.get().route("", UUID.randomUUID().toString())).isEmpty(); } @@ -66,7 +64,6 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() { } @Test - @BrokerVersionAtLeast("3.9.6") void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { declareSuperStreamTopology(connection, superStream, partitions); @@ -84,7 +81,6 @@ void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeTopologyWithPartitionCount() throws Exception { declareSuperStreamTopology(connection, superStream, 3); @@ -100,7 +96,6 @@ void routeTopologyWithPartitionCount() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception { declareSuperStreamTopology(connection, superStream, 3); connection.createChannel().queueBind(superStream + "-1", superStream, "0"); @@ -118,7 +113,6 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception } @Test - @BrokerVersionAtLeast("3.9.6") void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { declareSuperStreamTopology(connection, superStream, 3); Channel channel = connection.createChannel(); @@ -137,7 +131,6 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void partitionsReturnsCorrectOrder() throws Exception { String[] partitionNames = {"z", "y", "x"}; declareSuperStreamTopology(connection, superStream, partitionNames); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index bb9cffefa4..35f67dbe49 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -27,7 +27,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.impl.Client.ClientParameters; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -97,7 +96,6 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void consumeAllMessagesFromAllPartitions() throws Exception { declareSuperStreamTopology(connection, superStream, partitionCount); Client client = cf.get(); @@ -128,4 +126,107 @@ void consumeAllMessagesFromAllPartitions() throws Exception { }); consumer.close(); } + + @Test + void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { + declareSuperStreamTopology(connection, superStream, partitionCount); + Client 
client = cf.get(); + List partitions = client.partitions(superStream); + int messageCount = 10000 * partitionCount; + publishToPartitions(cf, partitions, messageCount); + ConcurrentMap messagesReceived = new ConcurrentHashMap<>(partitionCount); + ConcurrentMap lastOffsets = new ConcurrentHashMap<>(partitionCount); + partitions.forEach( + p -> { + messagesReceived.put(p, new AtomicInteger(0)); + }); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + String consumerName = "my-app"; + AtomicInteger totalCount = new AtomicInteger(); + Consumer consumer = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .name(consumerName) + .manualTrackingStrategy() + .builder() + .messageHandler( + (context, message) -> { + String partition = new String(message.getBodyAsBinary()); + messagesReceived.get(partition).incrementAndGet(); + lastOffsets.put(partition, context.offset()); + totalCount.incrementAndGet(); + if (totalCount.get() % 50 == 0) { + context.storeOffset(); + } + consumeLatch.countDown(); + }) + .build(); + latchAssert(consumeLatch).completes(); + assertThat(messagesReceived).hasSize(partitionCount); + partitions.forEach( + p -> { + assertThat(messagesReceived).containsKey(p); + assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount); + }); + // checking stored offsets are big enough + // offset near the end (the message count per partition minus a few messages) + long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10); + partitions.forEach( + p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); + consumer.close(); + } + + @Test + void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { + declareSuperStreamTopology(connection, superStream, partitionCount); + Client client = cf.get(); + List partitions = client.partitions(superStream); + int messageCount = 10000 * partitionCount; + publishToPartitions(cf, partitions, messageCount); + ConcurrentMap messagesReceived = new ConcurrentHashMap<>(partitionCount); + ConcurrentMap lastOffsets = new ConcurrentHashMap<>(partitionCount); + partitions.forEach( + p -> { + messagesReceived.put(p, new AtomicInteger(0)); + }); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + String consumerName = "my-app"; + AtomicInteger totalCount = new AtomicInteger(); + Consumer consumer = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .name(consumerName) + .autoTrackingStrategy() + .messageCountBeforeStorage(messageCount / partitionCount / 50) + .builder() + .messageHandler( + (context, message) -> { + String partition = new String(message.getBodyAsBinary()); + messagesReceived.get(partition).incrementAndGet(); + lastOffsets.put(partition, context.offset()); + totalCount.incrementAndGet(); + if (totalCount.get() % 50 == 0) { + context.storeOffset(); + } + consumeLatch.countDown(); + }) + .build(); + latchAssert(consumeLatch).completes(); + assertThat(messagesReceived).hasSize(partitionCount); + partitions.forEach( + p -> { + assertThat(messagesReceived).containsKey(p); + assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount); + }); + // checking stored offsets are big enough + // offset near the end (the message count per partition minus a few messages) + long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10); + partitions.forEach( + p -> 
assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); + consumer.close(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index 37f28988e9..c256168449 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -22,17 +22,18 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.ProducerBuilder.RoutingType; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; @@ -75,13 +76,13 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; declareSuperStreamTopology(connection, superStream, partitions); Producer producer = environment.producerBuilder().stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -127,16 +128,15 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; routingKeys = new String[] {"amer", "emea", "apac"}; declareSuperStreamTopology(connection, superStream, routingKeys); Producer producer = environment.producerBuilder().stream(superStream) - .routing( - message -> message.getApplicationProperties().get("region").toString(), - RoutingType.KEY) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -179,14 +179,46 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr } @Test - @BrokerVersionAtLeast("3.9.6") + void messageIsNackedIfNoRouteFound() throws Exception { + routingKeys = new String[] {"amer", "emea", "apac"}; + declareSuperStreamTopology(connection, superStream, routingKeys); + Producer producer = + environment.producerBuilder().stream(superStream) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() + .build(); + + AtomicBoolean confirmed = new AtomicBoolean(true); + AtomicInteger code = new AtomicInteger(); + CountDownLatch publishLatch = new CountDownLatch(1); + producer.send( + producer + .messageBuilder() + .applicationProperties() + .entry("region", "atlantis") + .messageBuilder() + .build(), + confirmationStatus -> { + confirmed.set(confirmationStatus.isConfirmed()); + 
code.set(confirmationStatus.getCode()); + publishLatch.countDown(); + }); + + assertThat(latchAssert(publishLatch)).completes(5); + assertThat(confirmed).isFalse(); + assertThat(code).hasValue(Constants.CODE_NO_ROUTE_FOUND); + } + + @Test void getLastPublishingIdShouldReturnLowestValue() throws Exception { int messageCount = 10_000; declareSuperStreamTopology(connection, superStream, partitions); String producerName = "super-stream-application"; Producer producer = environment.producerBuilder().name(producerName).stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 2ff8e1b939..ca3ece9e6c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology; +import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.localhost; import static org.assertj.core.api.Assertions.assertThat; @@ -24,8 +25,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.ProducerBuilder.RoutingType; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -62,21 +61,21 @@ void init(TestInfo info) throws Exception { void tearDown() throws Exception { environment.close(); if (routingKeys == null) { - // deleteSuperStreamTopology(connection, superStream, partitions); + deleteSuperStreamTopology(connection, superStream, partitions); } else { - // deleteSuperStreamTopology(connection, superStream, routingKeys); + deleteSuperStreamTopology(connection, superStream, routingKeys); } connection.close(); } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000 * partitions; declareSuperStreamTopology(connection, superStream, partitions); Producer producer = environment.producerBuilder().stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -112,16 +111,15 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000 * partitions; routingKeys = new String[] {"amer", "emea", "apac"}; declareSuperStreamTopology(connection, superStream, routingKeys); Producer producer = environment.producerBuilder().stream(superStream) - .routing( - message -> message.getApplicationProperties().get("region").toString(), - RoutingType.KEY) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() .build(); CountDownLatch 
publishLatch = new CountDownLatch(messageCount);