Skip to content

Add SubscriptionListener #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -881,3 +881,43 @@ entry, which has its own offset.
This means one must be careful when basing some decision on offset values, like
a modulo to perform an operation every X messages. As the message offsets have
no guarantee to be contiguous, the operation may not happen exactly every X messages.

====== Subscription Listener

The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
This callback can be used to customize the offset the client computed for the subscription.
The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change).

It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides.
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):

.Using an external store for offset tracking with a subscription listener
[source,java,indent=0]
--------
include::{test-examples}/ConsumerUsage.java[tag=subscription-listener]
--------
<1> Set subscription listener
<2> Get offset from external store
<3> Set offset to use for the subscription
<4> Store the offset in the external store after processing

When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use.

Using a subscription listener can also be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex.
This requires a good understanding on how and when subscription occurs in the client, and so when the subscription listener is called:

* for a consumer with no name (server-side offset tracking _disabled_)
** on the first subscription (when the consumer is created): the offset specification is the one specified with `ConsumerBuilder#offset(OffsetSpecification)`, the default being `OffsetSpecification#next()`
** on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message
* for a consumer with a name (server-side offset tracking _enabled_)
** on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with `ConsumerBuilder#offset(OffsetSpecification)`
** on re-subscription (after a disconnection or topology change): the server-side stored offset is used

The subscription listener comes in handy on re-subscription.
The application can track the last processed offset in-memory, with an `AtomicLong` for example.
The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the client may not be perfectly appropriate on re-subscription.

Let's take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval.
When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed.
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder name(String name);

/**
* Callback on subscription.
*
* <p>Can be used to set the offset specification before subscribing to the stream.
*
* @see SubscriptionListener
* @param subscriptionListener the listener
* @return this builder instance
*/
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);

