26
26
import java .util .concurrent .ConcurrentHashMap ;
27
27
import java .util .concurrent .CountDownLatch ;
28
28
import java .util .concurrent .atomic .AtomicInteger ;
29
+ import java .util .concurrent .atomic .AtomicLong ;
29
30
import org .junit .jupiter .api .Test ;
30
31
import org .junit .jupiter .api .extension .ExtendWith ;
31
32
@@ -36,8 +37,9 @@ public class SingleActiveConsumerTest {
36
37
TestUtils .ClientFactory cf ;
37
38
38
39
@ Test
39
- void subscribe () throws Exception {
40
+ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes () throws Exception {
40
41
int messageCount = 10000 ;
42
+ AtomicLong lastReceivedOffset = new AtomicLong (0 );
41
43
Map <Byte , Boolean > consumerStates = new ConcurrentHashMap <>();
42
44
Map <Byte , AtomicInteger > receivedMessages = new ConcurrentHashMap <>();
43
45
receivedMessages .put (b (0 ), new AtomicInteger (0 ));
@@ -49,21 +51,28 @@ void subscribe() throws Exception {
49
51
(client , subscriptionId , offset , msgCount , dataSize ) ->
50
52
client .credit (subscriptionId , 1 ))
51
53
.messageListener (
52
- (subscriptionId , offset , chunkTimestamp , message ) ->
53
- receivedMessages .get (subscriptionId ).incrementAndGet ())
54
+ (subscriptionId , offset , chunkTimestamp , message ) -> {
55
+ lastReceivedOffset .set (offset );
56
+ receivedMessages .get (subscriptionId ).incrementAndGet ();
57
+ })
54
58
.consumerUpdateListener (
55
59
(client , subscriptionId , active ) -> {
56
60
consumerStates .put (subscriptionId , active );
57
61
consumerUpdateLatch .countDown ();
58
- return null ;
62
+ if (lastReceivedOffset .get () == 0 ) {
63
+ return OffsetSpecification .first ();
64
+ } else {
65
+ return OffsetSpecification .offset (lastReceivedOffset .get () + 1 );
66
+ }
59
67
});
60
68
Client client = cf .get (clientParameters );
61
69
62
70
TestUtils .publishAndWaitForConfirms (cf , messageCount , stream );
63
71
72
+ String consumerName = "foo" ;
64
73
Map <String , String > parameters = new HashMap <>();
65
74
parameters .put ("single-active-consumer" , "true" );
66
- parameters .put ("name" , "foo" );
75
+ parameters .put ("name" , consumerName );
67
76
Response response = client .subscribe (b (0 ), stream , OffsetSpecification .first (), 2 , parameters );
68
77
assertThat (response .isOk ()).isTrue ();
69
78
response = client .subscribe (b (1 ), stream , OffsetSpecification .first (), 2 , parameters );
@@ -76,5 +85,69 @@ void subscribe() throws Exception {
76
85
77
86
waitAtMost (
78
87
() -> receivedMessages .getOrDefault (b (0 ), new AtomicInteger (0 )).get () == messageCount );
88
+
89
+ assertThat (lastReceivedOffset ).hasPositiveValue ();
90
+ client .storeOffset (consumerName , stream , lastReceivedOffset .get ());
91
+ waitAtMost (() -> client .queryOffset (consumerName , stream ) == lastReceivedOffset .get ());
92
+
93
+ long firstWaveLimit = lastReceivedOffset .get ();
94
+ response = client .unsubscribe (b (0 ));
95
+ assertThat (response .isOk ()).isTrue ();
96
+
97
+ TestUtils .publishAndWaitForConfirms (cf , messageCount , stream );
98
+
99
+ waitAtMost (() -> consumerStates .get (b (1 )) == true );
100
+
101
+ waitAtMost (
102
+ () -> receivedMessages .getOrDefault (b (1 ), new AtomicInteger (0 )).get () == messageCount );
103
+ assertThat (lastReceivedOffset ).hasValueGreaterThan (firstWaveLimit );
104
+
105
+ response = client .unsubscribe (b (1 ));
106
+ assertThat (response .isOk ()).isTrue ();
107
+ }
108
+
109
+ @ Test
110
+ void consumerUpdateListenerShouldBeCalledInOrder () throws Exception {
111
+ StringBuffer consumerUpdateHistory = new StringBuffer ();
112
+ Client client =
113
+ cf .get (
114
+ new ClientParameters ()
115
+ .consumerUpdateListener (
116
+ (client1 , subscriptionId , active ) -> {
117
+ consumerUpdateHistory .append (
118
+ String .format ("<%d.%b>" , subscriptionId , active ));
119
+ return null ;
120
+ }));
121
+ String consumerName = "foo" ;
122
+ Map <String , String > parameters = new HashMap <>();
123
+ parameters .put ("single-active-consumer" , "true" );
124
+ parameters .put ("name" , consumerName );
125
+ Response response = client .subscribe (b (0 ), stream , OffsetSpecification .first (), 2 , parameters );
126
+ assertThat (response .isOk ()).isTrue ();
127
+ waitAtMost (() -> consumerUpdateHistory .toString ().equals ("<0.true>" ));
128
+ for (int i = 1 ; i < 10 ; i ++) {
129
+ byte subscriptionId = b (i );
130
+ response =
131
+ client .subscribe (subscriptionId , stream , OffsetSpecification .first (), 2 , parameters );
132
+ assertThat (response .isOk ()).isTrue ();
133
+ waitAtMost (
134
+ () ->
135
+ consumerUpdateHistory
136
+ .toString ()
137
+ .contains (String .format ("<%d.%b>" , subscriptionId , false )));
138
+ }
139
+
140
+ for (int i = 0 ; i < 9 ; i ++) {
141
+ byte subscriptionId = b (i );
142
+ response = client .unsubscribe (subscriptionId );
143
+ assertThat (response .isOk ()).isTrue ();
144
+ waitAtMost (
145
+ () ->
146
+ consumerUpdateHistory
147
+ .toString ()
148
+ .contains (String .format ("<%d.%b>" , subscriptionId + 1 , true )));
149
+ }
150
+ response = client .unsubscribe (b (9 ));
151
+ assertThat (response .isOk ()).isTrue ();
79
152
}
80
153
}
0 commit comments