Skip to content

Kill queries #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public StatementResult run( Statement statement )
return cursor;
}

public void reset()
{
ensureNoUnrecoverableError();
ensureConnectionIsOpen();

connection.resetAsync();
}

@Override
public boolean isOpen()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ public boolean hasUnrecoverableErrors()
return delegate.hasUnrecoverableErrors();
}

@Override
public void resetAsync()
{
delegate.resetAsync();
}

@Override
public boolean isInterrupted()
{
return delegate.isInterrupted();
}

private void markAsAvailable()
{
inUse.set( false );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.messaging.InitMessage;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.PullAllMessage;
import org.neo4j.driver.internal.messaging.RunMessage;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.internal.spi.StreamCollector;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
Expand All @@ -45,6 +46,7 @@ public class SocketConnection implements Connection
{
private final Queue<Message> pendingMessages = new LinkedList<>();
private final SocketResponseHandler responseHandler;
private AtomicBoolean interrupted = new AtomicBoolean( false );

private final SocketClient socket;

Expand Down Expand Up @@ -206,4 +208,27 @@ public boolean hasUnrecoverableErrors()
{
throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." );
}

@Override
public void resetAsync()
{
if( interrupted.compareAndSet( false, true ) )
{
queueMessage( RESET, new StreamCollector.ResetStreamCollector( new Runnable()
{
@Override
public void run()
{
interrupted.set( false );
}
} ) );
flush();
}
}

@Override
public boolean isInterrupted()
{
return interrupted.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,4 @@ public void clearError()
{
error = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,25 @@ public boolean hasUnrecoverableErrors()
return unrecoverableErrorsOccurred;
}

@Override
public void resetAsync()
{
try
{
delegate.resetAsync();
}
catch( RuntimeException e )
{
onDelegateException( e );
}
}

@Override
public boolean isInterrupted()
{
return delegate.isInterrupted();
}

public void dispose()
{
delegate.close();
Expand All @@ -228,7 +247,7 @@ private void onDelegateException( RuntimeException e )
{
unrecoverableErrorsOccurred = true;
}
else
else if( !isInterrupted() )
{
ackFailure();
}
Expand Down
11 changes: 11 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,15 @@ public interface Connection extends AutoCloseable


boolean hasUnrecoverableErrors();

/**
* Asynchronously sending reset to the socket output channel.
*/
void resetAsync();

/**
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
*/
boolean isInterrupted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,22 @@ public void doneIgnored()
}
};

StreamCollector RESET = new ResetStreamCollector();

StreamCollector RESET = new NoOperationStreamCollector()
class ResetStreamCollector extends NoOperationStreamCollector
{
private final Runnable doneSuccessCallBack;

public ResetStreamCollector()
{
this( null );
}

public ResetStreamCollector( Runnable doneSuccessCallBack )
{
this.doneSuccessCallBack = doneSuccessCallBack;
}

@Override
public void doneFailure( Neo4jException error )
{
Expand All @@ -76,7 +89,17 @@ public void doneIgnored()
throw new ClientException(
"Invalid server response message `IGNORED` received for client message `RESET`." );
}
};

@Override
public void doneSuccess()
{
if( doneSuccessCallBack != null )
{
doneSuccessCallBack.run();
}
}
}


class NoOperationStreamCollector implements StreamCollector
{
Expand Down
7 changes: 7 additions & 0 deletions driver/src/main/java/org/neo4j/driver/v1/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ public interface Session extends Resource, StatementRunner
*/
Transaction beginTransaction();

/**
* Reset the current session. This sends an immediate RESET signal to the server which both interrupts
* any statement that is currently executing and ignores any subsequently queued statements. Following
* the reset, the current transaction will have been rolled back and any outstanding failures will
* have been acknowledged.
*/
void reset();

/**
* Signal that you are done using this session. In the default driver usage, closing
Expand Down
108 changes: 108 additions & 0 deletions driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.util.TestNeo4j;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.v1.Values.parameters;

public class SessionIT
{
Expand Down Expand Up @@ -78,4 +86,104 @@ public void shouldHandleNullAuthToken() throws Throwable
// Then
assertFalse( session.isOpen() );
}

@Test
public void shouldKillLongRunningStatement() throws Throwable
{
neo4j.ensureProcedures( "longRunningStatement.jar" );
// Given
Driver driver = GraphDatabase.driver( neo4j.uri() );

int executionTimeout = 10; // 10s
final int killTimeout = 1; // 1s
long startTime = -1, endTime;

try( final Session session = driver.session() )
{
StatementResult result =
session.run( "CALL test.driver.longRunningStatement({seconds})",
parameters( "seconds", executionTimeout ) );

resetSessionAfterTimeout( session, killTimeout );

// When
startTime = System.currentTimeMillis();
result.consume();// blocking to run the statement

fail("Should have got an exception about statement get killed.");
}
catch( ClientException e )
{
endTime = System.currentTimeMillis();
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );

assertTrue( startTime > 0 );
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
}
}

@Test
public void shouldKillLongStreamingResult() throws Throwable
{
neo4j.ensureProcedures( "longRunningStatement.jar" );
// Given
Driver driver = GraphDatabase.driver( neo4j.uri() );

int executionTimeout = 10; // 10s
final int killTimeout = 1; // 1s
long startTime = -1, endTime;
int recordCount = 0;

try( final Session session = driver.session() )
{
StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})",
parameters( "seconds", executionTimeout ) );

resetSessionAfterTimeout( session, killTimeout );

// When
startTime = System.currentTimeMillis();
while( result.hasNext() )
{
result.next();
recordCount++;
}

fail("Should have got an exception about statement get killed.");
}
catch( ClientException e )
{
endTime = System.currentTimeMillis();
assertThat( e.code(), equalTo("Neo.ClientError.Procedure.ProcedureCallFailed") );
assertThat( recordCount, greaterThan(1) );

assertTrue( startTime > 0 );
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
}
}

private void resetSessionAfterTimeout( final Session session, final int timeout )
{
new Thread( new Runnable()
{
@Override
public void run()
{
try
{
Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
finally
{
session.reset(); // kill the session after timeout
}
}
} ).start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static void copyFile( File srcFile, File dstFile ) throws IOException
catch ( IOException e )
{
// Because the message from this cause may not mention which file it's about
throw new IOException( "Could not copy '" + srcFile + "' to '" + dstFile + "'", e );
throw new IOException( "Could not copy '" + srcFile.getCanonicalPath() + "' to '" + dstFile.getCanonicalPath() + "'", e );
}
finally
{
Expand Down
10 changes: 10 additions & 0 deletions driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,14 @@ public void updateEncryptionKeyAndCert( File key, File cert ) throws Exception
FileTools.copyFile( cert, Neo4jSettings.DEFAULT_TLS_CERT_FILE );
runner.forceToRestart(); // needs to force to restart as no configuration changed
}

public void ensureProcedures( String jarName ) throws IOException
{
File procedureJar = new File( Neo4jRunner.NEO4J_HOME, "plugins/" + jarName );
if( !procedureJar.exists() )
{
FileTools.copyFile( new File( "src/test/resources", jarName ), procedureJar );
runner.forceToRestart(); // needs to force to restart as no configuration changed
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public Transaction beginTransaction()
return realSession.beginTransaction();
}

@Override
public void reset()
{
realSession.reset();
}

@Override
public StatementResult run( String statementText, Map<String,Object> statementParameters )
{
Expand Down
Binary file not shown.