22
22
23
23
import java .net .URI ;
24
24
import java .util .Arrays ;
25
- import java .util .HashMap ;
26
- import java .util .HashSet ;
27
25
import java .util .List ;
28
- import java .util .Map ;
29
- import java .util .Set ;
30
- import java .util .concurrent .ConcurrentHashMap ;
31
- import java .util .concurrent .ConcurrentMap ;
32
26
import java .util .concurrent .atomic .AtomicInteger ;
33
- import java .util .concurrent .atomic .AtomicLong ;
34
27
35
28
import org .neo4j .driver .AuthToken ;
36
29
import org .neo4j .driver .Config ;
37
- import org .neo4j .driver .Driver ;
38
30
import org .neo4j .driver .exceptions .SessionExpiredException ;
39
- import org .neo4j .driver .internal .BoltServerAddress ;
40
- import org .neo4j .driver .internal .util .ServerVersion ;
41
- import org .neo4j .driver .summary .ResultSummary ;
42
- import org .neo4j .driver .util .cc .ClusterMemberRole ;
43
- import org .neo4j .driver .util .cc .ClusterMemberRoleDiscoveryFactory ;
44
31
import org .neo4j .driver .util .cc .LocalOrRemoteClusterExtension ;
45
32
46
- import static org .hamcrest .Matchers .both ;
47
- import static org .hamcrest .Matchers .greaterThan ;
48
- import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
49
- import static org .hamcrest .Matchers .lessThanOrEqualTo ;
50
- import static org .hamcrest .junit .MatcherAssert .assertThat ;
51
-
52
33
class CausalClusteringStressIT extends AbstractStressTestBase <CausalClusteringStressIT .Context >
53
34
{
54
35
@ RegisterExtension
@@ -104,27 +85,6 @@ boolean handleWriteFailure( Throwable error, Context context )
104
85
return false ;
105
86
}
106
87
107
- @ Override
108
- void assertExpectedReadQueryDistribution ( Context context )
109
- {
110
- Map <String ,Long > readQueriesByServer = context .getReadQueriesByServer ();
111
- ClusterAddresses clusterAddresses = fetchClusterAddresses ( driver );
112
-
113
- // expect all followers to serve more than zero read queries
114
- assertAllAddressesServedReadQueries ( "Follower" , clusterAddresses .followers , readQueriesByServer );
115
-
116
- // expect all read replicas to serve more than zero read queries
117
- assertAllAddressesServedReadQueries ( "Read replica" , clusterAddresses .readReplicas , readQueriesByServer );
118
-
119
- // expect all followers to serve same order of magnitude read queries
120
- assertAllAddressesServedSimilarAmountOfReadQueries ( "Followers" , clusterAddresses .followers ,
121
- readQueriesByServer , clusterAddresses );
122
-
123
- // expect all read replicas to serve same order of magnitude read queries
124
- assertAllAddressesServedSimilarAmountOfReadQueries ( "Read replicas" , clusterAddresses .readReplicas ,
125
- readQueriesByServer , clusterAddresses );
126
- }
127
-
128
88
@ Override
129
89
void printStats ( Context context )
130
90
{
@@ -141,115 +101,10 @@ void dumpLogs()
141
101
clusterRule .dumpClusterLogs ();
142
102
}
143
103
144
- private static ClusterAddresses fetchClusterAddresses ( Driver driver )
145
- {
146
- Set <String > followers = new HashSet <>();
147
- Set <String > readReplicas = new HashSet <>();
148
-
149
- final ClusterMemberRoleDiscoveryFactory .ClusterMemberRoleDiscovery discovery =
150
- ClusterMemberRoleDiscoveryFactory .newInstance ( ServerVersion .version ( driver ) );
151
- final Map <BoltServerAddress ,ClusterMemberRole > clusterOverview = discovery .findClusterOverview ( driver );
152
-
153
- for ( BoltServerAddress address : clusterOverview .keySet () )
154
- {
155
- String boltAddress = String .format ( "%s:%s" , address .host (), address .port () );
156
- ClusterMemberRole role = clusterOverview .get ( address );
157
- if ( role == ClusterMemberRole .FOLLOWER )
158
- {
159
- followers .add ( boltAddress );
160
- }
161
- else if ( role == ClusterMemberRole .READ_REPLICA )
162
- {
163
- readReplicas .add ( boltAddress );
164
- }
165
- }
166
-
167
- return new ClusterAddresses ( followers , readReplicas );
168
- }
169
-
170
- private static void assertAllAddressesServedReadQueries ( String addressType , Set <String > addresses ,
171
- Map <String ,Long > readQueriesByServer )
172
- {
173
- for ( String address : addresses )
174
- {
175
- Long queries = readQueriesByServer .get ( address );
176
- assertThat ( addressType + " did not serve any read queries" , queries , greaterThan ( 0L ) );
177
- }
178
- }
179
-
180
- private static void assertAllAddressesServedSimilarAmountOfReadQueries ( String addressesType , Set <String > addresses ,
181
- Map <String ,Long > readQueriesByServer , ClusterAddresses allAddresses )
182
- {
183
- long expectedOrderOfMagnitude = -1 ;
184
- for ( String address : addresses )
185
- {
186
- long queries = readQueriesByServer .get ( address );
187
- long orderOfMagnitude = orderOfMagnitude ( queries );
188
- if ( expectedOrderOfMagnitude == -1 )
189
- {
190
- expectedOrderOfMagnitude = orderOfMagnitude ;
191
- }
192
- else
193
- {
194
- assertThat ( addressesType + " are expected to serve similar amount of queries. " +
195
- "Addresses: " + allAddresses + ", " +
196
- "read queries served: " + readQueriesByServer ,
197
- orderOfMagnitude ,
198
- both ( greaterThanOrEqualTo ( expectedOrderOfMagnitude - 1 ) )
199
- .and ( lessThanOrEqualTo ( expectedOrderOfMagnitude + 1 ) ) );
200
- }
201
- }
202
- }
203
-
204
- private static long orderOfMagnitude ( long number )
205
- {
206
- long result = 1 ;
207
- while ( number >= 10 )
208
- {
209
- number /= 10 ;
210
- result ++;
211
- }
212
- return result ;
213
- }
214
-
215
104
static class Context extends AbstractContext
216
105
{
217
- final ConcurrentMap <String ,AtomicLong > readQueriesByServer = new ConcurrentHashMap <>();
218
106
final AtomicInteger leaderSwitches = new AtomicInteger ();
219
107
220
- @ Override
221
- public void processSummary ( ResultSummary summary )
222
- {
223
- if ( summary == null )
224
- {
225
- return ;
226
- }
227
-
228
- String serverAddress = summary .server ().address ();
229
-
230
- AtomicLong count = readQueriesByServer .get ( serverAddress );
231
- if ( count == null )
232
- {
233
- count = new AtomicLong ();
234
- AtomicLong existingCounter = readQueriesByServer .putIfAbsent ( serverAddress , count );
235
- if ( existingCounter != null )
236
- {
237
- count = existingCounter ;
238
- }
239
- }
240
- count .incrementAndGet ();
241
- }
242
-
243
- Map <String ,Long > getReadQueriesByServer ()
244
- {
245
- Map <String ,Long > result = new HashMap <>();
246
- for ( Map .Entry <String ,AtomicLong > entry : readQueriesByServer .entrySet () )
247
- {
248
- result .put ( entry .getKey (), entry .getValue ().get () );
249
- }
250
- return result ;
251
- }
252
-
253
108
void leaderSwitch ()
254
109
{
255
110
leaderSwitches .incrementAndGet ();
@@ -261,24 +116,4 @@ int getLeaderSwitchCount()
261
116
}
262
117
}
263
118
264
- private static class ClusterAddresses
265
- {
266
- final Set <String > followers ;
267
- final Set <String > readReplicas ;
268
-
269
- ClusterAddresses ( Set <String > followers , Set <String > readReplicas )
270
- {
271
- this .followers = followers ;
272
- this .readReplicas = readReplicas ;
273
- }
274
-
275
- @ Override
276
- public String toString ()
277
- {
278
- return "ClusterAddresses{" +
279
- "followers=" + followers +
280
- ", readReplicas=" + readReplicas +
281
- '}' ;
282
- }
283
- }
284
119
}
0 commit comments