Skip to content

Commit 2c48e20

Browse files
authored
Merge pull request #295 from lutovich/1.1-flaky-tests
Fix couple flaky tests
2 parents 5de9312 + c2da8af commit 2c48e20

File tree

6 files changed

+108
-108
lines changed

6 files changed

+108
-108
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void start()
130130
{
131131
throw new ServiceUnavailableException( format(
132132
"Unable to connect to %s, ensure the database is running and that there is a " +
133-
"working network connection to it.", address ) );
133+
"working network connection to it.", address ), e );
134134
}
135135
catch ( IOException e )
136136
{

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,7 @@ public PooledConnection get()
8484
};
8585
PooledConnection conn = connections.acquire( supplier );
8686

87-
if ( closed.get() )
88-
{
89-
connections.terminate();
90-
throw poolClosedException();
91-
}
87+
assertNotClosed( address, connections );
9288

9389
conn.updateTimestamp();
9490
return conn;
@@ -140,16 +136,21 @@ public void close()
140136
}
141137
}
142138

143-
private void assertNotClosed()
139+
private void assertNotClosed( BoltServerAddress address, BlockingPooledConnectionQueue connections )
144140
{
145141
if ( closed.get() )
146142
{
147-
throw poolClosedException();
143+
connections.terminate();
144+
pools.remove( address );
145+
assertNotClosed();
148146
}
149147
}
150148

151-
private static RuntimeException poolClosedException()
149+
private void assertNotClosed()
152150
{
153-
return new IllegalStateException( "Pool closed" );
151+
if ( closed.get() )
152+
{
153+
throw new IllegalStateException( "Pool closed" );
154+
}
154155
}
155156
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ public void closeWithConcurrentAcquisitionsEmptiesThePool() throws InterruptedEx
265265

