Skip to content

Commit 8e521d1

Browse files
author
Zhen
committed
Renamed the kill method to reset
Added test using long running procedures to test session.reset Made the client to not ack_failure while a reset is called asynclly but has not yet received success to avoid ack_failure on IDEL state in server
1 parent 6bdecd7 commit 8e521d1

File tree

13 files changed

+201
-21
lines changed

13 files changed

+201
-21
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,12 @@ public StatementResult run( Statement statement )
9898
return cursor;
9999
}
100100

101-
public void kill()
101+
public void reset()
102102
{
103103
ensureNoUnrecoverableError();
104104
ensureConnectionIsOpen();
105105

106-
connection.resetAndFlushAsync();
106+
connection.resetAsync();
107107
}
108108

109109
@Override

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,9 +202,15 @@ public boolean hasUnrecoverableErrors()
202202
}
203203

204204
@Override
205-
public void resetAndFlushAsync()
205+
public void resetAsync()
206206
{
207-
delegate.resetAndFlushAsync();
207+
delegate.resetAsync();
208+
}
209+
210+
@Override
211+
public boolean isInterrupted()
212+
{
213+
return delegate.isInterrupted();
208214
}
209215

210216
private void markAsAvailable()

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@
2323
import java.util.LinkedList;
2424
import java.util.Map;
2525
import java.util.Queue;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627

2728
import org.neo4j.driver.internal.messaging.InitMessage;
2829
import org.neo4j.driver.internal.messaging.Message;
2930
import org.neo4j.driver.internal.messaging.PullAllMessage;
3031
import org.neo4j.driver.internal.messaging.RunMessage;
3132
import org.neo4j.driver.internal.security.SecurityPlan;
3233
import org.neo4j.driver.internal.spi.Connection;
33-
import org.neo4j.driver.v1.Logger;
3434
import org.neo4j.driver.internal.spi.StreamCollector;
35+
import org.neo4j.driver.v1.Logger;
3536
import org.neo4j.driver.v1.Logging;
3637
import org.neo4j.driver.v1.Value;
3738
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -45,6 +46,7 @@ public class SocketConnection implements Connection
4546
{
4647
private final Queue<Message> pendingMessages = new LinkedList<>();
4748
private final SocketResponseHandler responseHandler;
49+
private AtomicBoolean interrupted = new AtomicBoolean( false );
4850

4951
private final SocketClient socket;
5052

@@ -208,9 +210,25 @@ public boolean hasUnrecoverableErrors()
208210
}
209211

210212
@Override
211-
public void resetAndFlushAsync()
213+
public void resetAsync()
212214
{
213-
reset();
214-
flush();
215+
if( interrupted.compareAndSet( false, true ) )
216+
{
217+
queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable()
218+
{
219+
@Override
220+
public void run()
221+
{
222+
interrupted.set( false );
223+
}
224+
} ) );
225+
flush();
226+
}
227+
}
228+
229+
@Override
230+
public boolean isInterrupted()
231+
{
232+
return interrupted.get();
215233
}
216234
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,5 +244,4 @@ public void clearError()
244244
{
245245
error = null;
246246
}
247-
248247
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,18 +212,24 @@ public boolean hasUnrecoverableErrors()
212212
}
213213

214214
@Override
215-
public void resetAndFlushAsync()
215+
public void resetAsync()
216216
{
217217
try
218218
{
219-
delegate.resetAndFlushAsync();
219+
delegate.resetAsync();
220220
}
221221
catch( RuntimeException e )
222222
{
223223
onDelegateException( e );
224224
}
225225
}
226226

