Skip to content

Fixed Session#reset() to only send a RESET message #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.neo4j.driver.internal.async.QueryRunner;
Expand Down Expand Up @@ -60,31 +59,25 @@ public class ExplicitTransaction implements Transaction
private enum State
{
/** The transaction is running with no explicit success or failure marked */
ACTIVE( true ),
ACTIVE,

/** Running, user marked for success, meaning it'll value committed */
MARKED_SUCCESS( true ),
MARKED_SUCCESS,

/** User marked as failed, meaning it'll be rolled back. */
MARKED_FAILED( true ),
MARKED_FAILED,

/**
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
* fatal connection error.
*/
TERMINATED( false ),
TERMINATED,

/** This transaction has successfully committed */
COMMITTED( false ),
COMMITTED,

/** This transaction has been rolled back */
ROLLED_BACK( false );

final boolean txOpen;

State( boolean txOpen )
{
this.txOpen = txOpen;
}
ROLLED_BACK
}

private final Connection connection;
Expand Down Expand Up @@ -158,7 +151,7 @@ CompletionStage<Void> closeAsync()
{
return commitAsync();
}
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
else if ( state != State.COMMITTED && state != State.ROLLED_BACK )
{
return rollbackAsync();
}
Expand All @@ -181,14 +174,14 @@ else if ( state == State.ROLLED_BACK )
}
else if ( state == State.TERMINATED )
{
return failedFuture(
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
transactionClosed( State.ROLLED_BACK );
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}
else
{
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.COMMITTED ) );
.whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) );
}
}

Expand All @@ -205,15 +198,15 @@ else if ( state == State.ROLLED_BACK )
}
else if ( state == State.TERMINATED )
{
// transaction has been terminated by RESET and should be rolled back by the database
state = State.ROLLED_BACK;
// no need for explicit rollback, transaction should've been rolled back by the database
transactionClosed( State.ROLLED_BACK );
return completedWithNull();
}
else
{
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
.whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) );
}
}

Expand Down Expand Up @@ -314,15 +307,14 @@ else if ( state == State.MARKED_FAILED )
}
else if ( state == State.TERMINATED )
{
throw new ClientException(
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" );
}
}

@Override
public boolean isOpen()
{
return state.txOpen;
return state != State.COMMITTED && state != State.ROLLED_BACK;
}

@Override
Expand Down Expand Up @@ -394,14 +386,11 @@ else if ( commitOrRollbackError != null )
};
}

private BiConsumer<Object,Throwable> transactionClosed( State newState )
private void transactionClosed( State newState )
{
return ( ignore, error ) ->
{
state = newState;
connection.release(); // release in background
session.setBookmark( bookmark );
};
state = newState;
connection.release(); // release in background
session.setBookmark( bookmark );
}

private void terminateConnectionOnThreadInterrupt( String reason )
Expand Down
47 changes: 38 additions & 9 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.types.TypeSystem;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.v1.Values.value;
Expand Down Expand Up @@ -257,13 +258,24 @@ public void reset()

private CompletionStage<Void> resetAsync()
{
return existingTransactionOrNull().thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} ).thenCompose( ignore -> releaseConnection() );
return existingTransactionOrNull()
.thenAccept( tx ->
{
if ( tx != null )
{
tx.markTerminated();
}
} )
.thenCompose( ignore -> connectionStage )
.thenCompose( connection ->
{
if ( connection != null )
{
// there exists an active connection, send a RESET message over it
return connection.reset();
}
return completedWithNull();
} );
}

@Override
Expand Down Expand Up @@ -436,15 +448,32 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
{
ensureSessionIsOpen();

transactionStage = ensureNoOpenTxBeforeStartingTx()
// create a chain that acquires connection and starts a transaction
CompletionStage<ExplicitTransaction> newTransactionStage = ensureNoOpenTxBeforeStartingTx()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection ->
{
ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
return tx.beginAsync( bookmark );
} );

return transactionStage;
// update the reference to the only known transaction
CompletionStage<ExplicitTransaction> currentTransactionStage = transactionStage;

transactionStage = newTransactionStage
.exceptionally( error -> null ) // ignore errors from starting new transaction
.thenCompose( tx ->
{
if ( tx == null )
{
// failed to begin new transaction, keep reference to the existing one
return currentTransactionStage;
}
// new transaction started, keep reference to it
return completedFuture( tx );
} );

return newTransactionStage;
}

private CompletionStage<Connection> acquireConnection( AccessMode mode )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.PullAllMessage;
Expand All @@ -39,6 +40,7 @@
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Value;

import static java.util.Collections.emptyMap;
import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason;

public class NettyConnection implements Connection
Expand Down Expand Up @@ -108,15 +110,24 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
}
}

@Override
public CompletionStage<Void> reset()
{
CompletableFuture<Void> result = new CompletableFuture<>();
ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result );
writeResetMessageIfNeeded( handler, true );
return result;
}

@Override
public CompletionStage<Void> release()
{
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
{
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
setAutoRead( true );
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
channelPool, messageDispatcher, clock, releaseFuture );

reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
writeResetMessageIfNeeded( handler, false );
}
return releaseFuture;
}
Expand Down Expand Up @@ -152,12 +163,21 @@ private void run( String statement, Map<String,Value> parameters, ResponseHandle
pullAllHandler, flush );
}

private void reset( ResponseHandler resetHandler )
private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean isSessionReset )
{
channel.eventLoop().execute( () ->
{
messageDispatcher.muteAckFailure();
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
if ( isSessionReset && !isOpen() )
{
resetHandler.onSuccess( emptyMap() );
}
else
{
messageDispatcher.muteAckFailure();
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
setAutoRead( true );
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
}
} );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
newRoutingResponseHandler( pullAllHandler ) );
}

@Override
public CompletionStage<Void> reset()
{
return delegate.reset();
}

@Override
public boolean isOpen()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
Expand Down Expand Up @@ -157,14 +158,24 @@ public void handleIgnoredMessage()
log.debug( "S: IGNORED" );

ResponseHandler handler = handlers.remove();

Throwable error;
if ( currentError != null )
{
handler.onFailure( currentError );
error = currentError;
}
else if ( ackFailureMuted )
{
error = new ClientException( "Database ignored the request because session has been reset" );
}
else
{
log.warn( "Received IGNORED message for handler %s but error is missing", handler );
log.warn( "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " +
"Current handlers %s", handler, handlers );

error = new ClientException( "Database ignored the request" );
}
handler.onFailure( error );
}

public void handleFatalError( Throwable error )
Expand Down Expand Up @@ -212,15 +223,9 @@ public void muteAckFailure()
* {@link #muteAckFailure()} when sending RESET message.
* <p>
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
*
* @throws IllegalStateException if ACK_FAILURE is not muted right now.
*/
public void unMuteAckFailure()
{
if ( !ackFailureMuted )
{
throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" );
}
ackFailureMuted = false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.handlers;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;

import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.util.Clock;

import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;

public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
{
private final Channel channel;
private final ChannelPool pool;
private final Clock clock;

public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool,
InboundMessageDispatcher messageDispatcher, Clock clock, CompletableFuture<Void> releaseFuture )
{
super( messageDispatcher, releaseFuture );
this.channel = channel;
this.pool = pool;
this.clock = clock;
}

@Override
protected void resetCompleted( CompletableFuture<Void> completionFuture )
{
setLastUsedTimestamp( channel, clock.millis() );

Future<Void> released = pool.release( channel );
released.addListener( ignore -> completionFuture.complete( null ) );
}
}
Loading