Skip to content

Commit 2af94d0

Browse files
author
Zhen
committed
Make sessioin.reset thread-safe
Sync on `enqueue` and `flush` to avoid sending `reset` and other messages at the same time. Also changed response handler stream collector queue to be concurrent queue so that adding into the queue (sending) and removing from the queue (receiving) can be done safely concurrently. Keep `ack_fail` muted when `reset` is already send but not replied yet.
1 parent 22b45ac commit 2af94d0

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public void run()
5959

6060
private ExplicitTransaction currentTransaction;
6161
private AtomicBoolean isOpen = new AtomicBoolean( true );
62+
private AtomicBoolean markedToClose = new AtomicBoolean( false );
6263

6364
NetworkSession( Connection connection, Logger logger )
6465
{
@@ -108,13 +109,16 @@ public void reset()
108109
ensureNoUnrecoverableError();
109110
ensureConnectionIsOpen();
110111

111-
connection.resetAsync();
112+
if( markedToClose.compareAndSet( false, true ) )
113+
{
114+
connection.resetAsync();
115+
}
112116
}
113117

114118
@Override
115119
public boolean isOpen()
116120
{
117-
return isOpen.get();
121+
return isOpen.get() && !markedToClose.get();
118122
}
119123

120124
@Override
@@ -263,4 +267,16 @@ private void ensureConnectionIsOpen()
263267
"Please close this session and retry your statement in another new session." );
264268
}
265269
}
270+
271+
private void ensureSessionIsOpen()
272+
{
273+
if( !isOpen() )
274+
{
275+
throw new ClientException(
276+
"No more interaction with this session is allowed " +
277+
"as the current session is already closed or marked as closed. " +
278+
"You get this error either because you have a bad reference to a session that has already be closed " +
279+
"or you are trying to reuse a session that you have called `reset` on it." );
280+
}
281+
}
266282
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void sync()
113113
}
114114

115115
@Override
116-
public void flush()
116+
public synchronized void flush()
117117
{
118118
try
119119
{
@@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException )
180180
}
181181
}
182182

183-
private void queueMessage( Message msg, Collector collector )
183+
private synchronized void queueMessage( Message msg, Collector collector )
184184
{
185185
pendingMessages.add( msg );
186186
responseHandler.appendResultCollector( collector );

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21-
import java.util.LinkedList;
2221
import java.util.Map;
2322
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2424

2525
import org.neo4j.driver.internal.messaging.MessageHandler;
2626
import org.neo4j.driver.internal.spi.Collector;
@@ -39,7 +39,7 @@
3939

4040
public class SocketResponseHandler implements MessageHandler
4141
{
42-
private final Queue<Collector> collectors = new LinkedList<>();
42+
private final Queue<Collector> collectors = new ConcurrentLinkedQueue<>();
4343

4444
/** If a failure occurs, the error gets stored here */
4545
private Neo4jException error;

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public void shouldKillLongRunningStatement() throws Throwable
105105
final int killTimeout = 1; // 1s
106106
long startTime = -1, endTime;
107107

108-
try( final Session session = driver.session() )
108+
final Session session = driver.session();
109+
try
109110
{
110111
StatementResult result =
111112
session.run( "CALL test.driver.longRunningStatement({seconds})",
@@ -123,13 +124,18 @@ public void shouldKillLongRunningStatement() throws Throwable
123124
{
124125
endTime = System.currentTimeMillis();
125126
assertTrue( startTime > 0 );
126-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
127+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
127128
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
129+
assertFalse( session.isOpen() );
128130
}
129131
catch ( Exception e )
130132
{
131133
fail( "Should be a Neo4jException" );
132134
}
135+
finally
136+
{
137+
session.close();
138+
}
133139
}
134140

135141
@Test
@@ -168,7 +174,7 @@ public void shouldKillLongStreamingResult() throws Throwable
168174
assertThat( recordCount, greaterThan(1) );
169175

170176
assertTrue( startTime > 0 );
171-
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
177+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
172178
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
173179
}
174180
}
@@ -190,7 +196,7 @@ public void run()
190196
}
191197
finally
192198
{
193-
session.reset(); // kill the session after timeout
199+
session.reset(); // reset the session after timeout
194200
}
195201
}
196202
} ).start();

0 commit comments

Comments
 (0)