266266
for ( int i = 0; i < port.intValue(); i++ )
267267
{
268-
assertFalse( pool.hasAddress( new BoltServerAddress( "localhost", i ) ) );
268+
boolean hasAddress = pool.hasAddress( new BoltServerAddress( "localhost", i ) );
269+
assertFalse( "Pool still has connection queues" + pool, hasAddress );
269270
}
270271
for ( Connection connection : createdConnections )
271272
{

driver/src/test/java/org/neo4j/driver/internal/util/CaptureStdOut.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.v1.stress;
2020

21+
import org.junit.After;
22+
import org.junit.Before;
2123
import org.junit.Rule;
2224
import org.junit.Test;
2325

@@ -28,6 +30,7 @@
2830
import java.util.concurrent.ThreadLocalRandom;
2931
import java.util.concurrent.TimeUnit;
3032
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicReference;
3134

3235
import org.neo4j.driver.v1.Config;
3336
import org.neo4j.driver.v1.Driver;
@@ -36,6 +39,7 @@
3639
import org.neo4j.driver.v1.util.TestNeo4j;
3740

3841
import static java.util.Arrays.asList;
42+
import static org.junit.Assert.assertTrue;
3943
import static org.neo4j.driver.v1.GraphDatabase.driver;
4044

4145
public class SessionPoolingStressIT
@@ -44,65 +48,107 @@ public class SessionPoolingStressIT
4448
public TestNeo4j neo4j = new TestNeo4j();
4549

4650
private static final int N_THREADS = 50;
47-
private final ExecutorService executor = Executors.newFixedThreadPool( N_THREADS );
48-
private static final List<String> QUERIES = asList( "RETURN 1295 + 42", "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " );
4951
private static final int MAX_TIME = 10000;
50-
private final AtomicBoolean hasFailed = new AtomicBoolean( false );
52+
53+
private static final List<String> QUERIES = asList(
54+
"RETURN 1295 + 42", "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " );
55+
56+
private Driver driver;
57+
private ExecutorService executor;
58+
59+
@Before
60+
public void setUp() throws Exception
61+
{
62+
executor = Executors.newFixedThreadPool( N_THREADS );
63+
}
64+
65+
@After
66+
public void tearDown() throws Exception
67+
{
68+
if ( executor != null )
69+
{
70+
executor.shutdownNow();
71+
}
72+
73+
if ( driver != null )
74+
{
75+
driver.close();
76+
}
77+
}
5178

5279
@Test
53-
public void shouldWorkFine() throws InterruptedException
80+
public void shouldWorkFine() throws Throwable
5481
{
55-
Driver driver = driver( neo4j.uri(),
56-
Config.build()
57-
.withEncryptionLevel( Config.EncryptionLevel.NONE )
58-
.withMaxSessions( N_THREADS ).toConfig() );
59-
60-
doWork( driver );
61-
executor.awaitTermination( MAX_TIME + (int)(MAX_TIME * 0.2), TimeUnit.MILLISECONDS );
62-
driver.close();
82+
Config config = Config.build()
83+
.withEncryptionLevel( Config.EncryptionLevel.NONE )
84+
.withMaxSessions( N_THREADS )
85+
.toConfig();
86+
87+
driver = driver( neo4j.uri(), config );
88+
89+
AtomicBoolean stop = new AtomicBoolean();
90+
AtomicReference<Throwable> failureReference = new AtomicReference<>();
91+
92+
doWork( stop, failureReference );
93+
94+
Thread.sleep( MAX_TIME );
95+
96+
stop.set( true );
97+
executor.shutdown();
98+
assertTrue( executor.awaitTermination( MAX_TIME, TimeUnit.MILLISECONDS ) );
99+
100+
Throwable failure = failureReference.get();
101+
if ( failure != null )
102+
{
103+
throw new AssertionError( "Some workers have failed", failure );
104+
}
63105
}
64106

65-
private void doWork( final Driver driver )
107+
private void doWork( AtomicBoolean stop, AtomicReference<Throwable> failure )
66108
{
67109
for ( int i = 0; i < N_THREADS; i++ )
68110
{
69-
executor.execute( new Worker( driver ) );
111+
executor.execute( new Worker( driver, stop, failure ) );
70112
}
71113
}
72114

73115
private class Worker implements Runnable
74116
{
75117
private final Random random = ThreadLocalRandom.current();
76118
private final Driver driver;
119+
private final AtomicBoolean stop;
120+
private final AtomicReference<Throwable> failureReference;
77121

78-
public Worker( Driver driver )
122+
Worker( Driver driver, AtomicBoolean stop, AtomicReference<Throwable> failureReference )
79123
{
80124
this.driver = driver;
125+
this.stop = stop;
126+
this.failureReference = failureReference;
81127
}
82128

83129
@Override
84130
public void run()
85131
{
86132
try
87133
{
88-
long deadline = System.currentTimeMillis() + MAX_TIME;
89-
for (;;)
134+
while ( !stop.get() )
90135
{
91136
for ( String query : QUERIES )
92137
{
93138
runQuery( query );
94139
}
95-
long left = deadline - System.currentTimeMillis();
96-
if ( left <= 0 )
97-
{
98-
break;
99-
}
100140
}
101141
}
102-
catch ( Throwable e )
142+
catch ( Throwable failure )
103143
{
104-
e.printStackTrace();
105-
hasFailed.set( true );
144+
if ( !failureReference.compareAndSet( null, failure ) )
145+
{
146+
Throwable firstFailure = failureReference.get();
147+
synchronized ( firstFailure )
148+
{
149+
firstFailure.addSuppressed( failure );
150+
}
151+
}
106152
}
107153
}
108154

driver/src/test/java/org/neo4j/driver/v1/util/StubServer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import java.io.File;
2222
import java.io.IOException;
23+
import java.net.InetSocketAddress;
24+
import java.net.SocketAddress;
25+
import java.nio.channels.SocketChannel;
2326
import java.util.ArrayList;
2427
import java.util.List;
2528

@@ -33,6 +36,8 @@
3336

3437
public class StubServer
3538
{
39+
private static final int SOCKET_CONNECT_ATTEMPTS = 20;
40+
3641
public static final Config INSECURE_CONFIG = Config.build()
3742
.withEncryptionLevel( Config.EncryptionLevel.NONE ).toConfig();
3843

@@ -50,7 +55,7 @@ private StubServer( String script, int port ) throws IOException, InterruptedExc
5055
command.addAll( asList( Integer.toString( port ), script ) );
5156
ProcessBuilder server = new ProcessBuilder().inheritIO().command( command );
5257
process = server.start();
53-
sleep( 500 ); // might take a moment for the socket to start listening
58+
waitForSocket( port );
5459
}
5560

5661
public static StubServer start( String resource, int port ) throws IOException, InterruptedException
@@ -100,4 +105,22 @@ private static boolean boltKitAvailable()
100105
return false;
101106
}
102107
}
108+
109+
private static void waitForSocket( int port ) throws InterruptedException
110+
{
111+
SocketAddress address = new InetSocketAddress( "localhost", port );
112+
for ( int i = 0; i < SOCKET_CONNECT_ATTEMPTS; i++ )
113+
{
114+
try
115+
{
116+
SocketChannel.open( address );
117+
return;
118+
}
119+
catch ( Exception e )
120+
{
121+
sleep( 300 );
122+
}
123+
}
124+
throw new AssertionError( "Can't connect to " + address );
125+
}
103126
}

0 commit comments

Comments
 (0)