Skip to content

Commit 8d483c0

Browse files
authored
Merge pull request #94 from rabbitmq/single-active-consumer
Add support for single active consumer
2 parents 75f6433 + b624ec9 commit 8d483c0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+3974
-367
lines changed

.github/workflows/test-sac.yml renamed to .github/workflows/test-3.11.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
name: Test SAC branch against RabbitMQ 3.11 alpha
1+
name: Test against RabbitMQ 3.11 alpha
22

33
on:
44
push:
55
branches:
6-
- single-active-consumer
7-
6+
- main
7+
pull_request:
8+
branches:
9+
- main
810
jobs:
911
build:
1012
runs-on: ubuntu-22.04

README.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,4 @@ Please launch the `./mvnw spotless:apply` command to format your changes before
111111

112112
(c) 2020-2022, VMware Inc or its affiliates.
113113

114-
Double licensed under the MPL2.0 and ASL2. See link:LICENSE[LICENSE] for details.
114+
Double licensed under the MPL2.0 and ASL2. See link:LICENSE[LICENSE] for details.

performance-tool.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
PERF_TOOL_BASE_NAME="stream-perf-test"
1+
PERF_TOOL_BASE_NAME="stream-perf-test-sac"

pom.xml

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,12 @@
186186
<optional>true</optional>
187187
</dependency>
188188

189+
<dependency>
190+
<groupId>com.rabbitmq</groupId>
191+
<artifactId>amqp-client</artifactId>
192+
<version>${amqp-client.version}</version>
193+
<optional>true</optional>
194+
</dependency>
189195
<!-- end of dependencies for performance tool -->
190196

191197
<dependency>
@@ -225,6 +231,12 @@
225231
<scope>test</scope>
226232
</dependency>
227233

234+
<dependency>
235+
<groupId>org.junit.platform</groupId>
236+
<artifactId>junit-platform-suite-engine</artifactId>
237+
<scope>test</scope>
238+
</dependency>
239+
228240
<dependency>
229241
<groupId>org.assertj</groupId>
230242
<artifactId>assertj-core</artifactId>
@@ -239,13 +251,6 @@
239251
<scope>test</scope>
240252
</dependency>
241253

