Skip to content

Commit 7133365

Browse files
committed
Start first bits of super stream consumer
1 parent 72d1671 commit 7133365

File tree

8 files changed

+412
-15
lines changed

8 files changed

+412
-15
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
<buildnumber.plugin.version>1.4</buildnumber.plugin.version>
9090
<jruby.version>9.2.13.0</jruby.version>
9191
<jmh.version>1.33</jmh.version>
92-
<spotless.version>2.12.3</spotless.version>
92+
<spotless.version>2.13.0</spotless.version>
9393
<jacoco.version>0.8.7</jacoco.version>
9494
<!-- to sign artifacts when releasing -->
9595
<gpg.keyname>6026DFCA</gpg.keyname>

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ public interface ConsumerBuilder {
2626
*/
2727
ConsumerBuilder stream(String stream);
2828

29+
/**
30+
* Set the consumer to consume from a super stream (partitioned stream). Experimental!
31+
*
32+
* @param superStream
33+
* @return this builder instance
34+
*/
35+
ConsumerBuilder superStream(String superStream);
36+
2937
/**
3038
* The offset to start consuming from.
3139
*

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private SubscriptionTracker(
237237
synchronized void cancel() {
238238
this.closing = true;
239239
if (this.manager != null) {
240-
LOGGER.debug("Removing consumer from manager");
240+
LOGGER.debug("Removing consumer from manager " + this.consumer);
241241
this.manager.remove(this);
242242
} else {
243243
LOGGER.debug("No manager to remove consumer from");

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+45-12
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
import com.rabbitmq.stream.ConsumerBuilder;
1818
import com.rabbitmq.stream.MessageHandler;
1919
import com.rabbitmq.stream.OffsetSpecification;
20+
import com.rabbitmq.stream.StreamException;
21+
import java.lang.reflect.Field;
22+
import java.lang.reflect.Modifier;
2023
import java.time.Duration;
2124

2225
class StreamConsumerBuilder implements ConsumerBuilder {
2326

2427
private static final int NAME_MAX_SIZE = 256; // server-side limitation
2528
private final StreamEnvironment environment;
2629

27-
private String stream;
30+
private String stream, superStream;
2831
private OffsetSpecification offsetSpecification = null;
2932
private MessageHandler messageHandler;
3033
private String name;
@@ -41,6 +44,12 @@ public ConsumerBuilder stream(String stream) {
4144
return this;
4245
}
4346

47+
@Override
48+
public ConsumerBuilder superStream(String superStream) {
49+
this.superStream = superStream;
50+
return this;
51+
}
52+
4453
@Override
4554
public ConsumerBuilder offset(OffsetSpecification offsetSpecification) {
4655
this.offsetSpecification = offsetSpecification;
@@ -79,8 +88,11 @@ public AutoTrackingStrategy autoTrackingStrategy() {
7988

8089
@Override
8190
public Consumer build() {
82-
if (this.stream == null) {
83-
throw new IllegalArgumentException("stream cannot be null");
91+
if (this.stream == null && this.superStream == null) {
92+
throw new IllegalArgumentException("A stream must be specified");
93+
}
94+
if (this.stream != null && this.superStream != null) {
95+
throw new IllegalArgumentException("Stream and superStream cannot be set at the same time");
8496
}
8597
if (this.name == null
8698
&& (this.autoTrackingStrategy != null || this.manualTrackingStrategy != null)) {
@@ -110,15 +122,20 @@ public Consumer build() {
110122
new TrackingConfiguration(false, false, -1, Duration.ZERO, Duration.ZERO);
111123
}
112124

113-
StreamConsumer consumer =
114-
new StreamConsumer(
115-
this.stream,
116-
this.offsetSpecification,
117-
this.messageHandler,
118-
this.name,
119-
this.environment,
120-
trackingConfiguration);
121-
environment.addConsumer(consumer);
125+
Consumer consumer;
126+
if (this.stream != null) {
127+
consumer =
128+
new StreamConsumer(
129+
this.stream,
130+
this.offsetSpecification,
131+
this.messageHandler,
132+
this.name,
133+
this.environment,
134+
trackingConfiguration);
135+
environment.addConsumer((StreamConsumer) consumer);
136+
} else {
137+
consumer = new SuperStreamConsumer(this, this.superStream, this.environment);
138+
}
122139
return consumer;
123140
}
124141

@@ -227,4 +244,20 @@ public ConsumerBuilder builder() {
227244
return this.builder;
228245
}
229246
}
247+
248+
StreamConsumerBuilder duplicate() {
249+
StreamConsumerBuilder duplicate = new StreamConsumerBuilder(this.environment);
250+
for (Field field : StreamConsumerBuilder.class.getDeclaredFields()) {
251+
if (Modifier.isStatic(field.getModifiers())) {
252+
continue;
253+
}
254+
field.setAccessible(true);
255+
try {
256+
field.set(duplicate, field.get(this));
257+
} catch (IllegalAccessException e) {
258+
throw new StreamException("Error while duplicating stream producer builder", e);
259+
}
260+
}
261+
return duplicate;
262+
}
230263
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import com.rabbitmq.stream.Consumer;
17+
import java.util.Map;
18+
import java.util.Map.Entry;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
class SuperStreamConsumer implements Consumer {
24+
25+
private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamConsumer.class);
26+
private final String superStream;
27+
private final Map<String, Consumer> consumers = new ConcurrentHashMap<>();
28+
29+
SuperStreamConsumer(
30+
StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) {
31+
this.superStream = superStream;
32+
for (String partition : environment.locator().partitions(superStream)) {
33+
Consumer consumer = builder.duplicate().superStream(null).stream(partition).build();
34+
consumers.put(partition, consumer);
35+
LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);
36+
}
37+
}
38+
39+
@Override
40+
public void store(long offset) {
41+
throw new UnsupportedOperationException(
42+
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
43+
}
44+
45+
@Override
46+
public void close() {
47+
for (Entry<String, Consumer> entry : consumers.entrySet()) {
48+
LOGGER.debug(
49+
"Closing consumer for partition '{}' of super stream {}",
50+
entry.getKey(),
51+
this.superStream);
52+
try {
53+
entry.getValue().close();
54+
} catch (Exception e) {
55+
LOGGER.info(
56+
"Error while closing consumer for partition {} of super stream {}: {}",
57+
entry.getKey(),
58+
this.superStream,
59+
e.getMessage());
60+
}
61+
}
62+
}
63+
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
8989
for (String stream : streams) {
9090
Producer producer =
9191
producers.computeIfAbsent(
92-
stream, stream1 -> producerBuilder.duplicate().stream(stream1).build());
92+
stream,
93+
stream1 -> {
94+
Producer p = producerBuilder.duplicate().stream(stream1).build();
95+
return p;
96+
});
9397
producer.send(message, confirmationHandler);
9498
}
9599
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.b;
17+
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
18+
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
19+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
20+
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.rabbitmq.client.Connection;
24+
import com.rabbitmq.client.ConnectionFactory;
25+
import com.rabbitmq.stream.Consumer;
26+
import com.rabbitmq.stream.Environment;
27+
import com.rabbitmq.stream.EnvironmentBuilder;
28+
import com.rabbitmq.stream.OffsetSpecification;
29+
import com.rabbitmq.stream.impl.Client.ClientParameters;
30+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
31+
import io.netty.channel.EventLoopGroup;
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.Collections;
34+
import java.util.List;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ConcurrentMap;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.junit.jupiter.api.TestInfo;
43+
import org.junit.jupiter.api.extension.ExtendWith;
44+
45+
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
46+
public class SuperStreamConsumerTest {
47+
48+
EventLoopGroup eventLoopGroup;
49+
50+
Environment environment;
51+
52+
Connection connection;
53+
int partitionCount = 3;
54+
String superStream;
55+
String[] routingKeys = null;
56+
TestUtils.ClientFactory cf;
57+
58+
private static void publishToPartitions(
59+
TestUtils.ClientFactory cf, List<String> partitions, int messageCount) {
60+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
61+
Client client =
62+
cf.get(
63+
new ClientParameters()
64+
.publishConfirmListener((publisherId, publishingId) -> publishLatch.countDown()));
65+
for (int i = 0; i < partitions.size(); i++) {
66+
assertThat(client.declarePublisher(b(i), null, partitions.get(i)).isOk()).isTrue();
67+
}
68+
for (int i = 0; i < messageCount; i++) {
69+
int partitionIndex = i % partitions.size();
70+
String partition = partitions.get(partitionIndex);
71+
client.publish(
72+
b(partitionIndex),
73+
Collections.singletonList(
74+
client.messageBuilder().addData(partition.getBytes(StandardCharsets.UTF_8)).build()));
75+
}
76+
latchAssert(publishLatch).completes();
77+
}
78+
79+
@BeforeEach
80+
void init(TestInfo info) throws Exception {
81+
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
82+
environmentBuilder.addressResolver(add -> localhost());
83+
environment = environmentBuilder.build();
84+
superStream = TestUtils.streamName(info);
85+
connection = new ConnectionFactory().newConnection();
86+
}
87+
88+
@AfterEach
89+
void tearDown() throws Exception {
90+
environment.close();
91+
if (routingKeys == null) {
92+
deleteSuperStreamTopology(connection, superStream, partitionCount);
93+
} else {
94+
deleteSuperStreamTopology(connection, superStream, routingKeys);
95+
}
96+
connection.close();
97+
}
98+
99+
@Test
100+
@BrokerVersionAtLeast("3.9.6")
101+
void consumeAllMessagesFromAllPartitions() throws Exception {
102+
declareSuperStreamTopology(connection, superStream, partitionCount);
103+
Client client = cf.get();
104+
List<String> partitions = client.partitions(superStream);
105+
int messageCount = 10000 * partitionCount;
106+
publishToPartitions(cf, partitions, messageCount);
107+
ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount);
108+
partitions.forEach(p -> messagesReceived.put(p, new AtomicInteger(0)));
109+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
110+
Consumer consumer =
111+
environment
112+
.consumerBuilder()
113+
.superStream(superStream)
114+
.offset(OffsetSpecification.first())
115+
.messageHandler(
116+
(context, message) -> {
117+
String partition = new String(message.getBodyAsBinary());
118+
messagesReceived.get(partition).incrementAndGet();
119+
consumeLatch.countDown();
120+
})
121+
.build();
122+
latchAssert(consumeLatch).completes();
123+
assertThat(messagesReceived).hasSize(partitionCount);
124+
partitions.forEach(
125+
p -> {
126+
assertThat(messagesReceived).containsKey(p);
127+
assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount);
128+
});
129+
consumer.close();
130+
}
131+
}

0 commit comments

Comments
 (0)