Skip to content

Commit 4f069f0

Browse files
authored
Merge pull request #210 from zhenlineo/1.1-kill-query
Kill queries
2 parents 2fa3e87 + 8e521d1 commit 4f069f0

File tree

13 files changed

+234
-6
lines changed

13 files changed

+234
-6
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ public StatementResult run( Statement statement )
9898
return cursor;
9999
}
100100

101+
public void reset()
102+
{
103+
ensureNoUnrecoverableError();
104+
ensureConnectionIsOpen();
105+
106+
connection.resetAsync();
107+
}
108+
101109
@Override
102110
public boolean isOpen()
103111
{

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ public boolean hasUnrecoverableErrors()
201201
return delegate.hasUnrecoverableErrors();
202202
}
203203

204+
@Override
205+
public void resetAsync()
206+
{
207+
delegate.resetAsync();
208+
}
209+
210+
@Override
211+
public boolean isInterrupted()
212+
{
213+
return delegate.isInterrupted();
214+
}
215+
204216
private void markAsAvailable()
205217
{
206218
inUse.set( false );

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@
2323
import java.util.LinkedList;
2424
import java.util.Map;
2525
import java.util.Queue;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627

2728
import org.neo4j.driver.internal.messaging.InitMessage;
2829
import org.neo4j.driver.internal.messaging.Message;
2930
import org.neo4j.driver.internal.messaging.PullAllMessage;
3031
import org.neo4j.driver.internal.messaging.RunMessage;
3132
import org.neo4j.driver.internal.security.SecurityPlan;
3233
import org.neo4j.driver.internal.spi.Connection;
33-
import org.neo4j.driver.v1.Logger;
3434
import org.neo4j.driver.internal.spi.StreamCollector;
35+
import org.neo4j.driver.v1.Logger;
3536
import org.neo4j.driver.v1.Logging;
3637
import org.neo4j.driver.v1.Value;
3738
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -45,6 +46,7 @@ public class SocketConnection implements Connection
4546
{
4647
private final Queue<Message> pendingMessages = new LinkedList<>();
4748
private final SocketResponseHandler responseHandler;
49+
private AtomicBoolean interrupted = new AtomicBoolean( false );
4850

4951
private final SocketClient socket;
5052

@@ -206,4 +208,27 @@ public boolean hasUnrecoverableErrors()
206208
{
207209
throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." );
208210
}
211+
212+
@Override
213+
public void resetAsync()
214+
{
215+
if( interrupted.compareAndSet( false, true ) )
216+
{
217+
queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable()
218+
{
219+
@Override
220+
public void run()
221+
{
222+
interrupted.set( false );
223+
}
224+
} ) );
225+
flush();
226+
}
227+
}
228+
229+
@Override
230+
public boolean isInterrupted()
231+
{
232+
return interrupted.get();
233+
}
209234
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketResponseHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,5 +244,4 @@ public void clearError()
244244
{
245245
error = null;
246246
}
247-
248247
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,25 @@ public boolean hasUnrecoverableErrors()
211211
return unrecoverableErrorsOccurred;
212212
}
213213

214+
@Override
215+
public void resetAsync()
216+
{
217+
try
218+
{
219+
delegate.resetAsync();
220+
}
221+
catch( RuntimeException e )
222+
{
223+
onDelegateException( e );
224+
}
225+
}
226+
227+
@Override
228+
public boolean isInterrupted()
229+
{
230+
return delegate.isInterrupted();
231+
}
232+
214233
public void dispose()
215234
{
216235
delegate.close();
@@ -228,7 +247,7 @@ private void onDelegateException( RuntimeException e )
228247
{
229248
unrecoverableErrorsOccurred = true;
230249
}
231-
else
250+
else if( !isInterrupted() )
232251
{
233252
ackFailure();
234253
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,15 @@ public interface Connection extends AutoCloseable
106106

107107

108108
boolean hasUnrecoverableErrors();
109+
110+
/**
111+
* Asynchronously sending reset to the socket output channel.
112+
*/
113+
void resetAsync();
114+
115+
/**
116+
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
117+
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
118+
*/
119+
boolean isInterrupted();
109120
}

driver/src/main/java/org/neo4j/driver/internal/spi/StreamCollector.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,22 @@ public void doneIgnored()
6060
}
6161
};
6262

63+
StreamCollector RESET = new ResetStreamCollector();
6364

64-
StreamCollector RESET = new NoOperationStreamCollector()
65+
class ResetStreamCollector extends NoOperationStreamCollector
6566
{
67+
private final Runnable doneSuccessCallBack;
68+
69+
public ResetStreamCollector()
70+
{
71+
this( null );
72+
}
73+
74+
public ResetStreamCollector( Runnable doneSuccessCallBack )
75+
{
76+
this.doneSuccessCallBack = doneSuccessCallBack;
77+
}
78+
6679
@Override
6780
public void doneFailure( Neo4jException error )
6881
{
@@ -76,7 +89,17 @@ public void doneIgnored()
7689
throw new ClientException(
7790
"Invalid server response message `IGNORED` received for client message `RESET`." );
7891
}
79-
};
92+
93+
@Override
94+
public void doneSuccess()
95+
{
96+
if( doneSuccessCallBack != null )
97+
{
98+
doneSuccessCallBack.run();
99+
}
100+
}
101+
}
102+
80103

