From 2be8bc74078da55950119d1cf08fd5087f3d3c2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 17 Sep 2021 09:12:41 +0200 Subject: [PATCH 1/7] Handle manual offset tracking in super stream consumer Offset storage on a given message trigger the storage for other partitions in the super stream. --- .../rabbitmq/stream/impl/StreamConsumer.java | 52 +++++++--- .../stream/impl/StreamConsumerBuilder.java | 16 +++- .../stream/impl/SuperStreamConsumer.java | 96 ++++++++++++++++++- .../stream/impl/SuperStreamConsumerTest.java | 52 ++++++++++ 4 files changed, 200 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 90302b4917..996b6ef639 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -28,7 +28,7 @@ class StreamConsumer implements Consumer { private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class); - private final Runnable closingCallback; + private volatile Runnable closingCallback; private final Runnable closingTrackingCallback; @@ -44,15 +44,20 @@ class StreamConsumer implements Consumer { private volatile Status status; + private volatile long lastRequestedStoredOffset = 0; + private final LongConsumer trackingCallback; + private final Runnable initCallback; + StreamConsumer( String stream, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String name, StreamEnvironment environment, - TrackingConfiguration trackingConfiguration) { + TrackingConfiguration trackingConfiguration, + boolean lazyInit) { try { this.name = name; @@ -87,11 +92,33 @@ class StreamConsumer implements Consumer { messageHandlerWithOrWithoutTracking = messageHandler; } - this.closingCallback = - environment.registerConsumer( - this, stream, offsetSpecification, this.name, messageHandlerWithOrWithoutTracking); + Runnable init = + () -> { + this.closingCallback = + environment.registerConsumer( + this, + stream, + offsetSpecification, + this.name, + messageHandlerWithOrWithoutTracking); + + this.status = Status.RUNNING; + }; + if (lazyInit) { + this.initCallback = init; + } else { + this.initCallback = () -> {}; + init.run(); + } + } catch (RuntimeException e) { + this.closed.set(true); + throw e; + } + } - this.status = Status.RUNNING; + void start() { + try { + this.initCallback.run(); } catch (RuntimeException e) { this.closed.set(true); throw e; @@ -102,10 +129,13 @@ class StreamConsumer implements Consumer { public void store(long offset) { trackingCallback.accept(offset); if (canTrack()) { - try { - this.trackingClient.storeOffset(this.name, this.stream, offset); - } catch (Exception e) { - LOGGER.debug("Error while trying to store offset: {}", e.getMessage()); + if (Long.compareUnsigned(this.lastRequestedStoredOffset, offset) < 0) { + try { + this.trackingClient.storeOffset(this.name, this.stream, offset); + this.lastRequestedStoredOffset = offset; + } catch (Exception e) { + LOGGER.debug("Error while trying to store offset: {}", e.getMessage()); + } } } // nothing special to do if tracking is not possible or errors, e.g. because of a network @@ -114,7 +144,7 @@ public void store(long offset) { } private boolean canTrack() { - return this.status == Status.RUNNING; + return this.status == Status.RUNNING && this.name != null; } @Override diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index d67933689f..3e50878767 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -33,6 +33,7 @@ class StreamConsumerBuilder implements ConsumerBuilder { private String name; private DefaultAutoTrackingStrategy autoTrackingStrategy; private DefaultManualTrackingStrategy manualTrackingStrategy; + private boolean lazyInit = false; public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -62,6 +63,10 @@ public ConsumerBuilder messageHandler(MessageHandler messageHandler) { return this; } + MessageHandler messageHandler() { + return this.messageHandler; + } + @Override public ConsumerBuilder name(String name) { if (name == null || name.length() > NAME_MAX_SIZE) { @@ -86,6 +91,11 @@ public AutoTrackingStrategy autoTrackingStrategy() { return this.autoTrackingStrategy; } + StreamConsumerBuilder lazyInit(boolean lazyInit) { + this.lazyInit = lazyInit; + return this; + } + @Override public Consumer build() { if (this.stream == null && this.superStream == null) { @@ -131,10 +141,12 @@ public Consumer build() { this.messageHandler, this.name, this.environment, - trackingConfiguration); + trackingConfiguration, + this.lazyInit); environment.addConsumer((StreamConsumer) consumer); } else { - consumer = new SuperStreamConsumer(this, this.superStream, this.environment); + consumer = + new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration); } return consumer; } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index 7ea335a0fa..c37287ac0f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -14,6 +14,11 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Consumer; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -27,13 +32,98 @@ class SuperStreamConsumer implements Consumer { private final Map consumers = new ConcurrentHashMap<>(); SuperStreamConsumer( - StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) { + StreamConsumerBuilder builder, + String superStream, + StreamEnvironment environment, + TrackingConfiguration trackingConfiguration) { this.superStream = superStream; - for (String partition : environment.locatorOperation(c -> c.partitions(superStream))) { - Consumer consumer = builder.duplicate().superStream(null).stream(partition).build(); + List partitions = environment.locatorOperation(c -> c.partitions(superStream)); + + // for manual tracking offset strategy only + ConsumerState[] states = new ConsumerState[partitions.size()]; + Map partitionToStates = new HashMap<>(partitions.size()); + for (int i = 0; i < partitions.size(); i++) { + ConsumerState state = new ConsumerState(); + states[i] = state; + partitionToStates.put(partitions.get(i), state); + } + for (String partition : partitions) { + ConsumerState state = partitionToStates.get(partition); + MessageHandler messageHandler; + if (trackingConfiguration.enabled() && trackingConfiguration.manual()) { + messageHandler = + new ManualOffsetTrackingMessageHandler(builder.messageHandler(), states, state); + } else { + messageHandler = builder.messageHandler(); + } + Consumer consumer = + builder + .duplicate() + .lazyInit(true) + .superStream(null) + .messageHandler(messageHandler) + .stream(partition) + .build(); consumers.put(partition, consumer); + state.consumer = consumer; LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream); } + + consumers.values().forEach(c -> ((StreamConsumer) c).start()); + } + + private static final class ConsumerState { + + private volatile long offset = 0; + private volatile Consumer consumer; + } + + private static final class ManualOffsetTrackingMessageHandler implements MessageHandler { + + private final MessageHandler delegate; + private final ConsumerState[] consumerStates; + private final ConsumerState consumerState; + + private ManualOffsetTrackingMessageHandler( + MessageHandler delegate, ConsumerState[] consumerStates, ConsumerState consumerState) { + this.delegate = delegate; + this.consumerStates = consumerStates; + this.consumerState = consumerState; + } + + @Override + public void handle(Context context, Message message) { + Context ctx = + new Context() { + @Override + public long offset() { + return context.offset(); + } + + @Override + public long timestamp() { + return context.timestamp(); + } + + @Override + public void storeOffset() { + for (ConsumerState state : consumerStates) { + if (ManualOffsetTrackingMessageHandler.this.consumerState == state) { + context.storeOffset(); + } else if (state.offset != 0) { + state.consumer.store(state.offset); + } + } + } + + @Override + public Consumer consumer() { + return context.consumer(); + } + }; + this.delegate.handle(ctx, message); + consumerState.offset = context.offset(); + } } @Override diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index bb9cffefa4..f0ad42cb45 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -128,4 +128,56 @@ void consumeAllMessagesFromAllPartitions() throws Exception { }); consumer.close(); } + + @Test + @BrokerVersionAtLeast("3.9.6") + void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { + declareSuperStreamTopology(connection, superStream, partitionCount); + Client client = cf.get(); + List partitions = client.partitions(superStream); + int messageCount = 10000 * partitionCount; + publishToPartitions(cf, partitions, messageCount); + ConcurrentMap messagesReceived = new ConcurrentHashMap<>(partitionCount); + ConcurrentMap lastOffsets = new ConcurrentHashMap<>(partitionCount); + partitions.forEach( + p -> { + messagesReceived.put(p, new AtomicInteger(0)); + }); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + String consumerName = "my-app"; + AtomicInteger totalCount = new AtomicInteger(); + Consumer consumer = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .name(consumerName) + .manualTrackingStrategy() + .builder() + .messageHandler( + (context, message) -> { + String partition = new String(message.getBodyAsBinary()); + messagesReceived.get(partition).incrementAndGet(); + lastOffsets.put(partition, context.offset()); + totalCount.incrementAndGet(); + if (totalCount.get() % 50 == 0) { + context.storeOffset(); + } + consumeLatch.countDown(); + }) + .build(); + latchAssert(consumeLatch).completes(); + assertThat(messagesReceived).hasSize(partitionCount); + partitions.forEach( + p -> { + assertThat(messagesReceived).containsKey(p); + assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount); + }); + // checking stored offsets are big enough + // offset near the end (the message count per partition minus a few messages) + long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10); + partitions.forEach( + p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); + consumer.close(); + } } From a7ba4d13610c9e4dc3c570c0ab3d5dda46c63b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 20 Sep 2021 11:28:06 +0200 Subject: [PATCH 2/7] Make RoutingStratey a public API The metadata of the super stream are now provided in the route method. This abstraction hides the infrastructure needed to find out about the super stream topology. The user can now have full control over the routing strategy, even though the provided implementations ("hash" and "key") should be enough most of the time. The configuration of the routing strategy has been also refactored to reflect the change and to be simpler. --- .../com/rabbitmq/stream/ConsumerBuilder.java | 4 +- .../com/rabbitmq/stream/ProducerBuilder.java | 86 ++++++++++---- .../stream/{impl => }/RoutingStrategy.java | 32 +++++- .../stream/impl/HashRoutingStrategy.java | 23 +--- .../impl/RoutingKeyRoutingStrategy.java | 18 +-- .../stream/impl/StreamProducerBuilder.java | 105 ++++++++++++------ .../stream/impl/SuperStreamProducer.java | 32 +++++- .../stream/impl/SuperStreamProducerTest.java | 13 ++- .../rabbitmq/stream/impl/SuperStreamTest.java | 10 +- 9 files changed, 216 insertions(+), 107 deletions(-) rename src/main/java/com/rabbitmq/stream/{impl => }/RoutingStrategy.java (51%) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index f6a6152f6a..61c3b2197a 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -27,7 +27,9 @@ public interface ConsumerBuilder { ConsumerBuilder stream(String stream); /** - * Set the consumer to consume from a super stream (partitioned stream). Experimental! + * Set the consumer to consume from a super stream (partitioned stream). + * + *

This is an experimental API, subject to change. * * @param superStream * @return this builder instance diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 5104fec5bb..5c3d4adf7d 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -108,37 +108,81 @@ public interface ProducerBuilder { ProducerBuilder enqueueTimeout(Duration timeout); /** - * Routing strategy for super streams. Experimental! + * Create the {@link Producer} instance. * - * @param routingKeyExtractor - * @param routingType - * @return this builder instance + * @return the configured producer */ - ProducerBuilder routing(Function routingKeyExtractor, RoutingType routingType); + Producer build(); /** - * Routing strategy for super streams. Experimental! + * Configure the routing for super streams (partitioned streams). * - * @param routingKeyExtractor - * @param routingType - * @param hash - * @return this builder instance + *

This is an experimental API, subject to change. + * + *

The to-be-created producer will be a composite producer when this method is called. It will + * use the routing configuration to find out where a message should be routed. The application + * developer must provide the logic to extract a "routing key" from a message, which will decide + * the destination(s) of the message. + * + *

The default routing strategy hashes the routing key to choose the stream (partition) to send + * the message to. + * + * @param routingKeyExtractor the logic to extract a routing key from a message + * @return the routing configuration instance + * @see RoutingConfiguration */ - ProducerBuilder routing( - Function routingKeyExtractor, - RoutingType routingType, - ToIntFunction hash); + RoutingConfiguration routing(Function routingKeyExtractor); /** - * Create the {@link Producer} instance. + * Routing configuration for super streams (partitioned streams). * - * @return the configured producer + *

This is an experimental API, subject to change. */ - Producer build(); + interface RoutingConfiguration { + + /** + * Enable the "hash" routing strategy (the default). + * + *

The default hash algorithm is 32-bit MurmurHash3. + * + * @return the routing configuration instance + */ + RoutingConfiguration hash(); + + /** + * Enable the "hash" routing strategy with a specific hash algorithm. + * + * @param hash + * @return + */ + RoutingConfiguration hash(ToIntFunction hash); + + /** + * Enable the "key" routing strategy. + * + *

It consists in using the "route" command of the RabbitMQ Stream protocol to determine the + * streams to send a message to. + * + * @return the routing configuration instance + */ + RoutingConfiguration key(); + + /** + * Set the routing strategy to use. + * + *

Providing the routing strategy provides control over the streams a message is routed to + * (routing key extraction logic if relevant and destination(s) decision). + * + * @param routingStrategy + * @return the routing configuration instance + */ + RoutingConfiguration strategy(RoutingStrategy routingStrategy); - /** Routing type when using super streams. Experimental! */ - enum RoutingType { - HASH, - KEY + /** + * Go back to the producer builder. + * + * @return the producer builder + */ + ProducerBuilder producerBuilder(); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java b/src/main/java/com/rabbitmq/stream/RoutingStrategy.java similarity index 51% rename from src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java rename to src/main/java/com/rabbitmq/stream/RoutingStrategy.java index 7ee4eee844..ef936d77e6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/RoutingStrategy.java +++ b/src/main/java/com/rabbitmq/stream/RoutingStrategy.java @@ -11,12 +11,36 @@ // // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com. -package com.rabbitmq.stream.impl; +package com.rabbitmq.stream; -import com.rabbitmq.stream.Message; import java.util.List; +import java.util.function.Function; -interface RoutingStrategy { +/** + * Strategy to route outbound messages to appropriate streams. + * + *

This is an experimental API, subject to change. + * + *

Used for super streams (partitioned stream). + * + * @see ProducerBuilder#routing(Function) + */ +public interface RoutingStrategy { - List route(Message message); + /** + * Where to route a message. + * + * @param message + * @param metadata + * @return + */ + List route(Message message, Metadata metadata); + + /** Metadata on the super stream. */ + interface Metadata { + + List partitions(); + + List route(String routingKey); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java b/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java index c695b12295..9b559a5d00 100644 --- a/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java +++ b/src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java @@ -14,41 +14,28 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.RoutingStrategy; import java.util.Collections; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.ToIntFunction; -import java.util.stream.Collectors; class HashRoutingStrategy implements RoutingStrategy { private final Function routingKeyExtractor; - private final StreamEnvironment env; - - private final List> partitions; - private final ToIntFunction hash; - HashRoutingStrategy( - String superStream, - Function routingKeyExtractor, - StreamEnvironment env, - ToIntFunction hash) { + HashRoutingStrategy(Function routingKeyExtractor, ToIntFunction hash) { this.routingKeyExtractor = routingKeyExtractor; - this.env = env; - List ps = this.env.locatorOperation(c -> c.partitions(superStream)); - this.partitions = - new CopyOnWriteArrayList<>( - ps.stream().map(Collections::singletonList).collect(Collectors.toList())); this.hash = hash; } @Override - public List route(Message message) { + public List route(Message message, Metadata metadata) { String routingKey = routingKeyExtractor.apply(message); int hashValue = hash.applyAsInt(routingKey); - return this.partitions.get((hashValue & 0x7FFFFFFF) % this.partitions.size()); + List partitions = metadata.partitions(); + return Collections.singletonList(partitions.get((hashValue & 0x7FFFFFFF) % partitions.size())); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java b/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java index 71739aadd8..dc3585ce68 100644 --- a/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java +++ b/src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.RoutingStrategy; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -25,27 +26,16 @@ class RoutingKeyRoutingStrategy implements RoutingStrategy { private final Map> routingKeysToStreams = new ConcurrentHashMap<>(); - private final StreamEnvironment env; - - private final String superStream; - - RoutingKeyRoutingStrategy( - String superStream, Function routingKeyExtractor, StreamEnvironment env) { + RoutingKeyRoutingStrategy(Function routingKeyExtractor) { this.routingKeyExtractor = routingKeyExtractor; - this.env = env; - this.superStream = superStream; } @Override - public List route(Message message) { + public List route(Message message, Metadata metadata) { String routingKey = this.routingKeyExtractor.apply(message); List streams = routingKeysToStreams.computeIfAbsent( - routingKey, - routingKey1 -> { - // TODO retry on locator lookup - return env.locatorOperation(c -> c.route(routingKey1, superStream)); - }); + routingKey, routingKey1 -> metadata.route(routingKey1)); return streams; } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 6f56114323..e7da540cd0 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -16,6 +16,7 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.ProducerBuilder; +import com.rabbitmq.stream.RoutingStrategy; import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.Compression; import java.lang.reflect.Field; @@ -45,11 +46,7 @@ class StreamProducerBuilder implements ProducerBuilder { private Duration enqueueTimeout = Duration.ofSeconds(10); - private Function routingKeyExtractor; - - private RoutingType routingType; - - private ToIntFunction hash = HashUtils.MURMUR3; + private DefaultRoutingConfiguration routingConfiguration; StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -126,32 +123,14 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { } @Override - public ProducerBuilder routing( - Function routingKeyExtractor, RoutingType routingType) { - return this.routing(routingKeyExtractor, routingType, HashUtils.MURMUR3); - } - - @Override - public ProducerBuilder routing( - Function routingKeyExtractor, - RoutingType routingType, - ToIntFunction hash) { - if (routingKeyExtractor == null || routingType == null) { - throw new IllegalArgumentException( - "both routing key extractor and routing type must be non-null"); - } - this.routingKeyExtractor = routingKeyExtractor; - this.routingType = routingType; - if (hash != null) { - this.hash = hash; - } - return this; + public RoutingConfiguration routing(Function routingKeyExtractor) { + this.routingConfiguration = new DefaultRoutingConfiguration(this); + this.routingConfiguration.routingKeyExtractor = routingKeyExtractor; + return this.routingConfiguration; } void resetRouting() { - this.routingKeyExtractor = null; - this.routingType = null; - this.hash = null; + this.routingConfiguration = null; } public Producer build() { @@ -164,7 +143,7 @@ public Producer build() { } this.environment.maybeInitializeLocator(); Producer producer; - if (this.routingKeyExtractor == null) { + if (this.routingConfiguration == null) { producer = new StreamProducer( name, @@ -179,14 +158,17 @@ public Producer build() { environment); this.environment.addProducer((StreamProducer) producer); } else { - // FIXME propagate compression to super stream producer - ToIntFunction hashFunction = this.hash == null ? HashUtils.MURMUR3 : this.hash; - RoutingStrategy routingStrategy = - this.routingType == RoutingType.HASH - ? new HashRoutingStrategy( - this.stream, this.routingKeyExtractor, this.environment, hashFunction) - : new RoutingKeyRoutingStrategy( - this.stream, this.routingKeyExtractor, this.environment); + RoutingStrategy routingStrategy = this.routingConfiguration.routingStrategy; + if (routingStrategy == null) { + if (this.routingConfiguration.hash == null) { + routingStrategy = + new RoutingKeyRoutingStrategy(this.routingConfiguration.routingKeyExtractor); + } else { + routingStrategy = + new HashRoutingStrategy( + this.routingConfiguration.routingKeyExtractor, this.routingConfiguration.hash); + } + } producer = new SuperStreamProducer(this, this.name, this.stream, routingStrategy, this.environment); } @@ -205,4 +187,53 @@ StreamProducerBuilder duplicate() { } return duplicate; } + + static final class DefaultRoutingConfiguration implements RoutingConfiguration { + + private final StreamProducerBuilder producerBuilder; + + private Function routingKeyExtractor; + + private RoutingStrategy routingStrategy; + + private ToIntFunction hash = HashUtils.MURMUR3; + + DefaultRoutingConfiguration(StreamProducerBuilder producerBuilder) { + this.producerBuilder = producerBuilder; + } + + @Override + public RoutingConfiguration hash() { + if (this.hash == null) { + this.hash = HashUtils.MURMUR3; + } + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration hash(ToIntFunction hash) { + this.hash = hash; + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration key() { + this.hash = null; + this.routingStrategy = null; + return this; + } + + @Override + public RoutingConfiguration strategy(RoutingStrategy routingStrategy) { + this.routingStrategy = routingStrategy; + return this; + } + + @Override + public ProducerBuilder producerBuilder() { + return this.producerBuilder; + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index e36c9305fe..ea1b24ffaa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -18,10 +18,13 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.RoutingStrategy; +import com.rabbitmq.stream.RoutingStrategy.Metadata; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,7 @@ class SuperStreamProducer implements Producer { private final StreamProducerBuilder producerBuilder; private final StreamEnvironment environment; private final String name; + private final Metadata superStreamMetadata; SuperStreamProducer( StreamProducerBuilder producerBuilder, @@ -48,6 +52,7 @@ class SuperStreamProducer implements Producer { this.name = name; this.superStream = superStream; this.environment = streamEnvironment; + this.superStreamMetadata = new DefaultSuperStreamMetadata(this.superStream, this.environment); this.producerBuilder = producerBuilder.duplicate(); this.producerBuilder.stream(null); this.producerBuilder.resetRouting(); @@ -86,7 +91,7 @@ public long getLastPublishingId() { public void send(Message message, ConfirmationHandler confirmationHandler) { // TODO handle when the stream is not found (no partition found for the message) // and call the confirmation handler with a failure - List streams = this.routingStrategy.route(message); + List streams = this.routingStrategy.route(message, superStreamMetadata); for (String stream : streams) { Producer producer = producers.computeIfAbsent( @@ -113,4 +118,29 @@ public void close() { } } } + + private static class DefaultSuperStreamMetadata implements Metadata { + + private final String superStream; + private final StreamEnvironment environment; + private final List partitions; + private final Map> routes = new ConcurrentHashMap<>(); + + private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environment) { + this.superStream = superStream; + this.environment = environment; + this.partitions = new CopyOnWriteArrayList<>(environment.locator().partitions(superStream)); + } + + @Override + public List partitions() { + return partitions; + } + + @Override + public List route(String routingKey) { + return routes.computeIfAbsent( + routingKey, routingKey1 -> environment.locator().route(routingKey1, superStream)); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index 37f28988e9..d99ae9ab83 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.ProducerBuilder.RoutingType; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.Map; @@ -81,7 +80,8 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex declareSuperStreamTopology(connection, superStream, partitions); Producer producer = environment.producerBuilder().stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -134,9 +134,9 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr declareSuperStreamTopology(connection, superStream, routingKeys); Producer producer = environment.producerBuilder().stream(superStream) - .routing( - message -> message.getApplicationProperties().get("region").toString(), - RoutingType.KEY) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -186,7 +186,8 @@ void getLastPublishingIdShouldReturnLowestValue() throws Exception { String producerName = "super-stream-application"; Producer producer = environment.producerBuilder().name(producerName).stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 2ff8e1b939..cea3129c36 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -24,7 +24,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.ProducerBuilder.RoutingType; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.UUID; @@ -76,7 +75,8 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { declareSuperStreamTopology(connection, superStream, partitions); Producer producer = environment.producerBuilder().stream(superStream) - .routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH) + .routing(message -> message.getProperties().getMessageIdAsString()) + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); @@ -119,9 +119,9 @@ void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception declareSuperStreamTopology(connection, superStream, routingKeys); Producer producer = environment.producerBuilder().stream(superStream) - .routing( - message -> message.getApplicationProperties().get("region").toString(), - RoutingType.KEY) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() .build(); CountDownLatch publishLatch = new CountDownLatch(messageCount); From b4a8ffeba916547615587af9a963fb42e911d089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 21 Sep 2021 10:11:28 +0200 Subject: [PATCH 3/7] Adapt auto offset tracking for super stream consumers The message count before storage setting is divided by the number of partitions to make sure we store offsets approximately as set on the consumer builder (assuming partitions are balanced). --- .../stream/impl/SuperStreamConsumer.java | 24 +++++--- .../stream/impl/RoutePartitionsTest.java | 7 --- .../stream/impl/SuperStreamConsumerTest.java | 55 ++++++++++++++++++- .../stream/impl/SuperStreamProducerTest.java | 4 -- .../rabbitmq/stream/impl/SuperStreamTest.java | 8 +-- 5 files changed, 72 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index c37287ac0f..723c4f7972 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -39,7 +39,7 @@ class SuperStreamConsumer implements Consumer { this.superStream = superStream; List partitions = environment.locatorOperation(c -> c.partitions(superStream)); - // for manual tracking offset strategy only + // for manual offset tracking strategy only ConsumerState[] states = new ConsumerState[partitions.size()]; Map partitionToStates = new HashMap<>(partitions.size()); for (int i = 0; i < partitions.size(); i++) { @@ -47,6 +47,8 @@ class SuperStreamConsumer implements Consumer { states[i] = state; partitionToStates.put(partitions.get(i), state); } + // end of manual offset tracking strategy + for (String partition : partitions) { ConsumerState state = partitionToStates.get(partition); MessageHandler messageHandler; @@ -56,13 +58,21 @@ class SuperStreamConsumer implements Consumer { } else { messageHandler = builder.messageHandler(); } + StreamConsumerBuilder subConsumerBuilder = builder.duplicate(); + + if (trackingConfiguration.enabled() && trackingConfiguration.auto()) { + subConsumerBuilder = + (StreamConsumerBuilder) + subConsumerBuilder + .autoTrackingStrategy() + .messageCountBeforeStorage( + trackingConfiguration.autoMessageCountBeforeStorage() / partitions.size()) + .builder(); + } + Consumer consumer = - builder - .duplicate() - .lazyInit(true) - .superStream(null) - .messageHandler(messageHandler) - .stream(partition) + subConsumerBuilder.lazyInit(true).superStream(null).messageHandler(messageHandler).stream( + partition) .build(); consumers.put(partition, consumer); state.consumer = consumer; diff --git a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java index 6daadb1d9b..5c7f0f5540 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java @@ -21,7 +21,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -55,7 +54,6 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeShouldReturnEmptyListWhenExchangeDoesNotExist() { assertThat(cf.get().route("", UUID.randomUUID().toString())).isEmpty(); } @@ -66,7 +64,6 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() { } @Test - @BrokerVersionAtLeast("3.9.6") void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception { declareSuperStreamTopology(connection, superStream, partitions); @@ -84,7 +81,6 @@ void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeTopologyWithPartitionCount() throws Exception { declareSuperStreamTopology(connection, superStream, 3); @@ -100,7 +96,6 @@ void routeTopologyWithPartitionCount() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception { declareSuperStreamTopology(connection, superStream, 3); connection.createChannel().queueBind(superStream + "-1", superStream, "0"); @@ -118,7 +113,6 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception } @Test - @BrokerVersionAtLeast("3.9.6") void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { declareSuperStreamTopology(connection, superStream, 3); Channel channel = connection.createChannel(); @@ -137,7 +131,6 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void partitionsReturnsCorrectOrder() throws Exception { String[] partitionNames = {"z", "y", "x"}; declareSuperStreamTopology(connection, superStream, partitionNames); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index f0ad42cb45..35f67dbe49 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -27,7 +27,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.impl.Client.ClientParameters; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -97,7 +96,6 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void consumeAllMessagesFromAllPartitions() throws Exception { declareSuperStreamTopology(connection, superStream, partitionCount); Client client = cf.get(); @@ -130,7 +128,6 @@ void consumeAllMessagesFromAllPartitions() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { declareSuperStreamTopology(connection, superStream, partitionCount); Client client = cf.get(); @@ -180,4 +177,56 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception { p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); consumer.close(); } + + @Test + void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { + declareSuperStreamTopology(connection, superStream, partitionCount); + Client client = cf.get(); + List partitions = client.partitions(superStream); + int messageCount = 10000 * partitionCount; + publishToPartitions(cf, partitions, messageCount); + ConcurrentMap messagesReceived = new ConcurrentHashMap<>(partitionCount); + ConcurrentMap lastOffsets = new ConcurrentHashMap<>(partitionCount); + partitions.forEach( + p -> { + messagesReceived.put(p, new AtomicInteger(0)); + }); + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + String consumerName = "my-app"; + AtomicInteger totalCount = new AtomicInteger(); + Consumer consumer = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .name(consumerName) + .autoTrackingStrategy() + .messageCountBeforeStorage(messageCount / partitionCount / 50) + .builder() + .messageHandler( + (context, message) -> { + String partition = new String(message.getBodyAsBinary()); + messagesReceived.get(partition).incrementAndGet(); + lastOffsets.put(partition, context.offset()); + totalCount.incrementAndGet(); + if (totalCount.get() % 50 == 0) { + context.storeOffset(); + } + consumeLatch.countDown(); + }) + .build(); + latchAssert(consumeLatch).completes(); + assertThat(messagesReceived).hasSize(partitionCount); + partitions.forEach( + p -> { + assertThat(messagesReceived).containsKey(p); + assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount); + }); + // checking stored offsets are big enough + // offset near the end (the message count per partition minus a few messages) + long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10); + partitions.forEach( + p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); + consumer.close(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index d99ae9ab83..66cc516b86 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.Map; import java.util.UUID; @@ -74,7 +73,6 @@ void tearDown() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; declareSuperStreamTopology(connection, superStream, partitions); @@ -127,7 +125,6 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000; routingKeys = new String[] {"amer", "emea", "apac"}; @@ -179,7 +176,6 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr } @Test - @BrokerVersionAtLeast("3.9.6") void getLastPublishingIdShouldReturnLowestValue() throws Exception { int messageCount = 10_000; declareSuperStreamTopology(connection, superStream, partitions); diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index cea3129c36..ca3ece9e6c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -14,6 +14,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology; +import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.localhost; import static org.assertj.core.api.Assertions.assertThat; @@ -24,7 +25,6 @@ import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.channel.EventLoopGroup; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -61,15 +61,14 @@ void init(TestInfo info) throws Exception { void tearDown() throws Exception { environment.close(); if (routingKeys == null) { - // deleteSuperStreamTopology(connection, superStream, partitions); + deleteSuperStreamTopology(connection, superStream, partitions); } else { - // deleteSuperStreamTopology(connection, superStream, routingKeys); + deleteSuperStreamTopology(connection, superStream, routingKeys); } connection.close(); } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000 * partitions; declareSuperStreamTopology(connection, superStream, partitions); @@ -112,7 +111,6 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception { } @Test - @BrokerVersionAtLeast("3.9.6") void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception { int messageCount = 10_000 * partitions; routingKeys = new String[] {"amer", "emea", "apac"}; From 0e3826bd0a6830e5107effa3ebfb21e4c17e4b06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 21 Sep 2021 10:29:07 +0200 Subject: [PATCH 4/7] Fail message if no route is found --- .../java/com/rabbitmq/stream/Constants.java | 1 + .../stream/impl/SuperStreamProducer.java | 25 ++++++++----- .../stream/impl/SuperStreamProducerTest.java | 36 +++++++++++++++++++ 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/Constants.java b/src/main/java/com/rabbitmq/stream/Constants.java index a68a71327e..4655cb5a59 100644 --- a/src/main/java/com/rabbitmq/stream/Constants.java +++ b/src/main/java/com/rabbitmq/stream/Constants.java @@ -39,6 +39,7 @@ public final class Constants { public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002; public static final short CODE_PRODUCER_CLOSED = 10_003; public static final short CODE_PUBLISH_CONFIRM_TIMEOUT = 10_004; + public static final short CODE_NO_ROUTE_FOUND = 10_005; public static final short COMMAND_DECLARE_PUBLISHER = 1; public static final short COMMAND_PUBLISH = 2; diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index ea1b24ffaa..8a502ece79 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -15,6 +15,8 @@ import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.ConfirmationStatus; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; import com.rabbitmq.stream.Producer; @@ -92,15 +94,20 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { // TODO handle when the stream is not found (no partition found for the message) // and call the confirmation handler with a failure List streams = this.routingStrategy.route(message, superStreamMetadata); - for (String stream : streams) { - Producer producer = - producers.computeIfAbsent( - stream, - stream1 -> { - Producer p = producerBuilder.duplicate().stream(stream1).build(); - return p; - }); - producer.send(message, confirmationHandler); + if (streams.isEmpty()) { + confirmationHandler.handle( + new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND)); + } else { + for (String stream : streams) { + Producer producer = + producers.computeIfAbsent( + stream, + stream1 -> { + Producer p = producerBuilder.duplicate().stream(stream1).build(); + return p; + }); + producer.send(message, confirmationHandler); + } } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index 66cc516b86..b0e711a0f0 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -22,6 +22,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import com.rabbitmq.stream.OffsetSpecification; @@ -31,6 +32,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; @@ -175,6 +178,39 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr .isEqualTo(messageCount); } + @Test + void messageIsNackedIfNoRouteFound() throws Exception { + int messageCount = 10_000; + routingKeys = new String[] {"amer", "emea", "apac"}; + declareSuperStreamTopology(connection, superStream, routingKeys); + Producer producer = + environment.producerBuilder().stream(superStream) + .routing(message -> message.getApplicationProperties().get("region").toString()) + .key() + .producerBuilder() + .build(); + + AtomicBoolean confirmed = new AtomicBoolean(true); + AtomicInteger code = new AtomicInteger(); + CountDownLatch publishLatch = new CountDownLatch(1); + producer.send( + producer + .messageBuilder() + .applicationProperties() + .entry("region", "atlantis") + .messageBuilder() + .build(), + confirmationStatus -> { + confirmed.set(confirmationStatus.isConfirmed()); + code.set(confirmationStatus.getCode()); + publishLatch.countDown(); + }); + + assertThat(latchAssert(publishLatch)).completes(5); + assertThat(confirmed).isFalse(); + assertThat(code).hasValue(Constants.CODE_NO_ROUTE_FOUND); + } + @Test void getLastPublishingIdShouldReturnLowestValue() throws Exception { int messageCount = 10_000; From cf4bddbfb455161275ec8a1b8ea68a4340b788ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 21 Sep 2021 14:02:08 +0200 Subject: [PATCH 5/7] Use locator retry in for partitions functions --- .../java/com/rabbitmq/stream/impl/SuperStreamProducer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java index 8a502ece79..3f88876d07 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java @@ -136,7 +136,8 @@ private static class DefaultSuperStreamMetadata implements Metadata { private DefaultSuperStreamMetadata(String superStream, StreamEnvironment environment) { this.superStream = superStream; this.environment = environment; - this.partitions = new CopyOnWriteArrayList<>(environment.locator().partitions(superStream)); + List ps = environment.locatorOperation(c -> c.partitions(superStream)); + this.partitions = new CopyOnWriteArrayList<>(ps); } @Override @@ -147,7 +148,8 @@ public List partitions() { @Override public List route(String routingKey) { return routes.computeIfAbsent( - routingKey, routingKey1 -> environment.locator().route(routingKey1, superStream)); + routingKey, + routingKey1 -> environment.locatorOperation(c -> c.route(routingKey1, superStream))); } } } From 5c6fe3c97dd9373bae19b5d54d354153cc024cb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 21 Sep 2021 17:39:31 +0200 Subject: [PATCH 6/7] Start super stream documentation --- src/docs/asciidoc/api.adoc | 2 +- src/docs/asciidoc/index.adoc | 2 + src/docs/asciidoc/super-streams.adoc | 114 ++++++++++++++++++ .../stream/docs/SuperStreamUsage.java | 58 +++++++++ .../stream/impl/SuperStreamProducerTest.java | 1 - 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 src/docs/asciidoc/super-streams.adoc create mode 100644 src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 6f79e73f99..1cf3e9035a 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -880,4 +880,4 @@ entry, which has its own offset. This means one must be careful when basing some decision on offset values, like a modulo to perform an operation every X messages. As the message offsets have -no guarantee to be contiguous, the operation may not happen exactly every X messages. \ No newline at end of file +no guarantee to be contiguous, the operation may not happen exactly every X messages. diff --git a/src/docs/asciidoc/index.adoc b/src/docs/asciidoc/index.adoc index 5b318d0f9e..cabc0626d3 100644 --- a/src/docs/asciidoc/index.adoc +++ b/src/docs/asciidoc/index.adoc @@ -22,6 +22,8 @@ include::sample-application.adoc[] include::api.adoc[] +include::super-streams.adoc[] + include::building.adoc[] include::performance-tool.adoc[] \ No newline at end of file diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc new file mode 100644 index 0000000000..7a6f970617 --- /dev/null +++ b/src/docs/asciidoc/super-streams.adoc @@ -0,0 +1,114 @@ +:test-examples: ../../test/java/com/rabbitmq/stream/docs + +==== Super Streams (Partitioned Streams) + +[WARNING] +.Experimental +==== +Super streams are an experimental feature, they are subject to change. +==== + +A super stream is a logical stream made of several individual streams. +In essence, a super stream is a partitioned stream that brings scalability compared to a single stream. + +The stream Java client uses the same programming model for super streams as with individual streams, that is the `Producer`, `Consumer`, `Message`, etc API are still valid when super streams are in use. +Application code should not be impacted whether it uses individual or super streams. + +==== Topology + +A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity. +The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them. +This does not mean AMQP resources are used to transport or store stream messages, it means that they are used to _describe_ the super stream topology, that is the streams it is made of. + +Let's take the example of an `invoices` super stream made of 3 streams (i.e. partitions): + +* an `invoices` exchange represents the super stream +* the `invoices-0`, `invoices-1`, `invoices-2` streams are the partitions of the super stream (streams are also AMQP queues in RabbitMQ) +* 3 bindings between the exchange and the streams link the super stream to its partitions and represent _routing rules_ + +.The topology of a super stream is defined with bindings between an exchange and queues +[ditaa] +.... + 0 +------------+ + +----->+ invoices–0 | + | +------------+ ++----------+ | +| invoices | | 1 +------------+ +| +---+----->+ invoices–1 | +| exchange | | +------------+ ++----------+ | + | 2 +------------+ + +----->+ invoices–2 | + +------------+ +.... + +When a super stream is in use, the stream Java client queries this information to find out about the partitions of a super stream and the routing rules. +From the application code point of view, using a super stream is mostly configuration-based. +Some logic must also be provided to extract routing information from messages. + +==== Publishing to a Super Stream + +When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward: + +.Creating a Producer for a Super Stream +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=producer-simple] +-------- +<1> Use the super stream name +<2> Provide the logic to get the routing key from a message +<3> Create the producer instance +<4> Close the producer when it's no longer necessary + +Note that even though the `invoices` super stream is not an actual stream, its name must be used to declare the producer. +Internally the client will figure out the streams that compose the super stream. +The application code must provide the logic to extract a routing key from a message as a `Function`. +The client will hash the routing key to determine the stream to send the message to (using partition list and a modulo operation). + +The client uses 32-bit https://en.wikipedia.org/wiki/MurmurHash[MurmurHash3] by default to hash the routing key. +This hash function provides good uniformity, performance, and portability, making it a good default choice, but it is possible to specify a custom hash function: + +.Specifying a custom hash function +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function] +-------- +<1> Use `String#hashCode()` to hash the routing key + +Note using Java's `hashCode()` method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages. + +==== Resolving Routes with Bindings + +Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. +The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams. + +This routing strategy makes sense when the partitioning has a business meaning, e.g. with a partition for a region in the world, like in the diagram below: + +.A super stream with a partition for a region in a world +[ditaa] +.... + amer +---------------+ + +------>+ invoices–amer | + | +---------------+ ++----------+ | +| invoices | | emea +---------------+ +| +---+------>+ invoices–emea | +| exchange | | +---------------+ ++----------+ | + | apac +---------------+ + +------>+ invoices–apac | + +---------------+ +.... + +In such a case, the routing key will be a property of the message that represents the region: + +.Enabling the "key" routing strategy +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy] +-------- +<1> Extract the routing key +<2> Enable the "key" routing strategy + +Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. +Note the client caches results, it does not query the broker for every message. \ No newline at end of file diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java new file mode 100644 index 0000000000..3a46d36152 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -0,0 +1,58 @@ +// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.stream.docs; + +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Producer; + +public class SuperStreamUsage { + + void producerSimple() { + Environment environment = Environment.builder().build(); + // tag::producer-simple[] + Producer producer = environment.producerBuilder() + .stream("invoices") // <1> + .routing(message -> message.getProperties().getMessageIdAsString()) // <2> + .producerBuilder() + .build(); // <3> + // ... + producer.close(); // <4> + // end::producer-simple[] + } + + void producerCustomHashFunction() { + Environment environment = Environment.builder().build(); + // tag::producer-custom-hash-function[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(message -> message.getProperties().getMessageIdAsString()) + .hash(rk -> rk.hashCode()) // <1> + .producerBuilder() + .build(); + // end::producer-custom-hash-function[] + } + + void producerKeyRoutingStrategy() { + Environment environment = Environment.builder().build(); + // tag::producer-key-routing-strategy[] + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(msg -> msg.getApplicationProperties().get("region").toString()) // <1> + .key() // <2> + .producerBuilder() + .build(); + // end::producer-key-routing-strategy[] + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index b0e711a0f0..c256168449 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -180,7 +180,6 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr @Test void messageIsNackedIfNoRouteFound() throws Exception { - int messageCount = 10_000; routingKeys = new String[] {"amer", "emea", "apac"}; declareSuperStreamTopology(connection, superStream, routingKeys); Producer producer = From 9ff897b796dcdf5c4c274f3d09e5a43ae6a1c565 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 22 Sep 2021 11:33:17 +0200 Subject: [PATCH 7/7] Write super stream consumer documentation --- src/docs/asciidoc/super-streams.adoc | 62 +++++++++++++++++-- .../com/rabbitmq/stream/ProducerBuilder.java | 4 ++ .../stream/docs/SuperStreamUsage.java | 41 ++++++++++++ 3 files changed, 103 insertions(+), 4 deletions(-) diff --git a/src/docs/asciidoc/super-streams.adoc b/src/docs/asciidoc/super-streams.adoc index 7a6f970617..b0b51c3ad1 100644 --- a/src/docs/asciidoc/super-streams.adoc +++ b/src/docs/asciidoc/super-streams.adoc @@ -1,5 +1,6 @@ :test-examples: ../../test/java/com/rabbitmq/stream/docs +[[super-streams]] ==== Super Streams (Partitioned Streams) [WARNING] @@ -14,7 +15,7 @@ In essence, a super stream is a partitioned stream that brings scalability compa The stream Java client uses the same programming model for super streams as with individual streams, that is the `Producer`, `Consumer`, `Message`, etc API are still valid when super streams are in use. Application code should not be impacted whether it uses individual or super streams. -==== Topology +===== Topology A super stream is made of several individual streams, so it can be considered a logical entity rather than an actual physical entity. The topology of a super stream is based on the https://www.rabbitmq.com/tutorials/amqp-concepts.html[AMQP 0.9.1 model], that is exchange, queues, and bindings between them. @@ -46,7 +47,7 @@ When a super stream is in use, the stream Java client queries this information t From the application code point of view, using a super stream is mostly configuration-based. Some logic must also be provided to extract routing information from messages. -==== Publishing to a Super Stream +===== Publishing to a Super Stream When the topology of a super stream like the one described above has been set, creating a producer for it is straightforward: @@ -77,7 +78,7 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-hash-function Note using Java's `hashCode()` method is a debatable choice as potential producers in other languages are unlikely to implement it, making the routing different between producers in different languages. -==== Resolving Routes with Bindings +====== Resolving Routes with Bindings Hashing the routing key to pick a partition is only one way to route messages to the appropriate streams. The stream Java client provides another way to resolve streams, based on the routing key _and_ the bindings between the super stream exchange and the streams. @@ -111,4 +112,57 @@ include::{test-examples}/SuperStreamUsage.java[tag=producer-key-routing-strategy <2> Enable the "key" routing strategy Internally the client will query the broker to resolve the destination streams for a given routing key, making the routing logic from any exchange type available to streams. -Note the client caches results, it does not query the broker for every message. \ No newline at end of file +Note the client caches results, it does not query the broker for every message. + +====== Using a Custom Routing Strategy + +The solution that provides the most control over routing is using a custom routing strategy. +This should be needed only for specific cases. + +The following code sample shows how to implement a simplistic round-robin `RoutingStrategy` and use it in the producer. +Note this implementation should not be used in production as the modulo operation is not sign-safe for simplicity's sake. + +.Setting a round-robin routing strategy +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=producer-custom-routing-strategy] +-------- +<1> No need to set the routing key extraction logic +<2> Set the custom routing strategy + +====== Deduplication + +Deduplication for a super stream producer works the same way as with a <>. +The publishing ID values are spread across the streams but this does affect the mechanism. + +===== Consuming From a Super Stream + +A super stream consumer is not much different from a single stream consumer. +The `ConsumerBuilder#superStream(String)` must be used to set the super stream to consume from: + +.Declaring a super stream consumer +[source,java,indent=0] +-------- +include::{test-examples}/SuperStreamUsage.java[tag=consumer-simple] +-------- +<1> Set the super stream name +<2> Close the consumer when it is no longer necessary + +A super stream consumer is a composite consumer: it will look up the super stream partitions and create a consumer for each or them. + +====== Offset Tracking + +The semantic of offset tracking for a super stream consumer are roughly the same as for an individual stream consumer. +There are still some subtle differences, so a good understanding of <> in general and of the <> and <> offset tracking strategies is recommended. + +Here are the main differences for the automatic/manual offset tracking strategies between single and super stream consuming: + +* *automatic offset tracking*: internally, _the client divides the `messageCountBeforeStorage` setting by the number of partitions for each individual consumer_. +Imagine a 3-partition super stream, `messageCountBeforeStorage` set to 10,000, and 10,000 messages coming in, perfectly balanced across the partitions (that is about 3,333 messages for each partition). +In this case, the automatic offset tracking strategy will not kick in, because the expected count message has not been reached on any partition. +Making the client divide `messageCountBeforeStorage` by the number of partitions can be considered "more accurate" if the message are well balanced across the partitions. +A good rule of thumb is to then multiply the expected per-stream `messageCountBeforeStorage` by the number of partitions, to avoid storing offsets too often. So the default being 10,000, it can be set to 30,000 for a 3-partition super stream. +* *manual offset tracking*: the `MessageHandler.Context#storeOffset()` method must be used, the `Consumer#store(long)` will fail, because an offset value has a meaning only in one stream, not in other streams. +A call to `MessageHandler.Context#storeOffset()` will store the current message offset in _its_ stream, but also the offset of the last dispatched message for the other streams of the super stream. + + diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 5c3d4adf7d..ddbdb82d5a 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -127,6 +127,10 @@ public interface ProducerBuilder { *

The default routing strategy hashes the routing key to choose the stream (partition) to send * the message to. * + * Note the routing key extraction logic is required only when the built-in routing strategies + * are used. It can set to null when a custom {@link RoutingStrategy} is set + * with {@link #routing(Function)}. + * * @param routingKeyExtractor the logic to extract a routing key from a message * @return the routing configuration instance * @see RoutingConfiguration diff --git a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java index 3a46d36152..e6a34f8e40 100644 --- a/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/SuperStreamUsage.java @@ -14,8 +14,15 @@ package com.rabbitmq.stream.docs; +import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.RoutingStrategy; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class SuperStreamUsage { @@ -55,4 +62,38 @@ void producerKeyRoutingStrategy() { .build(); // end::producer-key-routing-strategy[] } + + void producerCustomRoutingStrategy() { + Environment environment = Environment.builder().build(); + // tag::producer-custom-routing-strategy[] + AtomicLong messageCount = new AtomicLong(0); + RoutingStrategy routingStrategy = (message, metadata) -> { + List partitions = metadata.partitions(); + String stream = partitions.get( + (int) messageCount.getAndIncrement() % partitions.size() + ); + return Collections.singletonList(stream); + }; + Producer producer = environment.producerBuilder() + .stream("invoices") + .routing(null) // <1> + .strategy(routingStrategy) // <2> + .producerBuilder() + .build(); + // end::producer-custom-routing-strategy[] + } + + void consumerSimple() { + Environment environment = Environment.builder().build(); + // tag::consumer-simple[] + Consumer consumer = environment.consumerBuilder() + .superStream("invoices") // <1> + .messageHandler((context, message) -> { + // message processing + }) + .build(); + // ... + consumer.close(); // <2> + // end::consumer-simple[] + } }