Skip to content

Make sessioin.reset thread-safe #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
}

@Override
public StatementResult run( Statement statement )
public synchronized StatementResult run( Statement statement )
{
ensureNotFailed();

Expand Down Expand Up @@ -217,7 +217,7 @@ public TypeSystem typeSystem()
return InternalTypeSystem.TYPE_SYSTEM;
}

public void markToClose()
public synchronized void markToClose()
{
state = State.FAILED;
}
Expand Down
19 changes: 19 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@ public StatementResult run( Statement statement )

public void reset()
{
ensureSessionIsOpen();
ensureNoUnrecoverableError();
ensureConnectionIsOpen();

if( currentTransaction != null )
{
currentTransaction.markToClose();
}
connection.resetAsync();
}

Expand Down Expand Up @@ -202,13 +207,15 @@ public TypeSystem typeSystem()

private void ensureConnectionIsValidBeforeRunningSession()
{
ensureSessionIsOpen();
ensureNoUnrecoverableError();
ensureNoOpenTransactionBeforeRunningSession();
ensureConnectionIsOpen();
}

private void ensureConnectionIsValidBeforeOpeningTransaction()
{
ensureSessionIsOpen();
ensureNoUnrecoverableError();
ensureNoOpenTransactionBeforeOpeningTransaction();
ensureConnectionIsOpen();
Expand Down Expand Up @@ -263,4 +270,16 @@ private void ensureConnectionIsOpen()
"Please close this session and retry your statement in another new session." );
}
}

private void ensureSessionIsOpen()
{
if( !isOpen() )
{
throw new ClientException(
"No more interaction with this session is allowed " +
"as the current session is already closed or marked as closed. " +
"You get this error either because you have a bad reference to a session that has already be closed " +
"or you are trying to reuse a session that you have called `reset` on it." );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void sync()
}

@Override
public void flush()
public synchronized void flush()
{
try
{
Expand Down Expand Up @@ -180,7 +180,7 @@ else if ( e instanceof SocketTimeoutException )
}
}

private void queueMessage( Message msg, Collector collector )
private synchronized void queueMessage( Message msg, Collector collector )
{
pendingMessages.add( msg );
responseHandler.appendResultCollector( collector );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
*/
package org.neo4j.driver.internal.net;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.neo4j.driver.internal.messaging.MessageHandler;
import org.neo4j.driver.internal.spi.Collector;
Expand All @@ -39,7 +39,7 @@

public class SocketResponseHandler implements MessageHandler
{
private final Queue<Collector> collectors = new LinkedList<>();
private final Queue<Collector> collectors = new ConcurrentLinkedQueue<>();

/** If a failure occurs, the error gets stored here */
private Neo4jException error;
Expand Down
86 changes: 76 additions & 10 deletions driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@
import org.junit.Rule;
import org.junit.Test;

import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.*;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.util.TestNeo4j;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -105,7 +101,8 @@ public void shouldKillLongRunningStatement() throws Throwable
final int killTimeout = 1; // 1s
long startTime = -1, endTime;

try( final Session session = driver.session() )
final Session session = driver.session();
try
{
StatementResult result =
session.run( "CALL test.driver.longRunningStatement({seconds})",
Expand All @@ -123,13 +120,17 @@ public void shouldKillLongRunningStatement() throws Throwable
{
endTime = System.currentTimeMillis();
assertTrue( startTime > 0 );
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
}
catch ( Exception e )
{
fail( "Should be a Neo4jException" );
}
finally
{
session.close();
}
}

@Test
Expand Down Expand Up @@ -168,7 +169,7 @@ public void shouldKillLongStreamingResult() throws Throwable
assertThat( recordCount, greaterThan(1) );

assertTrue( startTime > 0 );
assertTrue( endTime - startTime > killTimeout * 1000 ); // get killed by session.kill
assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset
assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished
}
}
Expand All @@ -190,9 +191,74 @@ public void run()
}
finally
{
session.reset(); // kill the session after timeout
session.reset(); // reset the session after timeout
}
}
} ).start();
}

@Test
public void shouldAllowMoreStatementAfterSessionReset()
{
// Given
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
Session session = driver.session() )
{

session.run( "Return 1" ).consume();

// When reset the state of this session
session.reset();

// Then can run successfully more statements without any error
session.run( "Return 2" ).consume();
}
}

@Test
public void shouldAllowMoreTxAfterSessionReset()
{
// Given
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
Session session = driver.session() )
{
try( Transaction tx = session.beginTransaction() )
{
tx.run("Return 1");
tx.success();
}

// When reset the state of this session
session.reset();

// Then can run more Tx
try( Transaction tx = session.beginTransaction() )
{
tx.run("Return 2");
tx.success();
}
}
}

@Test
public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset()
{
// Given
try( Driver driver = GraphDatabase.driver( neo4j.uri() );
Session session = driver.session() )
{
try( Transaction tx = session.beginTransaction() )
{
// When reset the state of this session
session.reset();
// Then
tx.run( "Return 1" );
fail( "Should not allow tx run as tx is already failed." );
}
catch( Exception e )
{
assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) );
}
}
}
}