Skip to content

Commit 786f858

Browse files
author
Zhen
committed
Make sessioin.reset thread-safe
1 parent eb4087e commit 786f858

File tree

4 files changed

+40
-23
lines changed

4 files changed

+40
-23
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public void run()
5454

5555
private InternalTransaction currentTransaction;
5656
private AtomicBoolean isOpen = new AtomicBoolean( true );
57+
private AtomicBoolean markedToClose = new AtomicBoolean( false );
5758

5859
public InternalSession( Connection connection, Logger logger )
5960
{
@@ -103,13 +104,16 @@ public void reset()
103104
ensureNoUnrecoverableError();
104105
ensureConnectionIsOpen();
105106

106-
connection.resetAsync();
107+
if( markedToClose.compareAndSet( false, true ) )
108+
{
109+
connection.resetAsync();
110+
}
107111
}
108112

109113
@Override
110114
public boolean isOpen()
111115
{
112-
return isOpen.get();
116+
return isOpen.get() && !markedToClose.get();
113117
}
114118

115119
@Override
@@ -245,4 +249,16 @@ private void ensureConnectionIsOpen()
245249
"Please close this session and retry your statement in another new session." );
246250
}
247251
}
252+
253+
private void ensureSessionIsOpen()
254+
{
255+
if( !isOpen() )
256+
{
257+
throw new ClientException(
258+
"No more interaction with this session is allowed " +
259+
"as the current session is already closed or marked as closed. " +
260+
"You get this error either because you have a bad reference to a session that has already be closed " +
261+
"or you are trying to reuse a session that you have called `reset` on it." );
262+
}
263+
}
248264
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void sync()
113113
}
114114

115115
@Override
116-
public void flush()
116+
public synchronized void flush()
117117
{
118118
try
119119
{
@@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException )
180180
}
181181
}
182182

183-
private void queueMessage( Message msg, StreamCollector collector )
183+
private synchronized void queueMessage( Message msg, StreamCollector collector )
184184
{
185185
pendingMessages.add( msg );
186186
responseHandler.appendResultCollector( collector );
@@ -213,18 +213,8 @@ public boolean hasUnrecoverableErrors()
213213
@Override
214214
public void resetAsync()
215215
{
216-
if( interrupted.compareAndSet( false, true ) )
217-
{
218-
queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable()
219-
{
220-
@Override
221-
public void run()
222-
{
223-
interrupted.set( false );
224-
}
225-
} ) );
226-
flush();
227-
}
216+
queueMessage( RESET, StreamCollector.RESET );
217+
flush();
228218
}
229219

230220
@Override

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21-
import java.util.LinkedList;
2221
import java.util.Map;
2322
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2424

2525
import org.neo4j.driver.internal.messaging.MessageHandler;
2626
import org.neo4j.driver.internal.spi.StreamCollector;
@@ -39,7 +39,7 @@
3939

4040
public class SocketResponseHandler implements MessageHandler
4141
{
42-
private final Queue<StreamCollector> collectors = new LinkedList<>();
42+
private final Queue<StreamCollector> collectors = new ConcurrentLinkedQueue<>();
4343

4444
/** If a failure occurs, the error gets stored here */
4545
private Neo4jException error;

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import org.junit.Rule;
2222
import org.junit.Test;
2323

24+
import java.util.logging.Level;
25+
26+
import org.neo4j.driver.internal.logging.ConsoleLogging;
2427
import org.neo4j.driver.v1.AuthToken;
2528
import org.neo4j.driver.v1.AuthTokens;
29+
import org.neo4j.driver.v1.Config;
2630
import org.neo4j.driver.v1.Driver;
2731
import org.neo4j.driver.v1.GraphDatabase;
2832
import org.neo4j.driver.v1.Session;
@@ -99,13 +103,15 @@ public void shouldKillLongRunningStatement() throws Throwable
99103
{
100104
neo4j.ensureProcedures( "longRunningStatement.jar" );
101105
// Given
102-
Driver driver = GraphDatabase.driver( neo4j.uri() );
106+
Driver driver = GraphDatabase.driver( neo4j.uri(), Config.build().withLogging( new ConsoleLogging( Level.ALL
107+
) ).toConfig() );
103108

104109
int executionTimeout = 10; // 10s
105110
final int killTimeout = 1; // 1s
106111
long startTime = -1, endTime;
107112

108-
try( final Session session = driver.session() )
113+
final Session session = driver.session();
114+
try
109115
{
110116
StatementResult result =
111117
session.run( "CALL test.driver.longRunningStatement({seconds})",
@@ -123,13 +129,18 @@ public void shouldKillLongRunningStatement() throws Throwable
123129
{
124130
endTime = System.currentTimeMillis();
125131
assertTrue( startTime > 0 );
126-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
132+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
127133
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
134+
assertFalse( session.isOpen() );
128135
}
129136
catch ( Exception e )
130137
{
131138
fail( "Should be a Neo4jException" );
132139
}
140+
finally
141+
{
142+
session.close();
143+
}
133144
}
134145

135146
@Test
@@ -168,7 +179,7 @@ public void shouldKillLongStreamingResult() throws Throwable
168179
assertThat( recordCount, greaterThan(1) );
169180

170181
assertTrue( startTime > 0 );
171-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
182+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
172183
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
173184
}
174185
}
@@ -190,7 +201,7 @@ public void run()
190201
}
191202
finally
192203
{
193-
session.reset(); // kill the session after timeout
204+
session.reset(); // reset the session after timeout
194205
}
195206
}
196207
} ).start();

0 commit comments

Comments
 (0)