Skip to content

Allow simple/async sessions to request data as needed #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ public class AutoPullResponseHandler extends BasicPullResponseHandler implements
{
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
private final long fetchSize;
private final long lowRecordWatermark;
private final long highRecordWatermark;

// initialized lazily when first record arrives
private Queue<Record> records = UNINITIALIZED_RECORDS;

private ResultSummary summary;
private Throwable failure;
private boolean isAutoPullEnabled = true;

private CompletableFuture<Record> recordFuture;
private CompletableFuture<ResultSummary> summaryFuture;
Expand All @@ -64,6 +67,19 @@ public AutoPullResponseHandler(Query query, RunResponseHandler runResponseHandle
{
super(query, runResponseHandler, connection, metadataExtractor, completionListener );
this.fetchSize = fetchSize;

//For pull everything ensure conditions for disabling auto pull are never met
if ( fetchSize == UNLIMITED_FETCH_SIZE )
{
this.highRecordWatermark = Long.MAX_VALUE;
this.lowRecordWatermark = Long.MAX_VALUE;
}
else
{
this.highRecordWatermark = (long) (fetchSize * 0.7);
this.lowRecordWatermark = (long) (fetchSize * 0.3);
}

installRecordAndSummaryConsumers();
}

Expand Down Expand Up @@ -96,7 +112,10 @@ private void installRecordAndSummaryConsumers()

if ( error == null && summary == null ) // has_more
{
request( fetchSize );
if ( isAutoPullEnabled )
{
request( fetchSize );
}
}
} );
}
Expand Down Expand Up @@ -199,11 +218,29 @@ private void enqueueRecord( Record record )
}

records.add( record );

// too many records in the queue, pause auto request gathering
if ( records.size() > highRecordWatermark )
{
isAutoPullEnabled = false;
}
}

private Record dequeueRecord()
{
return records.poll();
Record record = records.poll();

if ( records.size() <= lowRecordWatermark )
{
//if not in streaming state we need to restart streaming
if ( state() != State.STREAMING_STATE )
{
request( fetchSize );
}
isAutoPullEnabled = true;
}

return record;
}

private <T> List<T> recordsAsList( Function<Record,T> mapFunction )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.test.StepVerifier;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand All @@ -38,16 +39,18 @@
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.util.StubServer;

import static java.util.Arrays.asList;
Expand All @@ -72,6 +75,7 @@
import static org.neo4j.driver.util.StubServer.INSECURE_CONFIG;
import static org.neo4j.driver.util.StubServer.insecureBuilder;
import static org.neo4j.driver.util.TestUtil.asOrderedSet;
import static org.neo4j.driver.util.TestUtil.await;

class DirectDriverBoltKitTest
{
Expand Down Expand Up @@ -306,6 +310,75 @@ void shouldChangeFetchSize() throws Exception
}
}

@Test
void shouldOnlyPullRecordsWhenNeededSimpleSession() throws Exception
{
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
try
{
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
{
Session session = driver.session( builder().withFetchSize( 2 ).build() );
Result result = session.run( "MATCH (n) RETURN n.name" );
ArrayList<String> resultList = new ArrayList<>();
result.forEachRemaining( ( rec ) -> resultList.add( rec.get( 0 ).asString() ) );

assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
}
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

@Test
void shouldOnlyPullRecordsWhenNeededAsyncSession() throws Exception
{
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
try
{
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
{
AsyncSession session = driver.asyncSession( builder().withFetchSize( 2 ).build() );

ArrayList<String> resultList = new ArrayList<>();

await( session.runAsync( "MATCH (n) RETURN n.name" )
.thenCompose( resultCursor ->
resultCursor.forEachAsync( record -> resultList.add( record.get( 0 ).asString() ) ) ) );

assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
}
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

@Test
void shouldPullAllRecordsOnListAsyncWhenOverWatermark() throws Exception
{
StubServer server = StubServer.start( "streaming_records_v4_list_async.script", 9001 );
try
{
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
{
AsyncSession session = driver.asyncSession( builder().withFetchSize( 10 ).build() );

ResultCursor cursor = await( session.runAsync( "MATCH (n) RETURN n.name" ) );
List<String> records = await( cursor.listAsync( record -> record.get( 0 ).asString() ) );

assertEquals( records, asList( "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L" ) );
}
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

@Test
void shouldAllowPullAll() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
Expand All @@ -35,6 +37,7 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.value.BooleanValue;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.QueryType;

Expand Down Expand Up @@ -679,7 +682,6 @@ void shouldReturnNotTransformedListInListAsync()

assertEquals( expectedRecords, list );
}

protected T newHandler()
{
return newHandler( new Query( "RETURN 1" ) );
Expand Down
Loading