227+
@Override
228+
public boolean isInterrupted()
229+
{
230+
return delegate.isInterrupted();
231+
}
232+
227233
public void dispose()
228234
{
229235
delegate.close();
@@ -241,7 +247,7 @@ private void onDelegateException( RuntimeException e )
241247
{
242248
unrecoverableErrorsOccurred = true;
243249
}
244-
else
250+
else if( !isInterrupted() )
245251
{
246252
ackFailure();
247253
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,13 @@ public interface Connection extends AutoCloseable
108108
boolean hasUnrecoverableErrors();
109109

110110
/**
111-
* Asynchronously sending reset and flush to the socket output channel.
111+
* Asynchronously sending reset to the socket output channel.
112112
*/
113-
void resetAndFlushAsync();
113+
void resetAsync();
114+
115+
/**
116+
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
117+
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
118+
*/
119+
boolean isInterrupted();
114120
}

driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,22 @@ public void doneIgnored()
6060
}
6161
};
6262

63+
StreamCollector RESET = new ResetStreamCollector();
6364

64-
StreamCollector RESET = new NoOperationStreamCollector()
65+
class ResetStreamCollector extends NoOperationStreamCollector
6566
{
67+
private final Runnable doneSuccessCallBack;
68+
69+
public ResetStreamCollector()
70+
{
71+
this( null );
72+
}
73+
74+
public ResetStreamCollector( Runnable doneSuccessCallBack )
75+
{
76+
this.doneSuccessCallBack = doneSuccessCallBack;
77+
}
78+
6679
@Override
6780
public void doneFailure( Neo4jException error )
6881
{
@@ -76,7 +89,17 @@ public void doneIgnored()
7689
throw new ClientException(
7790
"Invalid server response message `IGNORED` received for client message `RESET`." );
7891
}
79-
};
92+
93+
@Override
94+
public void doneSuccess()
95+
{
96+
if( doneSuccessCallBack != null )
97+
{
98+
doneSuccessCallBack.run();
99+
}
100+
}
101+
}
102+
80103

