Skip to content

Commit 03825a9

Browse files
committed
Add SubscriptionListener
References #38
1 parent ac7d418 commit 03825a9

9 files changed

+362
-20
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public interface ConsumerBuilder {
6464
*/
6565
ConsumerBuilder name(String name);
6666

67+
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
68+
6769
/**
6870
* Enable {@link ManualTrackingStrategy}.
6971
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public interface SubscriptionListener {
17+
18+
void preSubscribe(SubscriptionContext subscriptionContext);
19+
20+
interface SubscriptionContext {
21+
22+
OffsetSpecification offsetSpecification();
23+
24+
void offsetSpecification(OffsetSpecification offsetSpecification);
25+
}
26+
}

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+46-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.rabbitmq.stream.OffsetSpecification;
2424
import com.rabbitmq.stream.StreamDoesNotExistException;
2525
import com.rabbitmq.stream.StreamException;
26+
import com.rabbitmq.stream.SubscriptionListener;
27+
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
2628
import com.rabbitmq.stream.impl.Client.ChunkListener;
2729
import com.rabbitmq.stream.impl.Client.CreditNotification;
2830
import com.rabbitmq.stream.impl.Client.MessageListener;
@@ -83,6 +85,7 @@ Runnable subscribe(
8385
String stream,
8486
OffsetSpecification offsetSpecification,
8587
String trackingReference,
88+
SubscriptionListener subscriptionListener,
8689
MessageHandler messageHandler) {
8790
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
8891
List<Client.Broker> candidates = findBrokersForStream(stream);
@@ -95,7 +98,12 @@ Runnable subscribe(
9598
// we keep this instance when we move the subscription from a client to another one
9699
SubscriptionTracker subscriptionTracker =
97100
new SubscriptionTracker(
98-
consumer, stream, offsetSpecification, trackingReference, messageHandler);
101+
consumer,
102+
stream,
103+
offsetSpecification,
104+
trackingReference,
105+
subscriptionListener,
106+
messageHandler);
99107

100108
String key = keyForClientSubscription(newNode);
101109

@@ -212,6 +220,7 @@ private static class SubscriptionTracker {
212220
private final String offsetTrackingReference;
213221
private final MessageHandler messageHandler;
214222
private final StreamConsumer consumer;
223+
private final SubscriptionListener subscriptionListener;
215224
private volatile long offset;
216225
private volatile boolean hasReceivedSomething = false;
217226
private volatile byte subscriptionIdInClient;
@@ -223,11 +232,13 @@ private SubscriptionTracker(
223232
String stream,
224233
OffsetSpecification initialOffsetSpecification,
225234
String offsetTrackingReference,
235+
SubscriptionListener subscriptionListener,
226236
MessageHandler messageHandler) {
227237
this.consumer = consumer;
228238
this.stream = stream;
229239
this.initialOffsetSpecification = initialOffsetSpecification;
230240
this.offsetTrackingReference = offsetTrackingReference;
241+
this.subscriptionListener = subscriptionListener;
231242
this.messageHandler = messageHandler;
232243
}
233244

@@ -635,7 +646,7 @@ synchronized void add(
635646
update(previousSubscriptions, subscriptionId, subscriptionTracker);
636647

637648
String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
638-
if (subscriptionTracker.offsetTrackingReference != null) {
649+
if (offsetTrackingReference != null) {
639650
long trackedOffset =
640651
client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
641652
if (trackedOffset != 0) {
@@ -666,12 +677,20 @@ synchronized void add(
666677
subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference);
667678
}
668679

680+
SubscriptionContext subscriptionContext =
681+
new DefaultSubscriptionContext(offsetSpecification);
682+
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
683+
LOGGER.info(
684+
"Computed offset specification {}, offset specification used after subscription listener {}",
685+
offsetSpecification,
686+
subscriptionContext.offsetSpecification());
687+
669688
// FIXME consider using fewer initial credits
670689
Client.Response subscribeResponse =
671690
client.subscribe(
672691
subscriptionId,
673692
subscriptionTracker.stream,
674-
offsetSpecification,
693+
subscriptionContext.offsetSpecification(),
675694
10,
676695
subscriptionProperties);
677696
if (!subscribeResponse.isOk()) {
@@ -767,4 +786,28 @@ synchronized void close() {
767786
}
768787
}
769788
}
789+
790+
private static final class DefaultSubscriptionContext implements SubscriptionContext {
791+
792+
private volatile OffsetSpecification offsetSpecification;
793+
794+
private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification) {
795+
this.offsetSpecification = computedOffsetSpecification;
796+
}
797+
798+
@Override
799+
public OffsetSpecification offsetSpecification() {
800+
return this.offsetSpecification;
801+
}
802+
803+
@Override
804+
public void offsetSpecification(OffsetSpecification offsetSpecification) {
805+
this.offsetSpecification = offsetSpecification;
806+
}
807+
808+
@Override
809+
public String toString() {
810+
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
811+
}
812+
}
770813
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.rabbitmq.stream.MessageHandler;
1818
import com.rabbitmq.stream.MessageHandler.Context;
1919
import com.rabbitmq.stream.OffsetSpecification;
20+
import com.rabbitmq.stream.SubscriptionListener;
2021
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2122
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
2223
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,7 +58,8 @@ class StreamConsumer implements Consumer {
5758
String name,
5859
StreamEnvironment environment,
5960
TrackingConfiguration trackingConfiguration,
60-
boolean lazyInit) {
61+
boolean lazyInit,
62+
SubscriptionListener subscriptionListener) {
6163

6264
try {
6365
this.name = name;
@@ -100,6 +102,7 @@ class StreamConsumer implements Consumer {
100102
stream,
101103
offsetSpecification,
102104
this.name,
105+
subscriptionListener,
103106
messageHandlerWithOrWithoutTracking);
104107

105108
this.status = Status.RUNNING;

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.stream.MessageHandler;
1919
import com.rabbitmq.stream.OffsetSpecification;
2020
import com.rabbitmq.stream.StreamException;
21+
import com.rabbitmq.stream.SubscriptionListener;
2122
import java.lang.reflect.Field;
2223
import java.lang.reflect.Modifier;
2324
import java.time.Duration;
@@ -34,6 +35,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
3435
private DefaultAutoTrackingStrategy autoTrackingStrategy;
3536
private DefaultManualTrackingStrategy manualTrackingStrategy;
3637
private boolean lazyInit = false;
38+
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
3739

3840
public StreamConsumerBuilder(StreamEnvironment environment) {
3941
this.environment = environment;
@@ -77,6 +79,15 @@ public ConsumerBuilder name(String name) {
7779
return this;
7880
}
7981

82+
@Override
83+
public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
84+
if (subscriptionListener == null) {
85+
throw new IllegalArgumentException("The subscription listener cannot be null");
86+
}
87+
this.subscriptionListener = subscriptionListener;
88+
return this;
89+
}
90+
8091
@Override
8192
public ManualTrackingStrategy manualTrackingStrategy() {
8293
this.manualTrackingStrategy = new DefaultManualTrackingStrategy(this);
@@ -142,7 +153,8 @@ public Consumer build() {
142153
this.name,
143154
this.environment,
144155
trackingConfiguration,
145-
this.lazyInit);
156+
this.lazyInit,
157+
this.subscriptionListener);
146158
environment.addConsumer((StreamConsumer) consumer);
147159
} else {
148160
consumer =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.stream.ProducerBuilder;
2929
import com.rabbitmq.stream.StreamCreator;
3030
import com.rabbitmq.stream.StreamException;
31+
import com.rabbitmq.stream.SubscriptionListener;
3132
import com.rabbitmq.stream.compression.CompressionCodecFactory;
3233
import com.rabbitmq.stream.impl.Client.ClientParameters;
3334
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
@@ -467,10 +468,16 @@ Runnable registerConsumer(
467468
String stream,
468469
OffsetSpecification offsetSpecification,
469470
String trackingReference,
471+
SubscriptionListener subscriptionListener,
470472
MessageHandler messageHandler) {
471473
Runnable closingCallback =
472474
this.consumersCoordinator.subscribe(
473-
consumer, stream, offsetSpecification, trackingReference, messageHandler);
475+
consumer,
476+
stream,
477+
offsetSpecification,
478+
trackingReference,
479+
subscriptionListener,
480+
messageHandler);
474481
return closingCallback;
475482
}
476483

0 commit comments

Comments
 (0)