Skip to content

Automatically close on session.reset() #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ public class NetworkSession implements Session
@Override
public void run()
{
if ( currentTransaction != null )
synchronized ( NetworkSession.this )
{
lastBookmark = currentTransaction.bookmark();
currentTransaction = null;
if ( currentTransaction != null )
{
lastBookmark = currentTransaction.bookmark();
currentTransaction = null;
}
}
}
};
Expand All @@ -73,9 +76,9 @@ public StatementResult run( String statementText )
}

@Override
public StatementResult run( String statementText, Map<String, Object> statementParameters )
public StatementResult run( String statementText, Map<String,Object> statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value(statementParameters);
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
return run( statementText, params );
}

Expand All @@ -97,21 +100,24 @@ public StatementResult run( Statement statement )
{
ensureConnectionIsValidBeforeRunningSession();
InternalStatementResult cursor = new InternalStatementResult( connection, null, statement );
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), cursor.runResponseCollector() );
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ),
cursor.runResponseCollector() );
connection.pullAll( cursor.pullAllResponseCollector() );
connection.flush();
return cursor;
}

public void reset()
public synchronized void reset()
{
ensureSessionIsOpen();
ensureNoUnrecoverableError();
ensureConnectionIsOpen();

if( currentTransaction != null )
if ( currentTransaction != null )
{
currentTransaction.markToClose();
lastBookmark = currentTransaction.bookmark();
currentTransaction = null;
}
connection.resetAsync();
}
Expand All @@ -126,21 +132,24 @@ public boolean isOpen()
public void close()
{
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
if( !isOpen.compareAndSet( true, false ) )
if ( !isOpen.compareAndSet( true, false ) )
{
throw new ClientException( "This session has already been closed." );
}
else
{
if ( currentTransaction != null )
synchronized ( this )
{
try
{
currentTransaction.close();
}
catch ( Throwable e )
if ( currentTransaction != null )
{
// Best-effort
try
{
currentTransaction.close();
}
catch ( Throwable e )
{
// Best-effort
}
}
}
try
Expand All @@ -167,7 +176,7 @@ public Transaction beginTransaction()
}

@Override
public Transaction beginTransaction( String bookmark )
public synchronized Transaction beginTransaction( String bookmark )
{
ensureConnectionIsValidBeforeOpeningTransaction();
currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark );
Expand Down Expand Up @@ -224,7 +233,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
@Override
protected void finalize() throws Throwable
{
if( isOpen.compareAndSet( true, false ) )
if ( isOpen.compareAndSet( true, false ) )
{
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
"method on Sessions before disposing of the objects.", null );
Expand All @@ -235,14 +244,15 @@ protected void finalize() throws Throwable

private void ensureNoUnrecoverableError()
{
if( connection.hasUnrecoverableErrors() )
if ( connection.hasUnrecoverableErrors() )
{
throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " +
"has happened. Please close the current session and re-run your statement in a" +
" new session." );
}
}

//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeRunningSession()
{
if ( currentTransaction != null )
Expand All @@ -252,6 +262,7 @@ private void ensureNoOpenTransactionBeforeRunningSession()
}
}

//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeOpeningTransaction()
{
if ( currentTransaction != null )
Expand All @@ -273,12 +284,13 @@ private void ensureConnectionIsOpen()

private void ensureSessionIsOpen()
{
if( !isOpen() )
if ( !isOpen() )
{
throw new ClientException(
"No more interaction with this session is allowed " +
"as the current session is already closed or marked as closed. " +
"You get this error either because you have a bad reference to a session that has already be closed " +
"You get this error either because you have a bad reference to a session that has already be " +
"closed " +
"or you are trying to reuse a session that you have called `reset` on it." );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ public void resetAsync()
}

@Override
public boolean isInterrupted()
public boolean isAckFailureMuted()
{
return delegate.isInterrupted();
return delegate.isAckFailureMuted();
}

private void markAsAvailable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class SocketConnection implements Connection
{
private final Queue<Message> pendingMessages = new LinkedList<>();
private final SocketResponseHandler responseHandler;
private AtomicBoolean interrupted = new AtomicBoolean( false );
private AtomicBoolean isInterrupted = new AtomicBoolean( false );
private AtomicBoolean isAckFailureMuted = new AtomicBoolean( false );
private final Collector.InitCollector initCollector = new Collector.InitCollector();

private final SocketClient socket;
Expand Down Expand Up @@ -115,6 +116,8 @@ public void sync()
@Override
public synchronized void flush()
{
ensureNotInterrupted();

try
{
socket.send( pendingMessages );
Expand All @@ -126,6 +129,29 @@ public synchronized void flush()
}
}

private void ensureNotInterrupted()
{
try
{
if( isInterrupted.get() )
{
// receive each of it and throw error immediately
while ( responseHandler.collectorsWaiting() > 0 )
{
receiveOne();
}
}
}
catch ( Neo4jException e )
{
throw new ClientException(
"An error has occurred due to the cancellation of executing a previous statement. " +
"You received this error probably because you did not consume the result immediately after " +
"running the statement which get reset in this session.", e );
}

}

private void receiveAll()
{
try
Expand Down Expand Up @@ -159,6 +185,7 @@ private void assertNoServerFailure()
{
Neo4jException exception = responseHandler.serverFailure();
responseHandler.clearError();
isInterrupted.set( false );
throw exception;
}
}
Expand All @@ -182,6 +209,8 @@ else if ( e instanceof SocketTimeoutException )

private synchronized void queueMessage( Message msg, Collector collector )
{
ensureNotInterrupted();

pendingMessages.add( msg );
responseHandler.appendResultCollector( collector );
}
Expand Down Expand Up @@ -211,26 +240,26 @@ public boolean hasUnrecoverableErrors()
}

@Override
public void resetAsync()
public synchronized void resetAsync()
{
if( interrupted.compareAndSet( false, true ) )
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
{
queueMessage( RESET, new Collector.ResetCollector( new Runnable()
@Override
public void run()
{
@Override
public void run()
{
interrupted.set( false );
}
} ) );
flush();
}
isInterrupted.set( false );
isAckFailureMuted.set( false );
}
} ) );
flush();
isInterrupted.set( true );
isAckFailureMuted.set( true );
}

@Override
public boolean isInterrupted()
public boolean isAckFailureMuted()
{
return interrupted.get();
return isAckFailureMuted.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ public void resetAsync()
}

@Override
public boolean isInterrupted()
public boolean isAckFailureMuted()
{
return delegate.isInterrupted();
return delegate.isAckFailureMuted();
}

@Override
Expand Down Expand Up @@ -260,7 +260,7 @@ private void onDelegateException( RuntimeException e )
{
unrecoverableErrorsOccurred = true;
}
else if( !isInterrupted() )
else if( !isAckFailureMuted() )
{
ackFailure();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ public interface Connection extends AutoCloseable
void resetAsync();

/**
* Return true if the current session statement execution has been interrupted by another thread, otherwise false.
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
* Return true if ack_failure message is temporarily muted as the failure message will be acked using reset instead
* @return true if no ack_failre message should be sent when ackable failures are received.
*/
boolean isInterrupted();
boolean isAckFailureMuted();

/**
* Returns the version of the server connected to.
Expand Down
Loading