81104
class NoOperationStreamCollector implements StreamCollector
82105
{

driver/src/main/java/org/neo4j/driver/v1/Session.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ public interface Session extends Resource, StatementRunner
5858
*/
5959
Transaction beginTransaction();
6060

61+
/**
62+
* Reset the current session. This sends an immediate RESET signal to the server which both interrupts
63+
* any statement that is currently executing and ignores any subsequently queued statements. Following
64+
* the reset, the current transaction will have been rolled back and any outstanding failures will
65+
* have been acknowledged.
66+
*/
67+
void reset();
6168

6269
/**
6370
* Signal that you are done using this session. In the default driver usage, closing

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@
2626
import org.neo4j.driver.v1.Driver;
2727
import org.neo4j.driver.v1.GraphDatabase;
2828
import org.neo4j.driver.v1.Session;
29+
import org.neo4j.driver.v1.StatementResult;
30+
import org.neo4j.driver.v1.exceptions.ClientException;
2931
import org.neo4j.driver.v1.util.TestNeo4j;
3032

33+
import static org.hamcrest.CoreMatchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
3135
import static org.junit.Assert.assertFalse;
36+
import static org.junit.Assert.assertThat;
37+
import static org.junit.Assert.assertTrue;
38+
import static org.junit.Assert.fail;
39+
import static org.neo4j.driver.v1.Values.parameters;
3240

3341
public class SessionIT
3442
{
@@ -78,4 +86,104 @@ public void shouldHandleNullAuthToken() throws Throwable
7886
// Then
7987
assertFalse( session.isOpen() );
8088
}
89+
90+
@Test
91+
public void shouldKillLongRunningStatement() throws Throwable
92+
{
93+
neo4j.ensureProcedures( "longRunningStatement.jar" );
94+
// Given
95+
Driver driver = GraphDatabase.driver( neo4j.uri() );
96+
97+
int executionTimeout = 10; // 10s
98+
final int killTimeout = 1; // 1s
99+
long startTime = -1, endTime;
100+
101+
try( final Session session = driver.session() )
102+
{
103+
StatementResult result =
104+
session.run( "CALL test.driver.longRunningStatement({seconds})",
105+
parameters( "seconds", executionTimeout ) );
106+
107+
resetSessionAfterTimeout( session, killTimeout );
108+
109+
// When
110+
startTime = System.currentTimeMillis();
111+
result.consume();// blocking to run the statement
112+
113+
fail("Should have got an exception about statement get killed.");
114+
}
115+
catch( ClientException e )
116+
{
117+
endTime = System.currentTimeMillis();
118+
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );
119+
120+
assertTrue( startTime > 0 );
121+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
122+
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
123+
}
124+
}
125+
126+
@Test
127+
public void shouldKillLongStreamingResult() throws Throwable
128+
{
129+
neo4j.ensureProcedures( "longRunningStatement.jar" );
130+
// Given
131+
Driver driver = GraphDatabase.driver( neo4j.uri() );
132+
133+
int executionTimeout = 10; // 10s
134+
final int killTimeout = 1; // 1s
135+
long startTime = -1, endTime;
136+
int recordCount = 0;
137+
138+
try( final Session session = driver.session() )
139+
{
140+
StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})",
141+
parameters( "seconds", executionTimeout ) );
142+
143+
resetSessionAfterTimeout( session, killTimeout );
144+
145+
// When
146+
startTime = System.currentTimeMillis();
147+
while( result.hasNext() )
148+
{
149+
result.next();
150+
recordCount++;
151+
}
152+
153+
fail("Should have got an exception about statement get killed.");
154+
}
155+
catch( ClientException e )
156+
{
157+
endTime = System.currentTimeMillis();
158+
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );
159+
assertThat( recordCount, greaterThan(1) );
160+
161+
assertTrue( startTime > 0 );
162+
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
163+
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
164+
}
165+
}
166+
167+
private void resetSessionAfterTimeout( final Session session, final int timeout )
168+
{
169+
new Thread( new Runnable()
170+
{
171+
@Override
172+
public void run()
173+
{
174+
try
175+
{
176+
Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds
177+
}
178+
catch ( InterruptedException e )
179+
{
180+
e.printStackTrace();
181+
}
182+
finally
183+
{
184+
session.reset(); // kill the session after timeout
185+
}
186+
}
187+
} ).start();
188+
}
81189
}

driver/src/test/java/org/neo4j/driver/v1/util/FileTools.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public static void copyFile( File srcFile, File dstFile ) throws IOException
164164
catch ( IOException e )
165165
{
166166
// Because the message from this cause may not mention which file it's about
167-
throw new IOException( "Could not copy '" + srcFile + "' to '" + dstFile + "'", e );
167+
throw new IOException( "Could not copy '" + srcFile.getCanonicalPath() + "' to '" + dstFile.getCanonicalPath() + "'", e );
168168
}
169169
finally
170170
{

driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,14 @@ public void updateEncryptionKeyAndCert( File key, File cert ) throws Exception
123123
FileTools.copyFile( cert, Neo4jSettings.DEFAULT_TLS_CERT_FILE );
124124
runner.forceToRestart(); // needs to force to restart as no configuration changed
125125
}
126+
127+
public void ensureProcedures( String jarName ) throws IOException
128+
{
129+
File procedureJar = new File( Neo4jRunner.NEO4J_HOME, "plugins/" + jarName );
130+
if( !procedureJar.exists() )
131+
{
132+
FileTools.copyFile( new File( "src/test/resources", jarName ), procedureJar );
133+
runner.forceToRestart(); // needs to force to restart as no configuration changed
134+
}
135+
}
126136
}

driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ public Transaction beginTransaction()
100100
return realSession.beginTransaction();
101101
}
102102

103+
@Override
104+
public void reset()
105+
{
106+
realSession.reset();
107+
}
108+
103109
@Override
104110
public StatementResult run( String statementText, Map<String,Object> statementParameters )
105111
{
Binary file not shown.

0 commit comments

Comments
 (0)