Skip to content

Commit 479cd1c

Browse files
committed
Add method to get producer last publishing ID
1 parent 7014d25 commit 479cd1c

File tree

5 files changed

+178
-19
lines changed

5 files changed

+178
-19
lines changed

src/main/java/com/rabbitmq/stream/Producer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ public interface Producer extends AutoCloseable {
1818

1919
MessageBuilder messageBuilder();
2020

21+
long getLastPublishingId();
22+
2123
void send(Message message, ConfirmationHandler confirmationHandler);
2224
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ long lastCommittedOffset() {
160160
if (canCommit()) {
161161
try {
162162
// the client can be null by now, but we catch the exception and return 0
163-
// callers should know how to deal with a committed of 0
163+
// callers should know how to deal with a committed offset of 0
164164
return this.commitClient.queryOffset(this.name, this.stream);
165165
} catch (Exception e) {
166166
return 0;

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class StreamProducer implements Producer {
4747
// FIXME investigate a more optimized data structure to handle pending messages
4848
private final ConcurrentMap<Long, AccumulatedEntity> unconfirmedMessages;
4949
private final int batchSize;
50+
private final String name;
5051
private final String stream;
5152
private final Client.OutboundEntityWriteCallback writeCallback;
5253
private final Semaphore unconfirmedMessagesSemaphore;
@@ -70,6 +71,7 @@ class StreamProducer implements Producer {
7071
int maxUnconfirmedMessages,
7172
StreamEnvironment environment) {
7273
this.environment = environment;
74+
this.name = name;
7375
this.stream = stream;
7476
this.closingCallback = environment.registerProducer(this, name, this.stream);
7577
final Client.OutboundEntityWriteCallback delegateWriteCallback;
@@ -173,6 +175,28 @@ public MessageBuilder messageBuilder() {
173175
return codec.messageBuilder();
174176
}
175177

178+
@Override
179+
public long getLastPublishingId() {
180+
if (this.name != null && !this.name.isEmpty()) {
181+
if (canSend()) {
182+
try {
183+
return this.client.queryPublisherSequence(this.name, this.stream);
184+
} catch (Exception e) {
185+
throw new IllegalStateException(
186+
"Error while trying to query last publishing ID for "
187+
+ "producer "
188+
+ this.name
189+
+ " on stream "
190+
+ stream);
191+
}
192+
} else {
193+
throw new IllegalStateException("The producer has no connection");
194+
}
195+
} else {
196+
throw new IllegalStateException("The producer has no name");
197+
}
198+
}
199+
176200
@Override
177201
public void send(Message message, ConfirmationHandler confirmationHandler) {
178202
try {

src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,22 @@ public AccumulatedEntity get() {
3434
}
3535
int count = 0;
3636
Batch batch = createBatch();
37+
AccumulatedEntity lastMessageInBatch = null;
3738
while (count != this.subEntrySize) {
3839
AccumulatedEntity message = messages.poll();
3940
if (message == null) {
4041
break;
4142
}
42-
if (count == 0) {
43-
batch.add(
44-
message.publishindId(),
45-
(EncodedMessage) message.encodedEntity(),
46-
message.confirmationCallback());
47-
} else {
48-
batch.add((EncodedMessage) message.encodedEntity(), message.confirmationCallback());
49-
}
43+
lastMessageInBatch = message;
44+
batch.add((EncodedMessage) message.encodedEntity(), message.confirmationCallback());
5045
count++;
5146
}
52-
return batch.isEmpty() ? null : batch;
47+
if (batch.isEmpty()) {
48+
return null;
49+
} else {
50+
batch.publishingId = lastMessageInBatch.publishindId();
51+
return batch;
52+
}
5353
}
5454

5555
private static class Batch implements AccumulatedEntity {
@@ -65,14 +65,6 @@ private Batch(
6565
this.confirmationCallback = confirmationCallback;
6666
}
6767

68-
void add(
69-
long publishingId,
70-
Codec.EncodedMessage encodedMessage,
71-
StreamProducer.ConfirmationCallback confirmationCallback) {
72-
this.publishingId = publishingId;
73-
this.add(encodedMessage, confirmationCallback);
74-
}
75-
7668
void add(
7769
Codec.EncodedMessage encodedMessage,
7870
StreamProducer.ConfirmationCallback confirmationCallback) {

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,23 @@
1717
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20-
import com.rabbitmq.stream.*;
20+
import com.rabbitmq.stream.BackOffDelayPolicy;
21+
import com.rabbitmq.stream.ConfirmationHandler;
22+
import com.rabbitmq.stream.ConfirmationStatus;
23+
import com.rabbitmq.stream.Constants;
24+
import com.rabbitmq.stream.Environment;
25+
import com.rabbitmq.stream.EnvironmentBuilder;
26+
import com.rabbitmq.stream.Host;
27+
import com.rabbitmq.stream.Producer;
28+
import com.rabbitmq.stream.StreamException;
2129
import com.rabbitmq.stream.impl.StreamProducer.Status;
2230
import io.netty.channel.EventLoopGroup;
2331
import java.nio.charset.StandardCharsets;
2432
import java.time.Duration;
2533
import java.util.List;
2634
import java.util.Map;
35+
import java.util.SortedSet;
36+
import java.util.TreeSet;
2737
import java.util.UUID;
2838
import java.util.concurrent.ConcurrentHashMap;
2939
import java.util.concurrent.CountDownLatch;
@@ -34,6 +44,7 @@
3444
import java.util.concurrent.atomic.AtomicInteger;
3545
import java.util.concurrent.atomic.AtomicLong;
3646
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.function.Consumer;
3748
import java.util.stream.Collectors;
3849
import java.util.stream.IntStream;
3950
import org.junit.jupiter.api.AfterEach;
@@ -289,4 +300,134 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
289300
.build();
290301
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
291302
}
303+
304+
@ParameterizedTest
305+
@ValueSource(ints = {1, 7})
306+
void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) throws Exception {
307+
int lineCount = 50_000;
308+
int firstWaveLineCount = lineCount / 5;
309+
int backwardCount = firstWaveLineCount / 10;
310+
SortedSet<Integer> document = new TreeSet<>();
311+
IntStream.range(0, lineCount).forEach(i -> document.add(i));
312+
Producer producer =
313+
environment.producerBuilder().name("producer-1").stream(stream)
314+
.subEntrySize(subEntrySize)
315+
.build();
316+
317+
AtomicReference<CountDownLatch> latch =
318+
new AtomicReference<>(new CountDownLatch(firstWaveLineCount));
319+
ConfirmationHandler confirmationHandler = confirmationStatus -> latch.get().countDown();
320+
Consumer<Integer> publishMessage =
321+
i ->
322+
producer.send(
323+
producer
324+
.messageBuilder()
325+
.publishingId(i)
326+
.addData(String.valueOf(i).getBytes())
327+
.build(),
328+
confirmationHandler);
329+
document.headSet(firstWaveLineCount).forEach(publishMessage);
330+
331+
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
332+
333+
latch.set(new CountDownLatch(lineCount - firstWaveLineCount + backwardCount));
334+
335+
document.tailSet(firstWaveLineCount - backwardCount).forEach(publishMessage);
336+
337+
assertThat(latch.get().await(5, TimeUnit.SECONDS)).isTrue();
338+
339+
CountDownLatch consumeLatch = new CountDownLatch(lineCount);
340+
AtomicInteger consumed = new AtomicInteger();
341+
environment.consumerBuilder().stream(stream)
342+
.messageHandler(
343+
(offset, message) -> {
344+
consumed.incrementAndGet();
345+
consumeLatch.countDown();
346+
})
347+
.build();
348+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
349+
Thread.sleep(1000);
350+
// if we are using sub-entries, we cannot avoid duplicates.
351+
// here, a sub-entry in the second wave, right at the end of the re-submitted
352+
// values will contain those duplicates, because its publishing ID will be
353+
// the one of its last message, so the server will accept the whole sub-entry,
354+
// including the duplicates.
355+
assertThat(consumed.get()).isEqualTo(lineCount + backwardCount % subEntrySize);
356+
}
357+
358+
@ParameterizedTest
359+
@ValueSource(ints = {1, 7})
360+
void newIncarnationOfProducerCanQueryItsLastPublishingId(int subEntrySize) throws Exception {
361+
Producer p =
362+
environment.producerBuilder().name("producer-1").stream(stream)
363+
.subEntrySize(subEntrySize)
364+
.build();
365+
366+
AtomicReference<Producer> producer = new AtomicReference<>(p);
367+
368+
AtomicLong publishingSequence = new AtomicLong(0);
369+
AtomicLong lastConfirmed = new AtomicLong(-1);
370+
ConfirmationHandler confirmationHandler =
371+
confirmationStatus -> {
372+
if (confirmationStatus.isConfirmed()) {
373+
lastConfirmed.set(confirmationStatus.getMessage().getPublishingId());
374+
}
375+
};
376+
377+
AtomicBoolean canPublish = new AtomicBoolean(true);
378+
Runnable publish =
379+
() -> {
380+
while (canPublish.get()) {
381+
producer
382+
.get()
383+
.send(
384+
producer
385+
.get()
386+
.messageBuilder()
387+
.publishingId(publishingSequence.getAndIncrement())
388+
.addData(String.valueOf(publishingSequence.get()).getBytes())
389+
.build(),
390+
confirmationHandler);
391+
}
392+
};
393+
new Thread(publish).start();
394+
395+
Thread.sleep(1000L);
396+
canPublish.set(false);
397+
waitAtMost(10, () -> publishingSequence.get() == lastConfirmed.get() + 1);
398+
assertThat(lastConfirmed.get()).isPositive();
399+
400+
producer.get().close();
401+
402+
p =
403+
environment.producerBuilder().name("producer-1").stream(stream)
404+
.subEntrySize(subEntrySize)
405+
.build();
406+
producer.set(p);
407+
408+
long lastPublishingId = producer.get().getLastPublishingId();
409+
assertThat(lastPublishingId).isEqualTo(lastConfirmed.get());
410+
411+
canPublish.set(true);
412+
new Thread(publish).start();
413+
414+
Thread.sleep(1000L);
415+
canPublish.set(false);
416+
417+
waitAtMost(10, () -> publishingSequence.get() == lastConfirmed.get() + 1);
418+
assertThat(lastConfirmed.get()).isGreaterThan(lastPublishingId);
419+
420+
CountDownLatch consumeLatch = new CountDownLatch((int) (lastConfirmed.get() + 1));
421+
AtomicInteger consumed = new AtomicInteger();
422+
environment.consumerBuilder().stream(stream)
423+
.messageHandler(
424+
(offset, message) -> {
425+
consumed.incrementAndGet();
426+
consumeLatch.countDown();
427+
})
428+
.build();
429+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
430+
Thread.sleep(1000);
431+
assertThat(consumed.get()).isEqualTo(lastConfirmed.get() + 1);
432+
}
292433
}

0 commit comments

Comments
 (0)