/**
* Enable {@link ManualTrackingStrategy}.
*
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/com/rabbitmq/stream/SubscriptionListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream;

/**
* Callback interface to customize a subscription.
*
* <p>It is possible to change the computed {@link OffsetSpecification} in {@link
* #preSubscribe(SubscriptionContext)} by using a custom offset tracking strategy.
*/
public interface SubscriptionListener {

/**
* Callback called before the subscription is created.
*
* <p>The method is called when a {@link Consumer} is created and it registers to broker, and also
* when the subscription must be re-created (after a disconnection or when the subscription must
* moved because the stream member it was connection becomes unavailable).
*
* <p>Application code can set the {@link OffsetSpecification} that will be used with the {@link
* SubscriptionContext#offsetSpecification(OffsetSpecification)} method.
*
* @param subscriptionContext
*/
void preSubscribe(SubscriptionContext subscriptionContext);

/** Context object for the subscription. */
interface SubscriptionContext {

/**
* The offset specification computed by the library.
*
* <p>If the consumer has no name, the value is the value set with {@link
* ConsumerBuilder#offset(OffsetSpecification)} on the first subscription and the offset of the
* last dispatched message on subsequent calls (e.g. when the client re-subscribes after a
* disconnection).
*
* <p>If the consumer has a name, the value is the last stored if any.
*
* @see ConsumerBuilder#name(String)
* @return the computed offset specification
*/
OffsetSpecification offsetSpecification();

/**
* Set the offset specification to use for the subscription.
*
* <p>It overrides the value computed by the client.
*
* @param offsetSpecification the offset specification to use
*/
void offsetSpecification(OffsetSpecification offsetSpecification);
}
}
49 changes: 46 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
import com.rabbitmq.stream.impl.Client.ChunkListener;
import com.rabbitmq.stream.impl.Client.CreditNotification;
import com.rabbitmq.stream.impl.Client.MessageListener;
Expand Down Expand Up @@ -83,6 +85,7 @@ Runnable subscribe(
String stream,
OffsetSpecification offsetSpecification,
String trackingReference,
SubscriptionListener subscriptionListener,
MessageHandler messageHandler) {
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
List<Client.Broker> candidates = findBrokersForStream(stream);
Expand All @@ -95,7 +98,12 @@ Runnable subscribe(
// we keep this instance when we move the subscription from a client to another one
SubscriptionTracker subscriptionTracker =
new SubscriptionTracker(
consumer, stream, offsetSpecification, trackingReference, messageHandler);
consumer,
stream,
offsetSpecification,
trackingReference,
subscriptionListener,
messageHandler);

String key = keyForClientSubscription(newNode);

Expand Down Expand Up @@ -212,6 +220,7 @@ private static class SubscriptionTracker {
private final String offsetTrackingReference;
private final MessageHandler messageHandler;
private final StreamConsumer consumer;
private final SubscriptionListener subscriptionListener;
private volatile long offset;
private volatile boolean hasReceivedSomething = false;
private volatile byte subscriptionIdInClient;
Expand All @@ -223,11 +232,13 @@ private SubscriptionTracker(
String stream,
OffsetSpecification initialOffsetSpecification,
String offsetTrackingReference,
SubscriptionListener subscriptionListener,
MessageHandler messageHandler) {
this.consumer = consumer;
this.stream = stream;
this.initialOffsetSpecification = initialOffsetSpecification;
this.offsetTrackingReference = offsetTrackingReference;
this.subscriptionListener = subscriptionListener;
this.messageHandler = messageHandler;
}

Expand Down Expand Up @@ -635,7 +646,7 @@ synchronized void add(
update(previousSubscriptions, subscriptionId, subscriptionTracker);

String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
if (subscriptionTracker.offsetTrackingReference != null) {
if (offsetTrackingReference != null) {
long trackedOffset =
client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
if (trackedOffset != 0) {
Expand Down Expand Up @@ -666,12 +677,20 @@ synchronized void add(
subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference);
}

SubscriptionContext subscriptionContext =
new DefaultSubscriptionContext(offsetSpecification);
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
LOGGER.info(
"Computed offset specification {}, offset specification used after subscription listener {}",
offsetSpecification,
subscriptionContext.offsetSpecification());

// FIXME consider using fewer initial credits
Client.Response subscribeResponse =
client.subscribe(
subscriptionId,
subscriptionTracker.stream,
offsetSpecification,
subscriptionContext.offsetSpecification(),
10,
subscriptionProperties);
if (!subscribeResponse.isOk()) {
Expand Down Expand Up @@ -767,4 +786,28 @@ synchronized void close() {
}
}
}

private static final class DefaultSubscriptionContext implements SubscriptionContext {

private volatile OffsetSpecification offsetSpecification;

private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification) {
this.offsetSpecification = computedOffsetSpecification;
}

@Override
public OffsetSpecification offsetSpecification() {
return this.offsetSpecification;
}

@Override
public void offsetSpecification(OffsetSpecification offsetSpecification) {
this.offsetSpecification = offsetSpecification;
}

@Override
public String toString() {
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
}
}
}
5 changes: 4 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -57,7 +58,8 @@ class StreamConsumer implements Consumer {
String name,
StreamEnvironment environment,
TrackingConfiguration trackingConfiguration,
boolean lazyInit) {
boolean lazyInit,
SubscriptionListener subscriptionListener) {

try {
this.name = name;
Expand Down Expand Up @@ -100,6 +102,7 @@ class StreamConsumer implements Consumer {
stream,
offsetSpecification,
this.name,
subscriptionListener,
messageHandlerWithOrWithoutTracking);

this.status = Status.RUNNING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
Expand All @@ -34,6 +35,7 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private DefaultAutoTrackingStrategy autoTrackingStrategy;
private DefaultManualTrackingStrategy manualTrackingStrategy;
private boolean lazyInit = false;
private SubscriptionListener subscriptionListener = subscriptionContext -> {};

public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
Expand Down Expand Up @@ -77,6 +79,15 @@ public ConsumerBuilder name(String name) {
return this;
}

@Override
public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) {
if (subscriptionListener == null) {
throw new IllegalArgumentException("The subscription listener cannot be null");
}
this.subscriptionListener = subscriptionListener;
return this;
}

@Override
public ManualTrackingStrategy manualTrackingStrategy() {
this.manualTrackingStrategy = new DefaultManualTrackingStrategy(this);
Expand Down Expand Up @@ -142,7 +153,8 @@ public Consumer build() {
this.name,
this.environment,
trackingConfiguration,
this.lazyInit);
this.lazyInit,
this.subscriptionListener);
environment.addConsumer((StreamConsumer) consumer);
} else {
consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
Expand Down Expand Up @@ -467,10 +468,16 @@ Runnable registerConsumer(
String stream,
OffsetSpecification offsetSpecification,
String trackingReference,
SubscriptionListener subscriptionListener,
MessageHandler messageHandler) {
Runnable closingCallback =
this.consumersCoordinator.subscribe(
consumer, stream, offsetSpecification, trackingReference, messageHandler);
consumer,
stream,
offsetSpecification,
trackingReference,
subscriptionListener,
messageHandler);
return closingCallback;
}

Expand Down
26 changes: 26 additions & 0 deletions src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,30 @@ void manualTrackingWithSettings() {
// end::manual-tracking-with-settings[]
}

void subscriptionListener() {
Environment environment = Environment.builder().build();
// tag::subscription-listener[]
Consumer consumer = environment.consumerBuilder()
.stream("my-stream")
.subscriptionListener(subscriptionContext -> { // <1>
long offset = getOffsetFromExternalStore(); // <2>
subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset)); // <3>
})
.messageHandler((context, message) -> {
// message handling code...

storeOffsetInExternalStore(context.offset()); // <4>
})
.build();
// end::subscription-listener[]
}

void storeOffsetInExternalStore(long offset) {

}

long getOffsetFromExternalStore() {
return 0L;
}

}
Loading