@@ -38,13 +38,15 @@ public class SingleActiveConsumerTest {
38
38
39
39
@ Test
40
40
void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes () throws Exception {
41
+ Client writerClient = cf .get ();
41
42
int messageCount = 10000 ;
42
43
AtomicLong lastReceivedOffset = new AtomicLong (0 );
43
44
Map <Byte , Boolean > consumerStates = new ConcurrentHashMap <>();
44
45
Map <Byte , AtomicInteger > receivedMessages = new ConcurrentHashMap <>();
45
46
receivedMessages .put (b (0 ), new AtomicInteger (0 ));
46
47
receivedMessages .put (b (1 ), new AtomicInteger (0 ));
47
48
CountDownLatch consumerUpdateLatch = new CountDownLatch (2 );
49
+ String consumerName = "foo" ;
48
50
ClientParameters clientParameters =
49
51
new ClientParameters ()
50
52
.chunkListener (
@@ -59,17 +61,17 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
59
61
(client , subscriptionId , active ) -> {
60
62
consumerStates .put (subscriptionId , active );
61
63
consumerUpdateLatch .countDown ();
62
- if (lastReceivedOffset .get () == 0 ) {
64
+ long storedOffset = writerClient .queryOffset (consumerName , stream );
65
+ if (storedOffset == 0 ) {
63
66
return OffsetSpecification .first ();
64
67
} else {
65
- return OffsetSpecification .offset (lastReceivedOffset . get () + 1 );
68
+ return OffsetSpecification .offset (storedOffset + 1 );
66
69
}
67
70
});
68
71
Client client = cf .get (clientParameters );
69
72
70
73
TestUtils .publishAndWaitForConfirms (cf , messageCount , stream );
71
74
72
- String consumerName = "foo" ;
73
75
Map <String , String > parameters = new HashMap <>();
74
76
parameters .put ("single-active-consumer" , "true" );
75
77
parameters .put ("name" , consumerName );
@@ -87,8 +89,8 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
87
89
() -> receivedMessages .getOrDefault (b (0 ), new AtomicInteger (0 )).get () == messageCount );
88
90
89
91
assertThat (lastReceivedOffset ).hasPositiveValue ();
90
- client .storeOffset (consumerName , stream , lastReceivedOffset .get ());
91
- waitAtMost (() -> client .queryOffset (consumerName , stream ) == lastReceivedOffset .get ());
92
+ writerClient .storeOffset (consumerName , stream , lastReceivedOffset .get ());
93
+ waitAtMost (() -> writerClient .queryOffset (consumerName , stream ) == lastReceivedOffset .get ());
92
94
93
95
long firstWaveLimit = lastReceivedOffset .get ();
94
96
response = client .unsubscribe (b (0 ));
0 commit comments