Skip to content

Backpressure for reading records #438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class NettyConnection implements Connection
private final Clock clock;

private final AtomicBoolean open = new AtomicBoolean( true );
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );

public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
{
Expand All @@ -72,8 +71,7 @@ public boolean isOpen()
@Override
public void enableAutoRead()
{
assertOpen();
if ( autoReadEnabled.compareAndSet( false, true ) )
if ( isOpen() )
{
setAutoRead( true );
}
Expand All @@ -82,8 +80,7 @@ public void enableAutoRead()
@Override
public void disableAutoRead()
{
assertOpen();
if ( autoReadEnabled.compareAndSet( true, false ) )
if ( isOpen() )
{
setAutoRead( false );
}
Expand All @@ -110,6 +107,9 @@ public CompletionStage<Void> release()
{
if ( open.compareAndSet( true, false ) )
{
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
setAutoRead( true );

reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
}
return releaseFuture;
Expand Down Expand Up @@ -180,7 +180,7 @@ private void setAutoRead( boolean value )

private void assertOpen()
{
if ( !open.get() )
if ( !isOpen() )
{
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.neo4j.driver.internal.handlers;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,36 +27,27 @@
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.summary.InternalNotification;
import org.neo4j.driver.internal.summary.InternalPlan;
import org.neo4j.driver.internal.summary.InternalProfiledPlan;
import org.neo4j.driver.internal.summary.InternalResultSummary;
import org.neo4j.driver.internal.summary.InternalServerInfo;
import org.neo4j.driver.internal.summary.InternalSummaryCounters;
import org.neo4j.driver.internal.util.MetadataUtil;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.summary.Notification;
import org.neo4j.driver.v1.summary.Plan;
import org.neo4j.driver.v1.summary.ProfiledPlan;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.summary.StatementType;

import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

// todo: unit tests
public abstract class PullAllResponseHandler implements ResponseHandler
{
private static final boolean TOUCH_AUTO_READ = false;
static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );

private final Statement statement;
private final RunResponseHandler runResponseHandler;
protected final Connection connection;

private final Queue<Record> records = new LinkedList<>();
private final Queue<Record> records = new ArrayDeque<>();

private boolean finished;
private Throwable failure;
Expand Down Expand Up @@ -210,25 +199,31 @@ else if ( finished )
private void queueRecord( Record record )
{
records.add( record );
if ( TOUCH_AUTO_READ )

boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null;
// when summary or failure is requested we have to buffer all remaining records and then return summary/failure
// do not disable auto-read in this case, otherwise records will not be consumed and trailing
// SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure
if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
{
if ( records.size() > 10_000 )
{
connection.disableAutoRead();
}
// more than high watermark records are already queued, tell connection to stop auto-reading from network
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
// fetched from network faster than consumed
connection.disableAutoRead();
}
}

private Record dequeueRecord()
{
Record record = records.poll();
if ( TOUCH_AUTO_READ )

if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
{
if ( record != null && records.size() < 100 )
{
connection.enableAutoRead();
}
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
// and populate queue with new records from network
connection.enableAutoRead();
}

return record;
}

Expand Down Expand Up @@ -302,89 +297,7 @@ private boolean completeFailureFuture( Throwable error )

private ResultSummary extractResultSummary( Map<String,Value> metadata )
{
InternalServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(),
connection.serverVersion() );
return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ),
extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ),
extractNotifications( metadata ), runResponseHandler.resultAvailableAfter(),
extractResultConsumedAfter( metadata ) );
}

private static StatementType extractStatementType( Map<String,Value> metadata )
{
Value typeValue = metadata.get( "type" );
if ( typeValue != null )
{
return StatementType.fromCode( typeValue.asString() );
}
return null;
}

private static InternalSummaryCounters extractCounters( Map<String,Value> metadata )
{
Value countersValue = metadata.get( "stats" );
if ( countersValue != null )
{
return new InternalSummaryCounters(
counterValue( countersValue, "nodes-created" ),
counterValue( countersValue, "nodes-deleted" ),
counterValue( countersValue, "relationships-created" ),
counterValue( countersValue, "relationships-deleted" ),
counterValue( countersValue, "properties-set" ),
counterValue( countersValue, "labels-added" ),
counterValue( countersValue, "labels-removed" ),
counterValue( countersValue, "indexes-added" ),
counterValue( countersValue, "indexes-removed" ),
counterValue( countersValue, "constraints-added" ),
counterValue( countersValue, "constraints-removed" )
);
}
return null;
}

private static int counterValue( Value countersValue, String name )
{
Value value = countersValue.get( name );
return value.isNull() ? 0 : value.asInt();
}

private static Plan extractPlan( Map<String,Value> metadata )
{
Value planValue = metadata.get( "plan" );
if ( planValue != null )
{
return InternalPlan.EXPLAIN_PLAN_FROM_VALUE.apply( planValue );
}
return null;
}

private static ProfiledPlan extractProfiledPlan( Map<String,Value> metadata )
{
Value profiledPlanValue = metadata.get( "profile" );
if ( profiledPlanValue != null )
{
return InternalProfiledPlan.PROFILED_PLAN_FROM_VALUE.apply( profiledPlanValue );
}
return null;
}

private static List<Notification> extractNotifications( Map<String,Value> metadata )
{
Value notificationsValue = metadata.get( "notifications" );
if ( notificationsValue != null )
{
return notificationsValue.asList( InternalNotification.VALUE_TO_NOTIFICATION );
}
return Collections.emptyList();
}

private static long extractResultConsumedAfter( Map<String,Value> metadata )
{
Value resultConsumedAfterValue = metadata.get( "result_consumed_after" );
if ( resultConsumedAfterValue != null )
{
return resultConsumedAfterValue.asLong();
}
return -1;
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.neo4j.driver.internal.handlers;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,13 +26,15 @@
import org.neo4j.driver.v1.Value;

import static java.util.Collections.emptyList;
import static org.neo4j.driver.internal.util.MetadataUtil.extractResultAvailableAfter;
import static org.neo4j.driver.internal.util.MetadataUtil.extractStatementKeys;

public class RunResponseHandler implements ResponseHandler
{
private final CompletableFuture<Void> runCompletedFuture;

private List<String> statementKeys = emptyList();
private long resultAvailableAfter;
private long resultAvailableAfter = -1;

public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
{
Expand All @@ -43,7 +44,7 @@ public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
@Override
public void onSuccess( Map<String,Value> metadata )
{
statementKeys = extractKeys( metadata );
statementKeys = extractStatementKeys( metadata );
resultAvailableAfter = extractResultAvailableAfter( metadata );

completeRunFuture();
Expand Down Expand Up @@ -80,33 +81,4 @@ private void completeRunFuture()
{
runCompletedFuture.complete( null );
}

private static List<String> extractKeys( Map<String,Value> metadata )
{
Value keysValue = metadata.get( "fields" );
if ( keysValue != null )
{
if ( !keysValue.isEmpty() )
{
List<String> keys = new ArrayList<>( keysValue.size() );
for ( Value value : keysValue.values() )
{
keys.add( value.asString() );
}

return keys;
}
}
return emptyList();
}

private static long extractResultAvailableAfter( Map<String,Value> metadata )
{
Value resultAvailableAfterValue = metadata.get( "result_available_after" );
if ( resultAvailableAfterValue != null )
{
return resultAvailableAfterValue.asLong();
}
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.summary;

import java.util.Objects;

import org.neo4j.driver.v1.summary.InputPosition;

/**
Expand Down Expand Up @@ -61,6 +63,29 @@ public int column()
return column;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
InternalInputPosition that = (InternalInputPosition) o;
return offset == that.offset &&
line == that.line &&
column == that.column;
}

@Override
public int hashCode()
{
return Objects.hash( offset, line, column );
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ public List<Notification> notifications()
@Override
public long resultAvailableAfter( TimeUnit unit )
{
return unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS );
return resultAvailableAfter == -1 ? resultAvailableAfter
: unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS );
}

@Override
public long resultConsumedAfter( TimeUnit unit )
{
return unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS );
return resultConsumedAfter == -1 ? resultConsumedAfter
: unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,19 @@

public class InternalServerInfo implements ServerInfo
{
private final BoltServerAddress address;
private final String address;
private final String version;

public InternalServerInfo( BoltServerAddress address, ServerVersion version )
{
this( address, version.toString() );
}

public InternalServerInfo( BoltServerAddress address, String version )
{
this.address = address;
this.version = version;
}

public BoltServerAddress boltServerAddress()
{
return this.address;
this.address = address.toString();
this.version = version.toString();
}

@Override
public String address()
{
return this.address.toString();
return address;
}

@Override
Expand Down
Loading