Skip to content

Fixed Session#reset() #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
@Override
public CompletionStage<Void> verifyConnectivity()
{
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
Expand All @@ -42,7 +43,6 @@

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.v1.Values.value;
Expand All @@ -65,10 +65,9 @@ private enum State
MARKED_FAILED,

/**
* An error has occurred, transaction can no longer be used and no more messages will be sent for this
* transaction.
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
*/
FAILED,
TERMINATED,

/** This transaction has successfully committed */
COMMITTED,
Expand Down Expand Up @@ -135,17 +134,10 @@ CompletionStage<Void> closeAsync()
{
return commitAsync();
}
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
{
return rollbackAsync();
}
else if ( state == State.FAILED )
{
// unrecoverable error happened, transaction should've been rolled back on the server
// update state so that this transaction does not remain open
state = State.ROLLED_BACK;
return completedFuture( null );
}
else
{
return completedFuture( null );
Expand All @@ -161,7 +153,12 @@ public CompletionStage<Void> commitAsync()
}
else if ( state == State.ROLLED_BACK )
{
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
}
else if ( state == State.TERMINATED )
{
return failedFuture(
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
}
else
{
Expand All @@ -174,12 +171,18 @@ public CompletionStage<Void> rollbackAsync()
{
if ( state == State.COMMITTED )
{
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
}
else if ( state == State.ROLLED_BACK )
{
return completedFuture( null );
}
else if ( state == State.TERMINATED )
{
// transaction has been terminated by RESET and should be rolled back by the database
state = State.ROLLED_BACK;
return completedFuture( null );
}
else
{
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
Expand All @@ -190,9 +193,8 @@ private BiConsumer<Void,Throwable> transactionClosed( State newState )
{
return ( ignore, error ) ->
{
// todo: test that this state transition always happens when commit or rollback
state = newState;
connection.release();
connection.releaseInBackground();
session.setBookmark( bookmark );
};
}
Expand Down Expand Up @@ -280,18 +282,18 @@ public StatementResult run( Statement statement )
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
{
ensureCanRunQueries();
//noinspection unchecked
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
}

@Override
public boolean isOpen()
{
return state != State.COMMITTED && state != State.ROLLED_BACK;
return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED;
}

private void ensureCanRunQueries()
{
// todo: test these two new branches
if ( state == State.COMMITTED )
{
throw new ClientException( "Cannot run more statements in this transaction, it has been committed" );
Expand All @@ -300,14 +302,19 @@ else if ( state == State.ROLLED_BACK )
{
throw new ClientException( "Cannot run more statements in this transaction, it has been rolled back" );
}
else if ( state == State.FAILED || state == State.MARKED_FAILED )
else if ( state == State.MARKED_FAILED )
{
throw new ClientException(
"Cannot run more statements in this transaction, because previous statements in the " +
"transaction has failed and the transaction has been rolled back. Please start a new " +
"transaction to run another statement."
);
}
else if ( state == State.TERMINATED )
{
throw new ClientException(
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
}
}

@Override
Expand All @@ -316,21 +323,9 @@ public TypeSystem typeSystem()
return InternalTypeSystem.TYPE_SYSTEM;
}

public void resultFailed( Throwable error )
{
if ( isRecoverable( error ) )
{
failure();
}
else
{
markToClose();
}
}

public void markToClose()
public void markTerminated()
{
state = State.FAILED;
state = State.TERMINATED;
}

public Bookmark bookmark()
Expand Down
96 changes: 58 additions & 38 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void close()
getBlocking( lastResultStage
.exceptionally( error -> null )
.thenCompose( this::ensureBuffered )
.thenCompose( error -> forceReleaseResources().thenApply( ignore ->
.thenCompose( error -> releaseResources().thenApply( ignore ->
{
if ( error != null )
{
Expand All @@ -177,7 +177,7 @@ public CompletionStage<Void> closeAsync()
// todo: wait for buffered result?
if ( open.compareAndSet( true, false ) )
{
return forceReleaseResources();
return releaseResources();
}
return completedFuture( null );
}
Expand Down Expand Up @@ -254,7 +254,19 @@ public String lastBookmark()
@Override
public void reset()
{
getBlocking( forceReleaseResources() );
getBlocking( resetAsync() );
}

private CompletionStage<Void> resetAsync()
{
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
.thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} );
}

@Override
Expand Down Expand Up @@ -465,41 +477,38 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
return connectionStage;
}

private CompletionStage<Void> forceReleaseResources()
private CompletionStage<Void> releaseResources()
{
return rollbackTransaction().thenCompose( ignore -> forceReleaseConnection() );
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
}

private CompletionStage<Void> rollbackTransaction()
{
return transactionStage
.exceptionally( error -> null ) // handle previous acquisition failures
.thenCompose( tx ->
{
if ( tx != null && tx.isOpen() )
{
return tx.rollbackAsync();
}
return completedFuture( null );
} );
return existingTransactionOrNull().thenCompose( tx ->
{
if ( tx != null )
{
return tx.rollbackAsync();
}
return completedFuture( null );
} ).exceptionally( error ->
{
Throwable cause = Futures.completionErrorCause( error );
logger.error( "Failed to rollback active transaction", cause );
return null;
} );
}

private CompletionStage<Void> forceReleaseConnection()
private CompletionStage<Void> releaseConnectionNow()
{
return connectionStage
.exceptionally( error -> null ) // handle previous acquisition failures
.thenCompose( connection ->
{
if ( connection != null )
{
return connection.forceRelease();
}
return completedFuture( null );
} ).exceptionally( error ->
{
logger.error( "Failed to rollback active transaction", error );
return null;
} );
return existingConnectionOrNull().thenCompose( connection ->
{
if ( connection != null )
{
return connection.releaseNow();
}
return completedFuture( null );
} );
}

private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
Expand All @@ -516,14 +525,25 @@ private CompletionStage<Void> ensureNoOpenTxBeforeStartingTx()

private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
{
return transactionStage.exceptionally( error -> null )
.thenAccept( tx ->
{
if ( tx != null && tx.isOpen() )
{
throw new ClientException( errorMessage );
}
} );
return existingTransactionOrNull().thenAccept( tx ->
{
if ( tx != null )
{
throw new ClientException( errorMessage );
}
} );
}

private CompletionStage<ExplicitTransaction> existingTransactionOrNull()
{
return transactionStage
.exceptionally( error -> null ) // handle previous acquisition failures
.thenApply( tx -> tx != null && tx.isOpen() ? tx : null );
}

private CompletionStage<Connection> existingConnectionOrNull()
{
return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures
}

private void ensureSessionIsOpen()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import java.util.Map;

import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler;
import org.neo4j.driver.internal.handlers.InitResponseHandler;
import org.neo4j.driver.internal.messaging.InitMessage;
import org.neo4j.driver.v1.Value;

Expand Down Expand Up @@ -54,7 +54,7 @@ public void operationComplete( ChannelFuture future )
Channel channel = future.channel();

InitMessage message = new InitMessage( userAgent, authToken );
AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );

messageDispatcher( channel ).queue( handler );
channel.writeAndFlush( message, channel.voidPromise() );
Expand Down
Loading