Skip to content

Mute ACK_FAILURE when sending RESET #417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
@Override
public CompletionStage<Void> verifyConnectivity()
{
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private BiConsumer<Void,Throwable> transactionClosed( State newState )
{
// todo: test that this state transition always happens when commit or rollback
state = newState;
connection.release();
connection.releaseInBackground();
session.setBookmark( bookmark );
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,12 @@ private CompletionStage<Void> forceReleaseConnection()
{
if ( connection != null )
{
return connection.forceRelease();
return connection.releaseNow();
}
return completedFuture( null );
} ).exceptionally( error ->
{
// todo: this log message looks wrong, should it go to #rollbackTransaction() ?
logger.error( "Failed to rollback active transaction", error );
return null;
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import java.util.Map;

import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler;
import org.neo4j.driver.internal.handlers.InitResponseHandler;
import org.neo4j.driver.internal.messaging.InitMessage;
import org.neo4j.driver.v1.Value;

Expand Down Expand Up @@ -54,7 +54,7 @@ public void operationComplete( ChannelFuture future )
Channel channel = future.channel();

InitMessage message = new InitMessage( userAgent, authToken );
AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );

messageDispatcher( channel ).queue( handler );
channel.writeAndFlush( message, channel.voidPromise() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.PullAllMessage;
import org.neo4j.driver.internal.messaging.ResetMessage;
Expand Down Expand Up @@ -61,6 +62,12 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
this.clock = clock;
}

@Override
public boolean isInUse()
{
return state.isInUse();
}

@Override
public boolean tryMarkInUse()
{
Expand Down Expand Up @@ -99,29 +106,22 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
run( statement, parameters, runHandler, pullAllHandler, true );
}

// TODO change this to return future or say that it does stuff in the background in the method name?
@Override
public void release()
public void releaseInBackground()
{
if ( state.release() )
{
reset( new ReleaseChannelHandler( channel, channelPool, clock ) );
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
}
}

@Override
public boolean isInUse()
{
return state.isInUse();
}

@Override
public CompletionStage<Void> forceRelease()
public CompletionStage<Void> releaseNow()
{
if ( state.forceRelease() )
{
Promise<Void> releasePromise = channel.eventLoop().newPromise();
reset( new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ) );
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
return asCompletionStage( releasePromise );
}
else
Expand Down Expand Up @@ -151,7 +151,11 @@ private void run( String statement, Map<String,Value> parameters, ResponseHandle

private void reset( ResponseHandler resetHandler )
{
writeAndFlushMessageInEventLoop( ResetMessage.RESET, resetHandler );
channel.eventLoop().execute( () ->
{
messageDispatcher.muteAckFailure();
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
} );
}

private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2,
Expand All @@ -160,11 +164,6 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
channel.eventLoop().execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
}

private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler )
{
channel.eventLoop().execute( () -> writeAndFlushMessage( message, handler ) );
}

private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
boolean flush )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
}

@Override
public void release()
public void releaseInBackground()
{
delegate.release();
delegate.releaseInBackground();
}

@Override
Expand All @@ -88,9 +88,9 @@ public boolean isInUse()
}

@Override
public CompletionStage<Void> forceRelease()
public CompletionStage<Void> releaseNow()
{
return delegate.forceRelease();
return delegate.releaseNow();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Queue;

import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
import org.neo4j.driver.internal.messaging.AckFailureMessage;
import org.neo4j.driver.internal.messaging.MessageHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
Expand All @@ -35,6 +34,7 @@
import org.neo4j.driver.v1.Value;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;

public class InboundMessageDispatcher implements MessageHandler
{
Expand All @@ -44,6 +44,7 @@ public class InboundMessageDispatcher implements MessageHandler

private Throwable currentError;
private boolean fatalErrorOccurred;
private boolean ackFailureMuted;

public InboundMessageDispatcher( Channel channel, Logging logging )
{
Expand Down Expand Up @@ -133,9 +134,8 @@ public void handleFailureMessage( String code, String message )
log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message );
currentError = ErrorUtil.newNeo4jError( code, message );

// queue ACK_FAILURE before notifying the next response handler
queue( new AckFailureResponseHandler( this ) );
channel.writeAndFlush( AckFailureMessage.ACK_FAILURE, channel.voidPromise() );
// try to write ACK_FAILURE before notifying the next response handler
ackFailureIfNeeded();

ResponseHandler handler = handlers.remove();
handler.onFailure( currentError );
Expand Down Expand Up @@ -179,4 +179,43 @@ public Throwable currentError()
{
return currentError;
}

/**
* Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using
* {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed <b>only</b> when sending RESET message. RESET "jumps"
* over all queued messages on server and makes them fail. Received failures do not need to be acknowledge because
* RESET moves server's state machine to READY state.
* <p>
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
*/
public void muteAckFailure()
{
ackFailureMuted = true;
}

/**
* Makes this message dispatcher send ACK_FAILURE in response to FAILURE. Should be used in combination with
* {@link #muteAckFailure()} when sending RESET message.
* <p>
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
*
* @throws IllegalStateException if ACK_FAILURE is not muted right now.
*/
public void unMuteAckFailure()
{
if ( !ackFailureMuted )
{
throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" );
}
ackFailureMuted = false;
}

private void ackFailureIfNeeded()
{
if ( !ackFailureMuted )
{
queue( new AckFailureResponseHandler( this ) );
channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() );
}
}
}

This file was deleted.

Loading