Skip to content

Commit 2be8bc7

Browse files
committed
Handle manual offset tracking in super stream consumer
Offset storage on a given message trigger the storage for other partitions in the super stream.
1 parent 05477a1 commit 2be8bc7

File tree

4 files changed

+200
-16
lines changed

4 files changed

+200
-16
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+41-11
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class StreamConsumer implements Consumer {
2828

2929
private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
3030

31-
private final Runnable closingCallback;
31+
private volatile Runnable closingCallback;
3232

3333
private final Runnable closingTrackingCallback;
3434

@@ -44,15 +44,20 @@ class StreamConsumer implements Consumer {
4444

4545
private volatile Status status;
4646

47+
private volatile long lastRequestedStoredOffset = 0;
48+
4749
private final LongConsumer trackingCallback;
4850

51+
private final Runnable initCallback;
52+
4953
StreamConsumer(
5054
String stream,
5155
OffsetSpecification offsetSpecification,
5256
MessageHandler messageHandler,
5357
String name,
5458
StreamEnvironment environment,
55-
TrackingConfiguration trackingConfiguration) {
59+
TrackingConfiguration trackingConfiguration,
60+
boolean lazyInit) {
5661

5762
try {
5863
this.name = name;
@@ -87,11 +92,33 @@ class StreamConsumer implements Consumer {
8792
messageHandlerWithOrWithoutTracking = messageHandler;
8893
}
8994

90-
this.closingCallback =
91-
environment.registerConsumer(
92-
this, stream, offsetSpecification, this.name, messageHandlerWithOrWithoutTracking);
95+
Runnable init =
96+
() -> {
97+
this.closingCallback =
98+
environment.registerConsumer(
99+
this,
100+
stream,
101+
offsetSpecification,
102+
this.name,
103+
messageHandlerWithOrWithoutTracking);
104+
105+
this.status = Status.RUNNING;
106+
};
107+
if (lazyInit) {
108+
this.initCallback = init;
109+
} else {
110+
this.initCallback = () -> {};
111+
init.run();
112+
}
113+
} catch (RuntimeException e) {
114+
this.closed.set(true);
115+
throw e;
116+
}
117+
}
93118

94-
this.status = Status.RUNNING;
119+
void start() {
120+
try {
121+
this.initCallback.run();
95122
} catch (RuntimeException e) {
96123
this.closed.set(true);
97124
throw e;
@@ -102,10 +129,13 @@ class StreamConsumer implements Consumer {
102129
public void store(long offset) {
103130
trackingCallback.accept(offset);
104131
if (canTrack()) {
105-
try {
106-
this.trackingClient.storeOffset(this.name, this.stream, offset);
107-
} catch (Exception e) {
108-
LOGGER.debug("Error while trying to store offset: {}", e.getMessage());
132+
if (Long.compareUnsigned(this.lastRequestedStoredOffset, offset) < 0) {
133+
try {
134+
this.trackingClient.storeOffset(this.name, this.stream, offset);
135+
this.lastRequestedStoredOffset = offset;
136+
} catch (Exception e) {
137+
LOGGER.debug("Error while trying to store offset: {}", e.getMessage());
138+
}
109139
}
110140
}
111141
// nothing special to do if tracking is not possible or errors, e.g. because of a network
@@ -114,7 +144,7 @@ public void store(long offset) {
114144
}
115145

116146
private boolean canTrack() {
117-
return this.status == Status.RUNNING;
147+
return this.status == Status.RUNNING && this.name != null;
118148
}
119149

120150
@Override

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
3333
private String name;
3434
private DefaultAutoTrackingStrategy autoTrackingStrategy;
3535
private DefaultManualTrackingStrategy manualTrackingStrategy;
36+
private boolean lazyInit = false;
3637

3738
public StreamConsumerBuilder(StreamEnvironment environment) {
3839
this.environment = environment;
@@ -62,6 +63,10 @@ public ConsumerBuilder messageHandler(MessageHandler messageHandler) {
6263
return this;
6364
}
6465

66+
MessageHandler messageHandler() {
67+
return this.messageHandler;
68+
}
69+
6570
@Override
6671
public ConsumerBuilder name(String name) {
6772
if (name == null || name.length() > NAME_MAX_SIZE) {
@@ -86,6 +91,11 @@ public AutoTrackingStrategy autoTrackingStrategy() {
8691
return this.autoTrackingStrategy;
8792
}
8893

94+
StreamConsumerBuilder lazyInit(boolean lazyInit) {
95+
this.lazyInit = lazyInit;
96+
return this;
97+
}
98+
8999
@Override
90100
public Consumer build() {
91101
if (this.stream == null && this.superStream == null) {
@@ -131,10 +141,12 @@ public Consumer build() {
131141
this.messageHandler,
132142
this.name,
133143
this.environment,
134-
trackingConfiguration);
144+
trackingConfiguration,
145+
this.lazyInit);
135146
environment.addConsumer((StreamConsumer) consumer);
136147
} else {
137-
consumer = new SuperStreamConsumer(this, this.superStream, this.environment);
148+
consumer =
149+
new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration);
138150
}
139151
return consumer;
140152
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+93-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Consumer;
17+
import com.rabbitmq.stream.Message;
18+
import com.rabbitmq.stream.MessageHandler;
19+
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
20+
import java.util.HashMap;
21+
import java.util.List;
1722
import java.util.Map;
1823
import java.util.Map.Entry;
1924
import java.util.concurrent.ConcurrentHashMap;
@@ -27,13 +32,98 @@ class SuperStreamConsumer implements Consumer {
2732
private final Map<String, Consumer> consumers = new ConcurrentHashMap<>();
2833

2934
SuperStreamConsumer(
30-
StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) {
35+
StreamConsumerBuilder builder,
36+
String superStream,
37+
StreamEnvironment environment,
38+
TrackingConfiguration trackingConfiguration) {
3139
this.superStream = superStream;
32-
for (String partition : environment.locatorOperation(c -> c.partitions(superStream))) {
33-
Consumer consumer = builder.duplicate().superStream(null).stream(partition).build();
40+
List<String> partitions = environment.locatorOperation(c -> c.partitions(superStream));
41+
42+
// for manual tracking offset strategy only
43+
ConsumerState[] states = new ConsumerState[partitions.size()];
44+
Map<String, ConsumerState> partitionToStates = new HashMap<>(partitions.size());
45+
for (int i = 0; i < partitions.size(); i++) {
46+
ConsumerState state = new ConsumerState();
47+
states[i] = state;
48+
partitionToStates.put(partitions.get(i), state);
49+
}
50+
for (String partition : partitions) {
51+
ConsumerState state = partitionToStates.get(partition);
52+
MessageHandler messageHandler;
53+
if (trackingConfiguration.enabled() && trackingConfiguration.manual()) {
54+
messageHandler =
55+
new ManualOffsetTrackingMessageHandler(builder.messageHandler(), states, state);
56+
} else {
57+
messageHandler = builder.messageHandler();
58+
}
59+
Consumer consumer =
60+
builder
61+
.duplicate()
62+
.lazyInit(true)
63+
.superStream(null)
64+
.messageHandler(messageHandler)
65+
.stream(partition)
66+
.build();
3467
consumers.put(partition, consumer);
68+
state.consumer = consumer;
3569
LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);
3670
}
71+
72+
consumers.values().forEach(c -> ((StreamConsumer) c).start());
73+
}
74+
75+
private static final class ConsumerState {
76+
77+
private volatile long offset = 0;
78+
private volatile Consumer consumer;
79+
}
80+
81+
private static final class ManualOffsetTrackingMessageHandler implements MessageHandler {
82+
83+
private final MessageHandler delegate;
84+
private final ConsumerState[] consumerStates;
85+
private final ConsumerState consumerState;
86+
87+
private ManualOffsetTrackingMessageHandler(
88+
MessageHandler delegate, ConsumerState[] consumerStates, ConsumerState consumerState) {
89+
this.delegate = delegate;
90+
this.consumerStates = consumerStates;
91+
this.consumerState = consumerState;
92+
}
93+
94+
@Override
95+
public void handle(Context context, Message message) {
96+
Context ctx =
97+
new Context() {
98+
@Override
99+
public long offset() {
100+
return context.offset();
101+
}
102+
103+
@Override
104+
public long timestamp() {
105+
return context.timestamp();
106+
}
107+
108+
@Override
109+
public void storeOffset() {
110+
for (ConsumerState state : consumerStates) {
111+
if (ManualOffsetTrackingMessageHandler.this.consumerState == state) {
112+
context.storeOffset();
113+
} else if (state.offset != 0) {
114+
state.consumer.store(state.offset);
115+
}
116+
}
117+
}
118+
119+
@Override
120+
public Consumer consumer() {
121+
return context.consumer();
122+
}
123+
};
124+
this.delegate.handle(ctx, message);
125+
consumerState.offset = context.offset();
126+
}
37127
}
38128

39129
@Override

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

+52
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,56 @@ void consumeAllMessagesFromAllPartitions() throws Exception {
128128
});
129129
consumer.close();
130130
}
131+
132+
@Test
133+
@BrokerVersionAtLeast("3.9.6")
134+
void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
135+
declareSuperStreamTopology(connection, superStream, partitionCount);
136+
Client client = cf.get();
137+
List<String> partitions = client.partitions(superStream);
138+
int messageCount = 10000 * partitionCount;
139+
publishToPartitions(cf, partitions, messageCount);
140+
ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount);
141+
ConcurrentMap<String, Long> lastOffsets = new ConcurrentHashMap<>(partitionCount);
142+
partitions.forEach(
143+
p -> {
144+
messagesReceived.put(p, new AtomicInteger(0));
145+
});
146+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
147+
String consumerName = "my-app";
148+
AtomicInteger totalCount = new AtomicInteger();
149+
Consumer consumer =
150+
environment
151+
.consumerBuilder()
152+
.superStream(superStream)
153+
.offset(OffsetSpecification.first())
154+
.name(consumerName)
155+
.manualTrackingStrategy()
156+
.builder()
157+
.messageHandler(
158+
(context, message) -> {
159+
String partition = new String(message.getBodyAsBinary());
160+
messagesReceived.get(partition).incrementAndGet();
161+
lastOffsets.put(partition, context.offset());
162+
totalCount.incrementAndGet();
163+
if (totalCount.get() % 50 == 0) {
164+
context.storeOffset();
165+
}
166+
consumeLatch.countDown();
167+
})
168+
.build();
169+
latchAssert(consumeLatch).completes();
170+
assertThat(messagesReceived).hasSize(partitionCount);
171+
partitions.forEach(
172+
p -> {
173+
assertThat(messagesReceived).containsKey(p);
174+
assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount);
175+
});
176+
// checking stored offsets are big enough
177+
// offset near the end (the message count per partition minus a few messages)
178+
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
179+
partitions.forEach(
180+
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
181+
consumer.close();
182+
}
131183
}

0 commit comments

Comments
 (0)