Skip to content

Commit b758757

Browse files
author
Zhen
committed
Make sessioin.reset thread-safe
1 parent 22b45ac commit b758757

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public void run()
5959

6060
private ExplicitTransaction currentTransaction;
6161
private AtomicBoolean isOpen = new AtomicBoolean( true );
62+
private AtomicBoolean markedToClose = new AtomicBoolean( false );
6263

6364
NetworkSession( Connection connection, Logger logger )
6465
{
@@ -108,13 +109,16 @@ public void reset()
108109
ensureNoUnrecoverableError();
109110
ensureConnectionIsOpen();
110111

111-
connection.resetAsync();
112+
if( markedToClose.compareAndSet( false, true ) )
113+
{
114+
connection.resetAsync();
115+
}
112116
}
113117

114118
@Override
115119
public boolean isOpen()
116120
{
117-
return isOpen.get();
121+
return isOpen.get() && !markedToClose.get();
118122
}
119123

120124
@Override
@@ -263,4 +267,16 @@ private void ensureConnectionIsOpen()
263267
"Please close this session and retry your statement in another new session." );
264268
}
265269
}
270+
271+
private void ensureSessionIsOpen()
272+
{
273+
if( !isOpen() )
274+
{
275+
throw new ClientException(
276+
"No more interaction with this session is allowed " +
277+
"as the current session is already closed or marked as closed. " +
278+
"You get this error either because you have a bad reference to a session that has already be closed " +
279+
"or you are trying to reuse a session that you have called `reset` on it." );
280+
}
281+
}
266282
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void sync()
113113
}
114114

115115
@Override
116-
public void flush()
116+
public synchronized void flush()
117117
{
118118
try
119119
{
@@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException )
180180
}
181181
}
182182

183-
private void queueMessage( Message msg, Collector collector )
183+
private synchronized void queueMessage( Message msg, Collector collector )
184184
{
185185
pendingMessages.add( msg );
186186
responseHandler.appendResultCollector( collector );

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21-
import java.util.LinkedList;
2221
import java.util.Map;
2322
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2424

2525
import org.neo4j.driver.internal.messaging.MessageHandler;
2626
import org.neo4j.driver.internal.spi.Collector;
@@ -39,7 +39,7 @@
3939

4040
public class SocketResponseHandler implements MessageHandler
4141
{
42-
private final Queue<Collector> collectors = new LinkedList<>();
42+
private final Queue<Collector> collectors = new ConcurrentLinkedQueue<>();
4343

4444
/** If a failure occurs, the error gets stored here */
4545
private Neo4jException error;

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public void shouldKillLongRunningStatement() throws Throwable
105105
final int killTimeout = 1; // 1s
106106
long startTime = -1, endTime;
107107

108-
try( final Session session = driver.session() )
108+
final Session session = driver.session();
109+
try
109110
{
110111
StatementResult result =
111112
session.run( "CALL test.driver.longRunningStatement({seconds})",
@@ -123,13 +124,18 @@ public void shouldKillLongRunningStatement() throws Throwable
123124
{
124125
endTime = System.currentTimeMillis();
125126
assertTrue( startTime > 0 );
126-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
127+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
127128
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
129+
assertFalse( session.isOpen() );
128130
}
129131
catch ( Exception e )
130132
{
131133
fail( "Should be a Neo4jException" );
132134
}
135+
finally
136+
{
137+
session.close();
138+
}
133139
}
134140

135141
@Test
@@ -168,7 +174,7 @@ public void shouldKillLongStreamingResult() throws Throwable
168174
assertThat( recordCount, greaterThan(1) );
169175

170176
assertTrue( startTime > 0 );
171-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
177+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
172178
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
173179
}
174180
}
@@ -190,7 +196,7 @@ public void run()
190196
}
191197
finally
192198
{
193-
session.reset(); // kill the session after timeout
199+
session.reset(); // reset the session after timeout
194200
}
195201
}
196202
} ).start();

0 commit comments

Comments
 (0)