Skip to content

Commit ee5465a

Browse files
committed
Document subscription listener
Fixes #38
1 parent 91d61cb commit ee5465a

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/docs/asciidoc/api.adoc

+40
Original file line numberDiff line numberDiff line change
@@ -881,3 +881,43 @@ entry, which has its own offset.
881881
This means one must be careful when basing some decision on offset values, like
882882
a modulo to perform an operation every X messages. As the message offsets have
883883
no guarantee to be contiguous, the operation may not happen exactly every X messages.
884+
885+
====== Subscription Listener
886+
887+
The library provides a `SubscriptionListener` interface callback to add behavior before a subscription is created.
888+
This callback can be used to customize the offset the library computed for the subscription.
889+
The callback is called when the consumer is first created and when the library has to re-subscribe (e.g. after a disconnection or a topology change).
890+
891+
It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides.
892+
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
893+
894+
.Using an external store for offset tracking with a subscription listener
895+
[source,java,indent=0]
896+
--------
897+
include::{test-examples}/ConsumerUsage.java[tag=subscription-listener]
898+
--------
899+
<1> Set subscription listener
900+
<2> Get offset from external store
901+
<3> Set offset to use for the subscription
902+
<4> Store the offset in the external store after processing
903+
904+
When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use.
905+
906+
Using a subscription listener can be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex.
907+
This requires a good understanding on how and when subscription occurs in the library, and so when the subscription listener is called:
908+
909+
* for a consumer with no name (server-side offset tracking _disabled_)
910+
** on the first subscription (when the consumer is created): the offset specification is the one specified with `ConsumerBuilder#offset(OffsetSpecification)`, the default being `OffsetSpecification#next()`
911+
** on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message
912+
* for a consumer with a name (server-side offset tracking _enabled_)
913+
** on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with `ConsumerBuilder#offset(OffsetSpecification)`
914+
** on re-subscription (after a disconnection or topology change): the server-side stored offset is used
915+
916+
The subscription listener comes in handy on re-subscription.
917+
The application can track the last processed offset in-memory, with an `AtomicLong` for example.
918+
The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the library may not be perfectly appropriate on re-subscription.
919+
920+
If we take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval.
921+
When the glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed.
922+
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
923+
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.

src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java

+26
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,30 @@ void manualTrackingWithSettings() {
127127
// end::manual-tracking-with-settings[]
128128
}
129129

130+
void subscriptionListener() {
131+
Environment environment = Environment.builder().build();
132+
// tag::subscription-listener[]
133+
Consumer consumer = environment.consumerBuilder()
134+
.stream("my-stream")
135+
.subscriptionListener(subscriptionContext -> { // <1>
136+
long offset = getOffsetFromExternalStore(); // <2>
137+
subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset)); // <3>
138+
})
139+
.messageHandler((context, message) -> {
140+
// message handling code...
141+
142+
storeOffsetInExternalStore(context.offset()); // <4>
143+
})
144+
.build();
145+
// end::subscription-listener[]
146+
}
147+
148+
void storeOffsetInExternalStore(long offset) {
149+
150+
}
151+
152+
long getOffsetFromExternalStore() {
153+
return 0L;
154+
}
155+
130156
}

0 commit comments

Comments
 (0)