Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 12d6600

Browse files
committedAug 10, 2022
Document single active consumer
References #46
1 parent c82cae0 commit 12d6600

File tree

2 files changed

+123
-3
lines changed

2 files changed

+123
-3
lines changed
 

‎src/docs/asciidoc/api.adoc

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -881,8 +881,12 @@ made of 2 chunks:
881881
[[consumer-offset-tracking]]
882882
===== Tracking the Offset for a Consumer
883883

884-
A consumer can track the offset it has reached in a stream. This allows a new incarnation
885-
of the consumer to restart consuming where it left off. Offset tracking works in 2 steps:
884+
RabbitMQ Stream provides server-side offset tracking.
885+
This means a consumer can track the offset it has reached in a stream.
886+
It allows a new incarnation of the consumer to restart consuming where it left off.
887+
All of this without an extra datastore, as the broker stores the offset tracking information.
888+
889+
Offset tracking works in 2 steps:
886890

887891
* the consumer must have a *name*. The name is set with `ConsumerBuilder#name(String)`. The name
888892
can be any value (under 256 characters) and is expected to be unique (from the application
@@ -1056,4 +1060,82 @@ The application knows exactly when a message is processed and updates its in-mem
10561060
Let's take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval.
10571061
When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed.
10581062
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
1059-
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
1063+
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
1064+
1065+
===== Single Active Consumer
1066+
1067+
When the single active consumer feature is enabled for several consumer instances sharing the same stream and name, only one of these instances will be active at a time and so will receive messages.
1068+
The other instances will be idle.
1069+
1070+
The single active consumer feature provides 2 benefits:
1071+
1072+
* Messages are processed in order: there is only one consumer at a time.
1073+
* Consumption continuity is maintained: a consumer from the group will take over if the active one stops or crashes.
1074+
1075+
A typical sequence of events would be the following:
1076+
1077+
* Several instances of the same consuming application start up.
1078+
* Each application instance registers a single active consumer.
1079+
The consumer instances share the same name.
1080+
* The broker makes the first registered consumer the active one.
1081+
* The active consumer receives and processes messages, the other consumer instances remain idle.
1082+
* The active consumer stops or crashes.
1083+
* The broker chooses the consumer next in line to become the new active one.
1084+
* The new active consumer starts receiving messages.
1085+
1086+
Note there can be several groups of single active consumers on the same stream.
1087+
What makes them different from each other is the name used by the consumers.
1088+
The broker deals with them independently.
1089+
Let's use an example.
1090+
Imagine 2 different `app-1` and `app-2` applications consuming from the same stream, with 3 identical instances each.
1091+
Each instance registers 1 single active consumer with the name of the application.
1092+
We end up with 3 `app-1` consumers and 3 `app-2` consumers, 1 active consumer in each group, so overall 6 consumers and 2 active ones, all of this on the same stream.
1093+
1094+
Let's see now the API for single active consumer.
1095+
1096+
====== Enabling Single Active Consumer
1097+
1098+
Use the `ConsumerBuilder#singleActiveConsumer()` method to enable the feature:
1099+
1100+
.Enabling single active consumer
1101+
[source,java,indent=0]
1102+
--------
1103+
include::{test-examples}/ConsumerUsage.java[tag=enabling-single-active-consumer]
1104+
--------
1105+
<1> Set the consumer name (mandatory to enable single active consumer)
1106+
<2> Enable single active consumer
1107+
1108+
With the configuration above, the consumer will take part in the `application-1` group on the `my-stream` stream.
1109+
If the consumer instance is the first in a group, it will get messages as soon as there are some available. If it is not the first in the group, it will remain idle until it is its turn to be active (likely when all the instances registered before it are gone).
1110+
1111+
====== Offset Tracking
1112+
1113+
Single active consumer and offset tracking work together: when the active consumer goes away, another consumer takes over and resumes when the former active left off.
1114+
Well, this is how things should work and luckily this is what happens when using <<consumer-offset-tracking, server-side offset tracking>>.
1115+
So as long as you use <<consumer-automatic-offset-tracking, automatic offset tracking>> or <<consumer-manual-offset-tracking, manual offset tracking>>, the handoff between a former active consumer and the new one will go well.
1116+
1117+
The story is different is you are using an external store for offset tracking.
1118+
In this case you need to tell the client library where to resume from and you can do this by implementing the `ConsumerUpdateListener` API.
1119+
1120+
====== Reacting to Consumer State Change
1121+
1122+
The broker notifies a consumer that becomes active before dispatching messages to it.
1123+
The broker expects a response from the consumer and this response contains the offset the dispatching should start from.
1124+
So this is the consumer's responsibility to compute the appropriate offset, not the broker's.
1125+
The default behavior is to look up the last stored offset for the consumer on the stream.
1126+
This works when server-side offset tracking is in use, but it does not when the application chose to use an external store for offset tracking.
1127+
In this case, it is possible to use the `ConsumerBuilder#consumerUpdateListener(ConsumerUpdateListener)` method like demonstrated in the following snippet:
1128+
1129+
.Fetching the last stored offset from an external store in the consumer update listener callback
1130+
[source,java,indent=0]
1131+
--------
1132+
include::{test-examples}/ConsumerUsage.java[tag=sac-consumer-update-listener]
1133+
--------
1134+
<1> Set the consumer name (mandatory to enable single active consumer)
1135+
<2> Enable single active consumer
1136+
<3> Set the consumer update listener
1137+
<4> Fetch last offset from external store
1138+
<5> Return the offset to resume from to the broker
1139+
<6> Set also the subscription listener when using an external store
1140+
1141+
Note you also have to set up a <<consumer-subscription-listener, subscription listener>> when using an external store for offset tracking.

‎src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.rabbitmq.stream.*;
1818
import java.time.Duration;
19+
import org.assertj.core.data.Offset;
1920

2021
public class ConsumerUsage {
2122

@@ -153,4 +154,41 @@ long getOffsetFromExternalStore() {
153154
return 0L;
154155
}
155156

157+
void enablingSingleActiveConsumer() {
158+
Environment environment = Environment.builder().build();
159+
// tag::enabling-single-active-consumer[]
160+
Consumer consumer = environment.consumerBuilder()
161+
.stream("my-stream")
162+
.name("application-1") // <1>
163+
.singleActiveConsumer() // <2>
164+
.messageHandler((context, message) -> {
165+
// message handling code...
166+
})
167+
.build();
168+
// end::enabling-single-active-consumer[]
169+
}
170+
171+
void sacConsumerUpdateListener() {
172+
Environment environment = Environment.builder().build();
173+
// tag::sac-consumer-update-listener[]
174+
Consumer consumer = environment.consumerBuilder()
175+
.stream("my-stream")
176+
.name("application-1") // <1>
177+
.singleActiveConsumer() // <2>
178+
.consumerUpdateListener(context -> { // <3>
179+
long offset = getOffsetFromExternalStore(); // <4>
180+
return OffsetSpecification.offset(offset + 1); // <5>
181+
})
182+
.subscriptionListener(subscriptionContext -> { // <6>
183+
long offset = getOffsetFromExternalStore();
184+
subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset + 1));
185+
})
186+
.messageHandler((context, message) -> {
187+
// message handling code...
188+
189+
storeOffsetInExternalStore(context.offset());
190+
})
191+
.build();
192+
// end::sac-consumer-update-listener[]
193+
}
156194
}

0 commit comments

Comments
 (0)
Please sign in to comment.