Skip to content

Commit 1a6bf16

Browse files
committed
Implement getLastPublishingId in super stream producer
1 parent e70ad6b commit 1a6bf16

File tree

3 files changed

+92
-4
lines changed

3 files changed

+92
-4
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ public Producer build() {
187187
this.stream, this.routingKeyExtractor, this.environment, hashFunction)
188188
: new RoutingKeyRoutingStrategy(
189189
this.stream, this.routingKeyExtractor, this.environment);
190-
producer = new SuperStreamProducer(this, stream, routingStrategy, environment);
190+
producer =
191+
new SuperStreamProducer(this, this.name, this.stream, routingStrategy, this.environment);
191192
}
192193
return producer;
193194
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.stream.Message;
1919
import com.rabbitmq.stream.MessageBuilder;
2020
import com.rabbitmq.stream.Producer;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Map.Entry;
2324
import java.util.concurrent.ConcurrentHashMap;
@@ -33,15 +34,20 @@ class SuperStreamProducer implements Producer {
3334
private final String superStream;
3435
private final Map<String, Producer> producers = new ConcurrentHashMap<>();
3536
private final StreamProducerBuilder producerBuilder;
37+
private final StreamEnvironment environment;
38+
private final String name;
3639

3740
SuperStreamProducer(
3841
StreamProducerBuilder producerBuilder,
42+
String name,
3943
String superStream,
4044
RoutingStrategy routingStrategy,
4145
StreamEnvironment streamEnvironment) {
4246
this.routingStrategy = routingStrategy;
4347
this.codec = streamEnvironment.codec();
48+
this.name = name;
4449
this.superStream = superStream;
50+
this.environment = streamEnvironment;
4551
this.producerBuilder = producerBuilder.duplicate();
4652
this.producerBuilder.stream(null);
4753
this.producerBuilder.resetRouting();
@@ -54,9 +60,25 @@ public MessageBuilder messageBuilder() {
5460

5561
@Override
5662
public long getLastPublishingId() {
57-
// TODO get all partitions for this super stream, query the last publishing ID for each of team,
58-
// return the highest (or the lowest, because duplicates will be filtered out anyway?)
59-
return 0;
63+
if (this.name != null && !this.name.isEmpty()) {
64+
List<String> streams = this.environment.locator().partitions(superStream);
65+
long publishingId = 0;
66+
boolean first = true;
67+
for (String partition : streams) {
68+
long pubId = this.environment.locator().queryPublisherSequence(this.name, partition);
69+
if (first) {
70+
publishingId = pubId;
71+
first = false;
72+
} else {
73+
if (Long.compareUnsigned(publishingId, pubId) > 0) {
74+
publishingId = pubId;
75+
}
76+
}
77+
}
78+
return publishingId;
79+
} else {
80+
throw new IllegalStateException("The producer has no name");
81+
}
6082
}
6183

6284
@Override

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

+65
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class SuperStreamProducerTest {
5151
int partitions = 3;
5252
String superStream;
5353
String[] routingKeys = null;
54+
TestUtils.ClientFactory cf;
5455

5556
@BeforeEach
5657
void init(TestInfo info) throws Exception {
@@ -173,4 +174,68 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
173174
assertThat(counts.values().stream().map(AtomicLong::get).reduce(0L, Long::sum))
174175
.isEqualTo(messageCount);
175176
}
177+
178+
@Test
179+
void getLastPublishingIdShouldReturnLowestValue() throws Exception {
180+
int messageCount = 10_000;
181+
declareSuperStreamTopology(connection, superStream, partitions);
182+
String producerName = "super-stream-application";
183+
Producer producer =
184+
environment.producerBuilder().name(producerName).stream(superStream)
185+
.routing(message -> message.getProperties().getMessageIdAsString(), RoutingType.HASH)
186+
.build();
187+
188+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
189+
IntStream.range(0, messageCount)
190+
.forEach(
191+
i ->
192+
producer.send(
193+
producer
194+
.messageBuilder()
195+
.publishingId(i)
196+
.properties()
197+
.messageId(UUID.randomUUID().toString())
198+
.messageBuilder()
199+
.build(),
200+
confirmationStatus -> publishLatch.countDown()));
201+
202+
assertThat(latchAssert(publishLatch)).completes(5);
203+
204+
long lastPublishingId = producer.getLastPublishingId();
205+
assertThat(lastPublishingId).isNotZero();
206+
Client client = cf.get();
207+
IntStream.range(0, partitions)
208+
.mapToObj(i -> superStream + "-" + i)
209+
.forEach(
210+
stream -> {
211+
long publishingId = client.queryPublisherSequence(producerName, stream);
212+
assertThat(publishingId).isGreaterThanOrEqualTo(lastPublishingId);
213+
});
214+
215+
Map<String, AtomicLong> counts = new ConcurrentHashMap<>();
216+
AtomicLong totalCount = new AtomicLong(0);
217+
IntStream.range(0, partitions)
218+
.mapToObj(i -> superStream + "-" + i)
219+
.forEach(
220+
stream -> {
221+
AtomicLong streamCount = new AtomicLong(0);
222+
counts.put(stream, streamCount);
223+
environment.consumerBuilder().stream(stream)
224+
.offset(OffsetSpecification.first())
225+
.messageHandler(
226+
(context, message) -> {
227+
streamCount.incrementAndGet();
228+
totalCount.incrementAndGet();
229+
})
230+
.build();
231+
});
232+
233+
waitAtMost(10, () -> totalCount.get() == messageCount);
234+
235+
assertThat(counts.values().stream().map(AtomicLong::get))
236+
.hasSize(partitions)
237+
.doesNotContain(0L);
238+
assertThat(counts.values().stream().map(AtomicLong::get).reduce(0L, Long::sum))
239+
.isEqualTo(messageCount);
240+
}
176241
}

0 commit comments

Comments
 (0)