Skip to content

Ensure a single handler manages channel's auto-read #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void operationComplete( ChannelFuture future )
InitMessage message = new InitMessage( userAgent, authToken );
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );

messageDispatcher( channel ).queue( handler );
messageDispatcher( channel ).enqueue( handler );
channel.writeAndFlush( message, channel.voidPromise() );
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
boolean flush )
{
messageDispatcher.queue( handler1 );
messageDispatcher.queue( handler2 );
messageDispatcher.enqueue( handler1 );
messageDispatcher.enqueue( handler2 );

channel.write( message1, channel.voidPromise() );

Expand All @@ -216,7 +216,7 @@ private void writeMessages( Message message1, ResponseHandler handler1, Message

private void writeAndFlushMessage( Message message, ResponseHandler handler )
{
messageDispatcher.queue( handler );
messageDispatcher.enqueue( handler );
channel.writeAndFlush( message, channel.voidPromise() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.MessageHandler;
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Logger;
Expand All @@ -48,13 +49,15 @@ public class InboundMessageDispatcher implements MessageHandler
private boolean fatalErrorOccurred;
private boolean ackFailureMuted;

private AutoReadManagingResponseHandler autoReadManagingHandler;

public InboundMessageDispatcher( Channel channel, Logging logging )
{
this.channel = requireNonNull( channel );
this.log = new ChannelActivityLogger( channel, logging, getClass() );
}

public void queue( ResponseHandler handler )
public void enqueue( ResponseHandler handler )
{
if ( fatalErrorOccurred )
{
Expand All @@ -63,6 +66,7 @@ public void queue( ResponseHandler handler )
else
{
handlers.add( handler );
updateAutoReadManagingHandlerIfNeeded( handler );
}
}

Expand Down Expand Up @@ -115,7 +119,7 @@ public void handleAckFailureMessage()
public void handleSuccessMessage( Map<String,Value> meta )
{
log.debug( "S: SUCCESS %s", meta );
ResponseHandler handler = handlers.remove();
ResponseHandler handler = removeHandler();
handler.onSuccess( meta );
}

Expand Down Expand Up @@ -148,7 +152,7 @@ public void handleFailureMessage( String code, String message )
// try to write ACK_FAILURE before notifying the next response handler
ackFailureIfNeeded();

ResponseHandler handler = handlers.remove();
ResponseHandler handler = removeHandler();
handler.onFailure( currentError );
}

Expand All @@ -157,7 +161,7 @@ public void handleIgnoredMessage()
{
log.debug( "S: IGNORED" );

ResponseHandler handler = handlers.remove();
ResponseHandler handler = removeHandler();

Throwable error;
if ( currentError != null )
Expand Down Expand Up @@ -185,7 +189,7 @@ public void handleFatalError( Throwable error )

while ( !handlers.isEmpty() )
{
ResponseHandler handler = handlers.remove();
ResponseHandler handler = removeHandler();
handler.onFailure( currentError );
}
}
Expand Down Expand Up @@ -241,12 +245,48 @@ public boolean isAckFailureMuted()
return ackFailureMuted;
}

/**
* <b>Visible for testing</b>
*/
AutoReadManagingResponseHandler autoReadManagingHandler()
{
return autoReadManagingHandler;
}

private void ackFailureIfNeeded()
{
if ( !ackFailureMuted )
{
queue( new AckFailureResponseHandler( this ) );
enqueue( new AckFailureResponseHandler( this ) );
channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() );
}
}

private ResponseHandler removeHandler()
{
ResponseHandler handler = handlers.remove();
if ( autoReadManagingHandler == handler )
{
// handler that is being removed is the auto-read managing handler
// make sure this dispatcher does not hold on to a removed handler
autoReadManagingHandler = null;
}
return handler;
}

private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
{
if ( handler instanceof AutoReadManagingResponseHandler )
{
if ( autoReadManagingHandler != null )
{
// there already exists a handler that manages channel's auto-read
// make it stop because new managing handler is being added and there should only be a single such handler
autoReadManagingHandler.disableAutoReadManagement();
// restore the default value of auto-read
channel.config().setAutoRead( true );
}
autoReadManagingHandler = (AutoReadManagingResponseHandler) handler;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private boolean hasBeenIdleForTooLong( Channel channel )
private Future<Boolean> ping( Channel channel )
{
Promise<Boolean> result = channel.eventLoop().newPromise();
messageDispatcher( channel ).queue( new PingResponseHandler( result, channel, log ) );
messageDispatcher( channel ).enqueue( new PingResponseHandler( result, channel, log ) );
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.MetadataUtil;
Expand All @@ -44,7 +44,7 @@
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

public abstract class PullAllResponseHandler implements ResponseHandler
public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
{
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();

Expand All @@ -58,6 +58,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
// initialized lazily when first record arrives
private Queue<Record> records = UNINITIALIZED_RECORDS;

private boolean autoReadManagementEnabled = true;
private boolean finished;
private Throwable failure;
private ResultSummary summary;
Expand Down Expand Up @@ -129,6 +130,12 @@ public synchronized void onRecord( Value[] fields )
}
}

@Override
public synchronized void disableAutoReadManagement()
{
autoReadManagementEnabled = false;
}

public synchronized CompletionStage<Record> peekAsync()
{
Record record = records.peek();
Expand Down Expand Up @@ -209,7 +216,7 @@ else if ( finished )
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
connection.enableAutoRead();
enableAutoRead();
failureFuture = new CompletableFuture<>();
}
return failureFuture;
Expand All @@ -234,7 +241,7 @@ private void enqueueRecord( Record record )
// more than high watermark records are already queued, tell connection to stop auto-reading from network
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
// fetched from network faster than consumed
connection.disableAutoRead();
disableAutoRead();
}
}

Expand All @@ -246,7 +253,7 @@ private Record dequeueRecord()
{
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
// and populate queue with new records from network
connection.enableAutoRead();
enableAutoRead();
}

return record;
Expand Down Expand Up @@ -319,4 +326,20 @@ private ResultSummary extractResultSummary( Map<String,Value> metadata )
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
}

private void enableAutoRead()
{
if ( autoReadManagementEnabled )
{
connection.enableAutoRead();
}
}

private void disableAutoRead()
{
if ( autoReadManagementEnabled )
{
connection.disableAutoRead();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.spi;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;

/**
* A type of {@link ResponseHandler handler} that manages auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
* {@link Connection#disableAutoRead()}.
* <p>
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
* a single auto-read managing handler per connection.
*/
public interface AutoReadManagingResponseHandler extends ResponseHandler
{
/**
* Tell this handler that it should stop changing auto-read setting for the connection.
*/
void disableAutoReadManagement();
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void shouldWriteInitMessageWhenHandshakeCompleted()
listener.operationComplete( handshakeCompletedPromise );
assertTrue( channel.finish() );

verify( messageDispatcher ).queue( any( InitResponseHandler.class ) );
verify( messageDispatcher ).enqueue( any( InitResponseHandler.class ) );
Object outboundMessage = channel.readOutbound();
assertThat( outboundMessage, instanceOf( InitMessage.class ) );
InitMessage initMessage = (InitMessage) outboundMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,10 @@ private static class ThreadTrackingInboundMessageDispatcher extends InboundMessa
}

@Override
public void queue( ResponseHandler handler )
public void enqueue( ResponseHandler handler )
{
queueThreadNames.add( Thread.currentThread().getName() );
super.queue( handler );
super.enqueue( handler );
}

}
Expand Down
Loading