18
18
*/
19
19
package org .neo4j .driver .v1 .stress ;
20
20
21
+ import org .junit .After ;
22
+ import org .junit .Before ;
21
23
import org .junit .Rule ;
22
24
import org .junit .Test ;
23
25
28
30
import java .util .concurrent .ThreadLocalRandom ;
29
31
import java .util .concurrent .TimeUnit ;
30
32
import java .util .concurrent .atomic .AtomicBoolean ;
33
+ import java .util .concurrent .atomic .AtomicReference ;
31
34
32
35
import org .neo4j .driver .v1 .Config ;
33
36
import org .neo4j .driver .v1 .Driver ;
36
39
import org .neo4j .driver .v1 .util .TestNeo4j ;
37
40
38
41
import static java .util .Arrays .asList ;
42
+ import static org .junit .Assert .assertTrue ;
39
43
import static org .neo4j .driver .v1 .GraphDatabase .driver ;
40
44
41
45
public class SessionPoolingStressIT
@@ -44,65 +48,107 @@ public class SessionPoolingStressIT
44
48
public TestNeo4j neo4j = new TestNeo4j ();
45
49
46
50
private static final int N_THREADS = 50 ;
47
- private final ExecutorService executor = Executors .newFixedThreadPool ( N_THREADS );
48
- private static final List <String > QUERIES = asList ( "RETURN 1295 + 42" , "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " );
49
51
private static final int MAX_TIME = 10000 ;
50
- private final AtomicBoolean hasFailed = new AtomicBoolean ( false );
52
+
53
+ private static final List <String > QUERIES = asList (
54
+ "RETURN 1295 + 42" , "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " );
55
+
56
+ private Driver driver ;
57
+ private ExecutorService executor ;
58
+
59
+ @ Before
60
+ public void setUp () throws Exception
61
+ {
62
+ executor = Executors .newFixedThreadPool ( N_THREADS );
63
+ }
64
+
65
+ @ After
66
+ public void tearDown () throws Exception
67
+ {
68
+ if ( executor != null )
69
+ {
70
+ executor .shutdownNow ();
71
+ }
72
+
73
+ if ( driver != null )
74
+ {
75
+ driver .close ();
76
+ }
77
+ }
51
78
52
79
@ Test
53
- public void shouldWorkFine () throws InterruptedException
80
+ public void shouldWorkFine () throws Throwable
54
81
{
55
- Driver driver = driver ( neo4j .uri (),
56
- Config .build ()
57
- .withEncryptionLevel ( Config .EncryptionLevel .NONE )
58
- .withMaxSessions ( N_THREADS ).toConfig () );
59
-
60
- doWork ( driver );
61
- executor .awaitTermination ( MAX_TIME + (int )(MAX_TIME * 0.2 ), TimeUnit .MILLISECONDS );
62
- driver .close ();
82
+ Config config = Config .build ()
83
+ .withEncryptionLevel ( Config .EncryptionLevel .NONE )
84
+ .withMaxSessions ( N_THREADS )
85
+ .toConfig ();
86
+
87
+ driver = driver ( neo4j .uri (), config );
88
+
89
+ AtomicBoolean stop = new AtomicBoolean ();
90
+ AtomicReference <Throwable > failureReference = new AtomicReference <>();
91
+
92
+ doWork ( stop , failureReference );
93
+
94
+ Thread .sleep ( MAX_TIME );
95
+
96
+ stop .set ( true );
97
+ executor .shutdown ();
98
+ assertTrue ( executor .awaitTermination ( MAX_TIME , TimeUnit .MILLISECONDS ) );
99
+
100
+ Throwable failure = failureReference .get ();
101
+ if ( failure != null )
102
+ {
103
+ throw new AssertionError ( "Some workers have failed" , failure );
104
+ }
63
105
}
64
106
65
- private void doWork ( final Driver driver )
107
+ private void doWork ( AtomicBoolean stop , AtomicReference < Throwable > failure )
66
108
{
67
109
for ( int i = 0 ; i < N_THREADS ; i ++ )
68
110
{
69
- executor .execute ( new Worker ( driver ) );
111
+ executor .execute ( new Worker ( driver , stop , failure ) );
70
112
}
71
113
}
72
114
73
115
private class Worker implements Runnable
74
116
{
75
117
private final Random random = ThreadLocalRandom .current ();
76
118
private final Driver driver ;
119
+ private final AtomicBoolean stop ;
120
+ private final AtomicReference <Throwable > failureReference ;
77
121
78
- public Worker ( Driver driver )
122
+ Worker ( Driver driver , AtomicBoolean stop , AtomicReference < Throwable > failureReference )
79
123
{
80
124
this .driver = driver ;
125
+ this .stop = stop ;
126
+ this .failureReference = failureReference ;
81
127
}
82
128
83
129
@ Override
84
130
public void run ()
85
131
{
86
132
try
87
133
{
88
- long deadline = System .currentTimeMillis () + MAX_TIME ;
89
- for (;;)
134
+ while ( !stop .get () )
90
135
{
91
136
for ( String query : QUERIES )
92
137
{
93
138
runQuery ( query );
94
139
}
95
- long left = deadline - System .currentTimeMillis ();
96
- if ( left <= 0 )
97
- {
98
- break ;
99
- }
100
140
}
101
141
}
102
- catch ( Throwable e )
142
+ catch ( Throwable failure )
103
143
{
104
- e .printStackTrace ();
105
- hasFailed .set ( true );
144
+ if ( !failureReference .compareAndSet ( null , failure ) )
145
+ {
146
+ Throwable firstFailure = failureReference .get ();
147
+ synchronized ( firstFailure )
148
+ {
149
+ firstFailure .addSuppressed ( failure );
150
+ }
151
+ }
106
152
}
107
153
}
108
154
0 commit comments