242-
<dependency>
243-
<groupId>com.rabbitmq</groupId>
244-
<artifactId>amqp-client</artifactId>
245-
<version>${amqp-client.version}</version>
246-
<scope>test</scope>
247-
</dependency>
248-
249254
<dependency>
250255
<groupId>org.eclipse.paho</groupId>
251256
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
@@ -336,6 +341,11 @@
336341
<plugin>
337342
<artifactId>maven-surefire-plugin</artifactId>
338343
<version>${maven-surefire-plugin.version}</version>
344+
<configuration>
345+
<excludes>
346+
<exclude>**/*TestSuite.java</exclude>
347+
</excludes>
348+
</configuration>
339349
</plugin>
340350

341351
<plugin>
@@ -614,6 +624,12 @@
614624
<version>${guava.version}</version>
615625
</dependency>
616626

627+
<dependency>
628+
<groupId>com.rabbitmq</groupId>
629+
<artifactId>amqp-client</artifactId>
630+
<version>${amqp-client.version}</version>
631+
</dependency>
632+
617633
</dependencies>
618634
<build>
619635
<finalName>${finalName}</finalName>

publish-documentation-to-github-pages.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,4 @@ fi
4848

4949
git commit -m "$MESSAGE"
5050
git push origin gh-pages
51-
git checkout main
51+
git checkout main

src/docs/asciidoc/api.adoc

Lines changed: 132 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,11 @@ The following table sums up the main settings to create a `Consumer`:
812812
|Interval to check if the last requested stored offset has been actually stored.
813813
|`Duration.ofSeconds(5)`
814814

815+
|noTrackingStrategy
816+
|Disable server-side offset tracking even if a name is provided.
817+
Useful when <<single-active-consumer, single active consumer>> is enabled and an external store is used for offset tracking.
818+
|`false`
819+
815820
|subscriptionListener
816821
|A <<consumer-subscription-listener, callback>> before the subscription is created.
817822
Useful when using an external store for offset tracking.
@@ -881,8 +886,12 @@ made of 2 chunks:
881886
[[consumer-offset-tracking]]
882887
===== Tracking the Offset for a Consumer
883888

884-
A consumer can track the offset it has reached in a stream. This allows a new incarnation
885-
of the consumer to restart consuming where it left off. Offset tracking works in 2 steps:
889+
RabbitMQ Stream provides server-side offset tracking.
890+
This means a consumer can track the offset it has reached in a stream.
891+
It allows a new incarnation of the consumer to restart consuming where it left off.
892+
All of this without an extra datastore, as the broker stores the offset tracking information.
893+
894+
Offset tracking works in 2 steps:
886895

887896
* the consumer must have a *name*. The name is set with `ConsumerBuilder#name(String)`. The name
888897
can be any value (under 256 characters) and is expected to be unique (from the application
@@ -960,7 +969,7 @@ include::{test-examples}/ConsumerUsage.java[tag=manual-tracking-defaults]
960969
--------
961970
<1> Set the consumer name (mandatory for offset tracking)
962971
<2> Use manual tracking with defaults
963-
<3> Store at the current offset on some condition
972+
<3> Store the current offset on some condition
964973

965974
Manual tracking has only one setting: the *check interval*. The client checks
966975
that the last requested stored offset has been actually stored at the
@@ -1018,11 +1027,7 @@ The client provides a `SubscriptionListener` interface callback to add behavior
10181027
This callback can be used to customize the offset the client library computed for the subscription.
10191028
The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change).
10201029

1021-
[WARNING]
1022-
.Experimental
1023-
====
1024-
This API is experimental, it is subject to change.
1025-
====
1030+
WARNING: This API is *experimental*, it is subject to change.
10261031

10271032
It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides.
10281033
The following code snippet shows how this can be done (note the interaction with the external store is not detailed):
@@ -1056,4 +1061,122 @@ The application knows exactly when a message is processed and updates its in-mem
10561061
Let's take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval.
10571062
When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed.
10581063
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
1059-
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
1064+
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
1065+
1066+
[[single-active-consumer]]
1067+
===== Single Active Consumer
1068+
1069+
WARNING: Single Active Consumer requires *RabbitMQ 3.11* or more.
1070+
1071+
When the single active consumer feature is enabled for several consumer instances sharing the same stream and name, only one of these instances will be active at a time and so will receive messages.
1072+
The other instances will be idle.
1073+
1074+
The single active consumer feature provides 2 benefits:
1075+
1076+
* Messages are processed in order: there is only one consumer at a time.
1077+
* Consumption continuity is maintained: a consumer from the group will take over if the active one stops or crashes.
1078+
1079+
A typical sequence of events would be the following:
1080+
1081+
* Several instances of the same consuming application start up.
1082+
* Each application instance registers a single active consumer.
1083+
The consumer instances share the same name.
1084+
* The broker makes the first registered consumer the active one.
1085+
* The active consumer receives and processes messages, the other consumer instances remain idle.
1086+
* The active consumer stops or crashes.
1087+
* The broker chooses the consumer next in line to become the new active one.
1088+
* The new active consumer starts receiving messages.
1089+
1090+
The next figures illustrates this mechanism.
1091+
There can be only one active consumer:
1092+
1093+
.The first registered consumer is active, the next ones are inactive
1094+
[ditaa]
1095+
....
1096+
+----------+
1097+
+------+ consumer + Active
1098+
| +----------+
1099+
|
1100+
+--------+ | +=---------+
1101+
+ stream +---+------+ consumer + Inactive
1102+
+--------+ | +----------+
1103+
|
1104+
| +=---------+
1105+
+------+ consumer + Inactive
1106+
+----------+
1107+
....
1108+
1109+
The broker rolls over to another consumer when the active one stops or crashes:
1110+
1111+
.When the active consumer stops, the next in line becomes active
1112+
[ditaa]
1113+
....
1114+
+=---------+
1115+
| consumer + Closed
1116+
+----------+
1117+
1118+
+--------+ +----------+
1119+
+ stream +---+------+ consumer + Active
1120+
+--------+ | +----------+
1121+
|
1122+
| +=---------+
1123+
+------+ consumer + Inactive
1124+
+----------+
1125+
....
1126+
1127+
1128+
Note there can be several groups of single active consumers on the same stream.
1129+
What makes them different from each other is the name used by the consumers.
1130+
The broker deals with them independently.
1131+
Let's use an example.
1132+
Imagine 2 different `app-1` and `app-2` applications consuming from the same stream, with 3 identical instances each.
1133+
Each instance registers 1 single active consumer with the name of the application.
1134+
We end up with 3 `app-1` consumers and 3 `app-2` consumers, 1 active consumer in each group, so overall 6 consumers and 2 active ones, all of this on the same stream.
1135+
1136+
Let's see now the API for single active consumer.
1137+
1138+
====== Enabling Single Active Consumer
1139+
1140+
Use the `ConsumerBuilder#singleActiveConsumer()` method to enable the feature:
1141+
1142+
.Enabling single active consumer
1143+
[source,java,indent=0]
1144+
--------
1145+
include::{test-examples}/ConsumerUsage.java[tag=enabling-single-active-consumer]
1146+
--------
1147+
<1> Set the consumer name (mandatory to enable single active consumer)
1148+
<2> Enable single active consumer
1149+
1150+
With the configuration above, the consumer will take part in the `application-1` group on the `my-stream` stream.
1151+
If the consumer instance is the first in a group, it will get messages as soon as there are some available. If it is not the first in the group, it will remain idle until it is its turn to be active (likely when all the instances registered before it are gone).
1152+
1153+
====== Offset Tracking
1154+
1155+
Single active consumer and offset tracking work together: when the active consumer goes away, another consumer takes over and resumes when the former active left off.
1156+
Well, this is how things should work and luckily this is what happens when using <<consumer-offset-tracking, server-side offset tracking>>.
1157+
So as long as you use <<consumer-automatic-offset-tracking, automatic offset tracking>> or <<consumer-manual-offset-tracking, manual offset tracking>>, the handoff between a former active consumer and the new one will go well.
1158+
1159+
The story is different is you are using an external store for offset tracking.
1160+
In this case you need to tell the client library where to resume from and you can do this by implementing the `ConsumerUpdateListener` API.
1161+
1162+
[[consumer-update-listener]]
1163+
====== Reacting to Consumer State Change
1164+
1165+
The broker notifies a consumer that becomes active before dispatching messages to it.
1166+
The broker expects a response from the consumer and this response contains the offset the dispatching should start from.
1167+
So this is the consumer's responsibility to compute the appropriate offset, not the broker's.
1168+
The default behavior is to look up the last stored offset for the consumer on the stream.
1169+
This works when server-side offset tracking is in use, but it does not when the application chose to use an external store for offset tracking.
1170+
In this case, it is possible to use the `ConsumerBuilder#consumerUpdateListener(ConsumerUpdateListener)` method like demonstrated in the following snippet:
1171+
1172+
.Fetching the last stored offset from an external store in the consumer update listener callback
1173+
[source,java,indent=0]
1174+
--------
1175+
include::{test-examples}/ConsumerUsage.java[tag=sac-consumer-update-listener]
1176+
--------
1177+
<1> Set the consumer name (mandatory to enable single active consumer)
1178+
<2> Enable single active consumer
1179+
<3> Disable server-side offset tracking
1180+
<4> Set the consumer update listener
1181+
<5> Fetch last offset from external store
1182+
<6> Return the offset to resume consuming from to the broker

src/docs/asciidoc/performance-tool.adoc

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ Note the message body size cannot be smaller that 8 bytes, as the performance to
332332
a long in each message to calculate the latency. Note also the actual size of a message will be
333333
slightly higher, as the body is wrapped in an <<api.adoc#working-with-complex-messages,AMQP 1.0 message>>.
334334

335+
[[performance-tool-connection-pooling]]
335336
===== Connection Pooling
336337

337338
The performance tool does not use connection pooling by default: each producer and consumer has its own connection.
@@ -403,6 +404,7 @@ The accepted values for `--offset` are `first`, `last`, `next` (the default),
403404
an unsigned long for a given offset, and an ISO 8601 formatted timestamp
404405
(eg. `2020-06-03T07:45:54Z`).
405406

407+
[[performance-tool-offset-tracking]]
406408
===== Offset Tracking (Consumer)
407409

408410
A consumer can <<api.adoc#consumer-offset-tracking,track the point>> it has reached
@@ -491,6 +493,76 @@ java -jar stream-perf-test.jar --uris rabbitmq-stream://my-load-balancer:5552 \
491493

492494
The same blog post covers why a https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer[load balancer can make things more complicated] for client applications like the performance tool and how https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#client-workaround-with-a-load-balancer[they can mitigate these issues].
493495

496+
[[performance-tool-sac]]
497+
===== Single Active Consumer
498+
499+
If the `--single-active-consumer` flag is set, the performance tool will create <<api.adoc#single-active-consumer, single active consumer>> instances.
500+
This means that if there are more consumers than streams, there will be only one active consumer at a time on a stream, _if they share the same name_.
501+
Note <<performance-tool-offset-tracking, offset tracking>> gets enabled automatically if it's not with `--single-active-consumer` (using 10,000 for `--store-every`).
502+
Let's see a couple of examples.
503+
504+
In the following command we have 1 producer publishing to 1 stream and 3 consumers on this stream.
505+
As `--single-active-consumer` is used, only one of these consumers will be active at a time.
506+
507+
----
508+
java -jar stream-perf-test.jar --producers 1 --consumers 3 --single-active-consumer \
509+
--consumer-names my-app
510+
----
511+
512+
Note we use a fixed value for the consumer names: if they don't have the same name, the broker will not consider them as a group of consumers, so they will all get messages, like regular consumers.
513+
514+
In the following example we have 2 producers for 2 streams and 6 consumers overall (3 for each stream).
515+
Note the consumers have the same name on their streams with the use of `--consumer-names my-app-%s`, as `%s` is a <<consumer-names, placeholder for the stream name>>.
516+
517+
----
518+
java -jar stream-perf-test.jar --producers 2 --consumers 6 --stream-count 2 \
519+
--single-active-consumer --consumer-names my-app-%s
520+
----
521+
522+
523+
===== Super Streams
524+
525+
The performance tool has a `--super-streams` flag to enable <<super-streams.adoc#super-streams, super streams>> on the publisher and consumer sides.
526+
This support is meant to be used with the <<performance-tool-sac, `--single-active-consumer` flag>>, to <<super-streams.adoc#super-stream-sac, benefit from both features>>.
527+
We recommend reading the appropriate sections of the documentation to understand the semantics of the flags before using them.
528+
Let's see some examples.
529+
530+
The example below creates 1 producer and 3 consumers on the default `stream`, which is now a _super stream_ because of the `--super-streams` flag:
531+
532+
----
533+
java -jar stream-perf-test.jar --producers 1 --consumers 3 --single-active-consumer \
534+
--super-streams --consumer-names my-app
535+
----
536+
537+
The performance tool creates 3 individual streams by default, they are the partitions of the super stream.
538+
They are named `stream-0`, `stream-1`, and `stream-2`, after the name of the super stream, `stream`.
539+
The producer will publish to each of them, using a <<super-streams.adoc#super-stream-producer, hash-based routing strategy>>.
540+
541+
A consumer is _composite_ with `--super-streams`: it creates a consumer instance for each partition.
542+
This is 9 consumer instances overall – 3 composite consumers and 3 partitions – spread evenly across the partitions, but with only one active at a time on a given stream.
543+
544+
Note we use a fixed consumer name so that the broker considers the consumers belong to the same group and enforce the single active consumer behavior.
545+
546+
The next example is more convoluted.
547+
We are going to work with 2 super streams (`--stream-count 2` and `--super-streams`).
548+
Each super stream will have 5 partitions (`--super-stream-partitions 5`), so this is 10 streams overall (`stream-1-0` to `stream-1-4` and `stream-2-0` to `stream-2-4`).
549+
Here is the command line:
550+
551+
----
552+
java -jar stream-perf-test.jar --producers 2 --consumers 6 --stream-count 2 \
553+
--super-streams --super-stream-partitions 5 \
554+
--single-active-consumer \
555+
--consumer-names my-app-%s
556+
----
557+
558+
We see also that each super stream has 1 producer (`--producers 2`) and 3 consumers (`--consumers 6`).
559+
The composite consumers will spread their consumer instances across the partitions.
560+
Each partition will have 3 consumers but only 1 active at a time with `--single-active-consumer` and `--consumer-names my-app-%s` (the consumers on a given stream have the same name, so the broker make sure only one consumes at a time).
561+
562+
Note the performance tool does not use <<performance-tool-connection-pooling, connection pooling>> by default.
563+
The command above opens a significant number of connections – 30 just for consumers – and may not reflect exactly how applications are deployed in the real world.
564+
Don't hesitate to use the `--producers-by-connection` and `--consumers-by-connection` options to make the runs as close to your workloads as possible.
565+
494566
===== Monitoring
495567

496568
The tool can expose some runtime information on HTTP.

0 commit comments

Comments
 (0)