18
18
import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
19
19
import static org .assertj .core .api .Assertions .assertThat ;
20
20
21
+ import com .rabbitmq .client .Connection ;
22
+ import com .rabbitmq .client .ConnectionFactory ;
21
23
import com .rabbitmq .stream .OffsetSpecification ;
22
24
import com .rabbitmq .stream .impl .Client .ClientParameters ;
23
25
import com .rabbitmq .stream .impl .Client .Response ;
27
29
import java .util .concurrent .CountDownLatch ;
28
30
import java .util .concurrent .atomic .AtomicInteger ;
29
31
import java .util .concurrent .atomic .AtomicLong ;
32
+ import java .util .stream .IntStream ;
30
33
import org .junit .jupiter .api .Test ;
34
+ import org .junit .jupiter .api .TestInfo ;
31
35
import org .junit .jupiter .api .extension .ExtendWith ;
32
36
33
37
@ ExtendWith (TestUtils .StreamTestInfrastructureExtension .class )
@@ -36,15 +40,26 @@ public class SingleActiveConsumerTest {
36
40
String stream ;
37
41
TestUtils .ClientFactory cf ;
38
42
43
+ private static Map <Byte , Boolean > consumerStates (int number ) {
44
+ Map <Byte , Boolean > consumerStates = new ConcurrentHashMap <>(number );
45
+ IntStream .range (0 , number ).forEach (i -> consumerStates .put (b (i ), false ));
46
+ return consumerStates ;
47
+ }
48
+
49
+ private static Map <Byte , AtomicInteger > receivedMessages (int subscriptionCount ) {
50
+ Map <Byte , AtomicInteger > receivedMessages = new ConcurrentHashMap <>(subscriptionCount );
51
+ IntStream .range (0 , subscriptionCount )
52
+ .forEach (i -> receivedMessages .put (b (i ), new AtomicInteger (0 )));
53
+ return receivedMessages ;
54
+ }
55
+
39
56
@ Test
40
57
void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes () throws Exception {
41
58
Client writerClient = cf .get ();
42
- int messageCount = 10000 ;
59
+ int messageCount = 5_000 ;
43
60
AtomicLong lastReceivedOffset = new AtomicLong (0 );
44
- Map <Byte , Boolean > consumerStates = new ConcurrentHashMap <>();
45
- Map <Byte , AtomicInteger > receivedMessages = new ConcurrentHashMap <>();
46
- receivedMessages .put (b (0 ), new AtomicInteger (0 ));
47
- receivedMessages .put (b (1 ), new AtomicInteger (0 ));
61
+ Map <Byte , Boolean > consumerStates = consumerStates (2 );
62
+ Map <Byte , AtomicInteger > receivedMessages = receivedMessages (2 );
48
63
CountDownLatch consumerUpdateLatch = new CountDownLatch (2 );
49
64
String consumerName = "foo" ;
50
65
ClientParameters clientParameters =
@@ -85,8 +100,7 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
85
100
.containsEntry (b (0 ), Boolean .TRUE )
86
101
.containsEntry (b (1 ), Boolean .FALSE );
87
102
88
- waitAtMost (
89
- () -> receivedMessages .getOrDefault (b (0 ), new AtomicInteger (0 )).get () == messageCount );
103
+ waitAtMost (() -> receivedMessages .get (b (0 )).get () == messageCount );
90
104
91
105
assertThat (lastReceivedOffset ).hasPositiveValue ();
92
106
writerClient .storeOffset (consumerName , stream , lastReceivedOffset .get ());
@@ -177,4 +191,112 @@ void noConsumerUpdateOnConnectionClosingIfSubscriptionNotUnsubscribed() throws E
177
191
client .close ();
178
192
assertThat (consumerUpdateCount ).hasValue (2 );
179
193
}
194
+
195
+ @ Test
196
+ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition (TestInfo info ) throws Exception {
197
+ Client writerClient = cf .get ();
198
+ int messageCount = 5_000 ;
199
+ Map <Byte , Boolean > consumerStates = consumerStates (2 );
200
+ AtomicLong lastReceivedOffset = new AtomicLong (0 );
201
+ Map <Byte , AtomicInteger > receivedMessages = receivedMessages (2 );
202
+ String superStream = TestUtils .streamName (info );
203
+ String consumerName = "foo" ;
204
+ Connection c = new ConnectionFactory ().newConnection ();
205
+ try {
206
+ TestUtils .declareSuperStreamTopology (c , superStream , 3 );
207
+ // working with the second partition
208
+ String partition = superStream + "-1" ;
209
+
210
+ Client client =
211
+ cf .get (
212
+ new ClientParameters ()
213
+ .consumerUpdateListener (
214
+ (client1 , subscriptionId , active ) -> {
215
+ boolean previousState = consumerStates .get (subscriptionId );
216
+
217
+ OffsetSpecification result ;
218
+
219
+ if (previousState == false && active == true ) {
220
+ long storedOffset = writerClient .queryOffset (consumerName , partition );
221
+ result =
222
+ storedOffset == 0
223
+ ? OffsetSpecification .first ()
224
+ : OffsetSpecification .offset (storedOffset + 1 );
225
+ } else if (previousState == true && active == false ) {
226
+ writerClient .storeOffset (
227
+ consumerName , partition , lastReceivedOffset .get ());
228
+ try {
229
+ waitAtMost (
230
+ () ->
231
+ writerClient .queryOffset (consumerName , partition )
232
+ == lastReceivedOffset .get ());
233
+ } catch (Exception e ) {
234
+ throw new RuntimeException (e );
235
+ }
236
+ result = OffsetSpecification .none ();
237
+ } else {
238
+ throw new IllegalStateException (
239
+ "There should no SAC transition from "
240
+ + previousState
241
+ + " to "
242
+ + active );
243
+ }
244
+ consumerStates .put (subscriptionId , active );
245
+ return result ;
246
+ })
247
+ .chunkListener (
248
+ (client12 , subscriptionId , offset , messageCount1 , dataSize ) ->
249
+ client12 .credit (subscriptionId , 1 ))
250
+ .messageListener (
251
+ (subscriptionId , offset , chunkTimestamp , message ) -> {
252
+ lastReceivedOffset .set (offset );
253
+ receivedMessages .get (subscriptionId ).incrementAndGet ();
254
+ }));
255
+ Map <String , String > parameters = new HashMap <>();
256
+ parameters .put ("single-active-consumer" , "true" );
257
+ parameters .put ("name" , consumerName );
258
+ parameters .put ("super-stream" , superStream );
259
+ Response response =
260
+ client .subscribe (b (0 ), partition , OffsetSpecification .first (), 2 , parameters );
261
+ assertThat (response .isOk ()).isTrue ();
262
+ waitAtMost (() -> consumerStates .get (b (0 )) == true );
263
+
264
+ TestUtils .publishAndWaitForConfirms (cf , messageCount , partition );
265
+
266
+ waitAtMost (() -> receivedMessages .get (b (0 )).get () == messageCount );
267
+ assertThat (lastReceivedOffset ).hasPositiveValue ();
268
+ long firstWaveLimit = lastReceivedOffset .get ();
269
+
270
+ response = client .subscribe (b (1 ), partition , OffsetSpecification .first (), 2 , parameters );
271
+ assertThat (response .isOk ()).isTrue ();
272
+
273
+ waitAtMost (() -> consumerStates .get (b (0 )) == false );
274
+ waitAtMost (() -> consumerStates .get (b (1 )) == true );
275
+
276
+ TestUtils .publishAndWaitForConfirms (cf , messageCount , partition );
277
+
278
+ waitAtMost (() -> receivedMessages .get (b (1 )).get () == messageCount );
279
+ assertThat (lastReceivedOffset ).hasValueGreaterThan (firstWaveLimit );
280
+
281
+ // clean unsubscription, storing the offset
282
+ writerClient .storeOffset (consumerName , partition , lastReceivedOffset .get ());
283
+ waitAtMost (
284
+ () -> writerClient .queryOffset (consumerName , partition ) == lastReceivedOffset .get ());
285
+
286
+ response = client .unsubscribe (b (1 ));
287
+ assertThat (response .isOk ()).isTrue ();
288
+ waitAtMost (() -> consumerStates .get (b (0 )) == true );
289
+ assertThat (consumerStates ).containsEntry (b (0 ), true ); // should not change when unsubscribing
290
+
291
+ response = client .unsubscribe (b (0 ));
292
+ assertThat (response .isOk ()).isTrue ();
293
+
294
+ assertThat (receivedMessages .values ().stream ().mapToInt (AtomicInteger ::get ).sum ())
295
+ .isEqualTo (messageCount * 2 );
296
+
297
+ } finally {
298
+ TestUtils .deleteSuperStreamTopology (c , superStream , 3 );
299
+ c .close ();
300
+ }
301
+ }
180
302
}
0 commit comments