81104
class NoOperationStreamCollector implements StreamCollector
82105
{

driver/src/main/java/org/neo4j/driver/v1/Session.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ public interface Session extends Resource, StatementRunner
5959
Transaction beginTransaction();
6060

6161
/**
62-
* Stop running more statements in this session and rest the session to a clean state.
62+
* Reset the current session. This sends an immediate RESET signal to the server which both interrupts
63+
* any statement that is currently executing and ignores any subsequently queued statements. Following
64+
* the reset, the current transaction will have been rolled back and any outstanding failures will
65+
* have been acknowledged.
6366
*/
64-
void kill();
67+
void reset();
68+
6569
/**
6670
* Signal that you are done using this session. In the default driver usage, closing
6771
* and accessing sessions is very low cost, because sessions are pooled by {@link Driver}.

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@
2626
import org.neo4j.driver.v1.Driver;
2727
import org.neo4j.driver.v1.GraphDatabase;
2828
import org.neo4j.driver.v1.Session;
29+
import org.neo4j.driver.v1.StatementResult;
30+
import org.neo4j.driver.v1.exceptions.ClientException;
2931
import org.neo4j.driver.v1.util.TestNeo4j;
3032

33+
import static org.hamcrest.CoreMatchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
3135
import static org.junit.Assert.assertFalse;
36+
import static org.junit.Assert.assertThat;
37+
import static org.junit.Assert.assertTrue;
38+
import static org.junit.Assert.fail;
39+
import static org.neo4j.driver.v1.Values.parameters;
3240

3341
public class SessionIT
3442
{
@@ -78,4 +86,104 @@ public void shouldHandleNullAuthToken() throws Throwable
7886
// Then
7987
assertFalse( session.isOpen() );
8088
}
89+
90+
@Test
91+
public void shouldKillLongRunningStatement() throws Throwable
92+
{
93+
neo4j.ensureProcedures( "longRunningStatement.jar" );
94+
// Given
95+
Driver driver = GraphDatabase.driver( neo4j.uri() );
96+
97+
int executionTimeout = 10; // 10s
98+
final int killTimeout = 1; // 1s
99+
long startTime = -1, endTime;
100+
101+
try( final Session session = driver.session() )
102+
{
103+
StatementResult result =
104+
session.run( "CALL test.driver.longRunningStatement({seconds})",
105+
parameters( "seconds", executionTimeout ) );
106+
107+
resetSessionAfterTimeout( session, killTimeout );
108+
109+
// When
110+
startTime = System.currentTimeMillis();
111+
result.consume();// blocking to run the statement
112+
113+
fail("Should have got an exception about statement get killed.");
114+
}
115+
catch( ClientException e )
116+
{
117+
endTime = System.currentTimeMillis();
118+
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );
119+
120+
assertTrue( startTime > 0 );
121+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
122+
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
123+
}
124+
}
125+
126+
@Test
127+
public void shouldKillLongStreamingResult() throws Throwable
128+
{
129+
neo4j.ensureProcedures( "longRunningStatement.jar" );
130+
// Given
131+
Driver driver = GraphDatabase.driver( neo4j.uri() );
132+
133+
int executionTimeout = 10; // 10s
134+
final int killTimeout = 1; // 1s
135+
long startTime = -1, endTime;
136+
int recordCount = 0;
137+
138+
try( final Session session = driver.session() )
139+
{
140+
StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})",
141+
parameters( "seconds", executionTimeout ) );
142+
143+
resetSessionAfterTimeout( session, killTimeout );
144+
145+
// When
146+
startTime = System.currentTimeMillis();
147+
while( result.hasNext() )
148+
{
149+
result.next();
150+
recordCount++;
151+
}
152+
153+
fail("Should have got an exception about statement get killed.");
154+
}
155+
catch( ClientException e )
156+
{
157+
endTime = System.currentTimeMillis();
158+
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );
159+
assertThat( recordCount, greaterThan(1) );
160+
161+
assertTrue( startTime > 0 );
162+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
163+
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
164+
}
165+
}
166+
167+
private void resetSessionAfterTimeout( final Session session, final int timeout )
168+
{
169+
new Thread( new Runnable()
170+
{
171+
@Override
172+
public void run()
173+
{
174+
try
175+
{
176+
Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds
177+
}
178+
catch ( InterruptedException e )
179+
{
180+
e.printStackTrace();
181+
}
182+
finally
183+
{
184+
session.reset(); // kill the session after timeout
185+
}
186+
}
187+
} ).start();
188+
}
81189
}

driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public static void copyFile( File srcFile, File dstFile ) throws IOException
164164
catch ( IOException e )
165165
{
166166
// Because the message from this cause may not mention which file it's about
167-
throw new IOException( "Could not copy '" + srcFile + "' to '" + dstFile + "'", e );
167+
throw new IOException( "Could not copy '" + srcFile.getCanonicalPath() + "' to '" + dstFile.getCanonicalPath() + "'", e );
168168
}
169169
finally
170170
{

driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,14 @@ public void updateEncryptionKeyAndCert( File key, File cert ) throws Exception
123123
FileTools.copyFile( cert, Neo4jSettings.DEFAULT_TLS_CERT_FILE );
124124
runner.forceToRestart(); // needs to force to restart as no configuration changed
125125
}
126+
127+
public void ensureProcedures( String jarName ) throws IOException
128+
{
129+
File procedureJar = new File( Neo4jRunner.NEO4J_HOME, "plugins/" + jarName );
130+
if( !procedureJar.exists() )
131+
{
132+
FileTools.copyFile( new File( "src/test/resources", jarName ), procedureJar );
133+
runner.forceToRestart(); // needs to force to restart as no configuration changed
134+
}
135+
}
126136
}

driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ public Transaction beginTransaction()
101101
}
102102

103103
@Override
104-
public void kill()
104+
public void reset()
105105
{
106-
realSession.kill();
106+
realSession.reset();
107107
}
108108

109109
@Override
Binary file not shown.

0 commit comments

Comments
 (0)