From 001e8438989ea13bfcff07752f712d9eec096599 Mon Sep 17 00:00:00 2001 From: Gregory Woods Date: Fri, 13 Dec 2019 11:40:59 +0000 Subject: [PATCH] Allow simple/async sessions to request data as needed --- .../pulln/AutoPullResponseHandler.java | 41 ++++- .../internal/DirectDriverBoltKitTest.java | 77 ++++++++- .../PullAllResponseHandlerTestBase.java | 4 +- .../pulln/AutoPullResponseHandlerTest.java | 154 +++++++++++++++++- .../streaming_records_v4_buffering.script | 19 +++ .../streaming_records_v4_list_async.script | 23 +++ 6 files changed, 309 insertions(+), 9 deletions(-) create mode 100644 driver/src/test/resources/streaming_records_v4_buffering.script create mode 100644 driver/src/test/resources/streaming_records_v4_list_async.script diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java index 1c2c88f476..0f637d4ebe 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java @@ -49,12 +49,15 @@ public class AutoPullResponseHandler extends BasicPullResponseHandler implements { private static final Queue UNINITIALIZED_RECORDS = Iterables.emptyQueue(); private final long fetchSize; + private final long lowRecordWatermark; + private final long highRecordWatermark; // initialized lazily when first record arrives private Queue records = UNINITIALIZED_RECORDS; private ResultSummary summary; private Throwable failure; + private boolean isAutoPullEnabled = true; private CompletableFuture recordFuture; private CompletableFuture summaryFuture; @@ -64,6 +67,19 @@ public AutoPullResponseHandler(Query query, RunResponseHandler runResponseHandle { super(query, runResponseHandler, connection, metadataExtractor, completionListener ); this.fetchSize = fetchSize; + + //For pull everything ensure conditions for disabling auto pull are never met + if ( fetchSize == UNLIMITED_FETCH_SIZE ) + { + this.highRecordWatermark = Long.MAX_VALUE; + this.lowRecordWatermark = Long.MAX_VALUE; + } + else + { + this.highRecordWatermark = (long) (fetchSize * 0.7); + this.lowRecordWatermark = (long) (fetchSize * 0.3); + } + installRecordAndSummaryConsumers(); } @@ -96,7 +112,10 @@ private void installRecordAndSummaryConsumers() if ( error == null && summary == null ) // has_more { - request( fetchSize ); + if ( isAutoPullEnabled ) + { + request( fetchSize ); + } } } ); } @@ -199,11 +218,29 @@ private void enqueueRecord( Record record ) } records.add( record ); + + // too many records in the queue, pause auto request gathering + if ( records.size() > highRecordWatermark ) + { + isAutoPullEnabled = false; + } } private Record dequeueRecord() { - return records.poll(); + Record record = records.poll(); + + if ( records.size() <= lowRecordWatermark ) + { + //if not in streaming state we need to restart streaming + if ( state() != State.STREAMING_STATE ) + { + request( fetchSize ); + } + isAutoPullEnabled = true; + } + + return record; } private List recordsAsList( Function mapFunction ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java index c8a68ebab4..016dfaa73c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java @@ -26,6 +26,7 @@ import reactor.test.StepVerifier; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Consumer; @@ -38,16 +39,18 @@ import org.neo4j.driver.GraphDatabase; import org.neo4j.driver.Logger; import org.neo4j.driver.Record; -import org.neo4j.driver.Session; import org.neo4j.driver.Result; +import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.TransientException; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory; -import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.util.StubServer; import static java.util.Arrays.asList; @@ -72,6 +75,7 @@ import static org.neo4j.driver.util.StubServer.INSECURE_CONFIG; import static org.neo4j.driver.util.StubServer.insecureBuilder; import static org.neo4j.driver.util.TestUtil.asOrderedSet; +import static org.neo4j.driver.util.TestUtil.await; class DirectDriverBoltKitTest { @@ -306,6 +310,75 @@ void shouldChangeFetchSize() throws Exception } } + @Test + void shouldOnlyPullRecordsWhenNeededSimpleSession() throws Exception + { + StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 ); + try + { + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) + { + Session session = driver.session( builder().withFetchSize( 2 ).build() ); + Result result = session.run( "MATCH (n) RETURN n.name" ); + ArrayList resultList = new ArrayList<>(); + result.forEachRemaining( ( rec ) -> resultList.add( rec.get( 0 ).asString() ) ); + + assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) ); + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + + @Test + void shouldOnlyPullRecordsWhenNeededAsyncSession() throws Exception + { + StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 ); + try + { + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) + { + AsyncSession session = driver.asyncSession( builder().withFetchSize( 2 ).build() ); + + ArrayList resultList = new ArrayList<>(); + + await( session.runAsync( "MATCH (n) RETURN n.name" ) + .thenCompose( resultCursor -> + resultCursor.forEachAsync( record -> resultList.add( record.get( 0 ).asString() ) ) ) ); + + assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) ); + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + + @Test + void shouldPullAllRecordsOnListAsyncWhenOverWatermark() throws Exception + { + StubServer server = StubServer.start( "streaming_records_v4_list_async.script", 9001 ); + try + { + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) ) + { + AsyncSession session = driver.asyncSession( builder().withFetchSize( 10 ).build() ); + + ResultCursor cursor = await( session.runAsync( "MATCH (n) RETURN n.name" ) ); + List records = await( cursor.listAsync( record -> record.get( 0 ).asString() ) ); + + assertEquals( records, asList( "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L" ) ); + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + @Test void shouldAllowPullAll() throws Exception { diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java index a7733e7629..f61a50a1a5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -35,6 +37,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.value.BooleanValue; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.driver.summary.QueryType; @@ -679,7 +682,6 @@ void shouldReturnNotTransformedListInListAsync() assertEquals( expectedRecords, list ); } - protected T newHandler() { return newHandler( new Query( "RETURN 1" ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java index 1a0b3ce12a..20091e119d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java @@ -18,32 +18,178 @@ */ package org.neo4j.driver.internal.handlers.pulln; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.Query; +import org.neo4j.driver.Value; import org.neo4j.driver.internal.handlers.PullAllResponseHandlerTestBase; import org.neo4j.driver.internal.handlers.PullResponseCompletionListener; import org.neo4j.driver.internal.handlers.RunResponseHandler; +import org.neo4j.driver.internal.messaging.request.PullMessage; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.value.BooleanValue; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.neo4j.driver.Values.value; +import static org.neo4j.driver.Values.values; import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.DEFAULT_FETCH_SIZE; import static org.neo4j.driver.internal.messaging.v1.BoltProtocolV1.METADATA_EXTRACTOR; class AutoPullResponseHandlerTest extends PullAllResponseHandlerTestBase { @Override - protected AutoPullResponseHandler newHandler(Query query, List queryKeys, Connection connection ) + protected AutoPullResponseHandler newHandler( Query query, List queryKeys, Connection connection ) { RunResponseHandler runResponseHandler = new RunResponseHandler( new CompletableFuture<>(), METADATA_EXTRACTOR ); - runResponseHandler.onSuccess( singletonMap( "fields", value(queryKeys) ) ); + runResponseHandler.onSuccess( singletonMap( "fields", value( queryKeys ) ) ); AutoPullResponseHandler handler = - new AutoPullResponseHandler(query, runResponseHandler, connection, METADATA_EXTRACTOR, mock( PullResponseCompletionListener.class ), - DEFAULT_FETCH_SIZE ); + new AutoPullResponseHandler( query, runResponseHandler, connection, METADATA_EXTRACTOR, mock( PullResponseCompletionListener.class ), + DEFAULT_FETCH_SIZE ); handler.prePopulateRecords(); return handler; } + + protected AutoPullResponseHandler newHandler( Query query, Connection connection, long fetchSize ) + { + RunResponseHandler runResponseHandler = new RunResponseHandler( new CompletableFuture<>(), METADATA_EXTRACTOR ); + runResponseHandler.onSuccess( emptyMap() ); + AutoPullResponseHandler handler = + new AutoPullResponseHandler( query, runResponseHandler, connection, METADATA_EXTRACTOR, mock( PullResponseCompletionListener.class ), + fetchSize ); + handler.prePopulateRecords(); + return handler; + } + + @Test + void shouldKeepRequestingWhenBetweenRange() + { + Connection connection = connectionMock(); + InOrder inOrder = Mockito.inOrder( connection ); + + //highwatermark=2, lowwatermark=1 + AutoPullResponseHandler handler = newHandler( new Query( "RETURN 1" ), connection, 4 ); + + Map metaData = new HashMap<>( 1 ); + metaData.put( "has_more", BooleanValue.TRUE ); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + handler.onSuccess( metaData ); //2 in the record queue + + //should send another pulln request since maxValue not met + inOrder.verify( connection ).writeAndFlush( any(), any() ); + } + + @Test + void shouldStopRequestingWhenOverMaxWatermark() + { + Connection connection = connectionMock(); + InOrder inOrder = Mockito.inOrder( connection ); + + //highWatermark=2, lowWatermark=1 + AutoPullResponseHandler handler = newHandler( new Query( "RETURN 1" ), connection, 4 ); + + Map metaData = new HashMap<>( 1 ); + metaData.put( "has_more", BooleanValue.TRUE ); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + handler.onRecord( values( 3 ) ); + handler.onSuccess( metaData ); + + //only initial writeAndFlush() + verify( connection, times( 1 ) ).writeAndFlush( any( PullMessage.class ), any() ); + } + + @Test + void shouldRestartRequestingWhenMinimumWatermarkMet() + { + Connection connection = connectionMock(); + InOrder inOrder = Mockito.inOrder( connection ); + + //highwatermark=4, lowwatermark=2 + AutoPullResponseHandler handler = newHandler( new Query( "RETURN 1" ), connection, 7 ); + + Map metaData = new HashMap<>( 1 ); + metaData.put( "has_more", BooleanValue.TRUE ); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + + handler.onRecord( values( 1 ) ); + handler.onRecord( values( 2 ) ); + handler.onRecord( values( 3 ) ); + handler.onRecord( values( 4 ) ); + handler.onRecord( values( 5 ) ); + handler.onSuccess( metaData ); + + verify( connection, times( 1 ) ).writeAndFlush( any( PullMessage.class ), any() ); + + handler.nextAsync(); + handler.nextAsync(); + handler.nextAsync(); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + } + + @Test + void shouldKeepRequestingMoreRecordsWhenPullAll() + { + Connection connection = connectionMock(); + AutoPullResponseHandler handler = newHandler( new Query( "RETURN 1" ), connection, -1 ); + + Map metaData = new HashMap<>( 1 ); + metaData.put( "has_more", BooleanValue.TRUE ); + + handler.onRecord( values( 1 ) ); + handler.onSuccess( metaData ); + + handler.onRecord( values( 2 ) ); + handler.onSuccess( metaData ); + + handler.onRecord( values( 3 ) ); + handler.onSuccess( emptyMap() ); + + verify( connection, times( 3 ) ).writeAndFlush( any( PullMessage.class ), any() ); + } + + @Test + void shouldFunctionWhenHighAndLowWatermarksAreEqual() + { + Connection connection = connectionMock(); + InOrder inOrder = Mockito.inOrder( connection ); + + //highwatermark=0, lowwatermark=0 + AutoPullResponseHandler handler = newHandler( new Query( "RETURN 1" ), connection, 1 ); + + Map metaData = new HashMap<>( 1 ); + metaData.put( "has_more", BooleanValue.TRUE ); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + + handler.onRecord( values( 1 ) ); + handler.onSuccess( metaData ); + + inOrder.verify( connection, never() ).writeAndFlush( any(), any() ); + + handler.nextAsync(); + + inOrder.verify( connection ).writeAndFlush( any( PullMessage.class ), any() ); + } } diff --git a/driver/src/test/resources/streaming_records_v4_buffering.script b/driver/src/test/resources/streaming_records_v4_buffering.script new file mode 100644 index 0000000000..2067657c16 --- /dev/null +++ b/driver/src/test/resources/streaming_records_v4_buffering.script @@ -0,0 +1,19 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO HELLO +!: AUTO GOODBYE + +C: RUN "MATCH (n) RETURN n.name" {} {} + PULL { "n": 2 } +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + SUCCESS {"has_more": true} +C: PULL { "n": 2 } +S: RECORD ["Tina"] + RECORD ["Frank"] + SUCCESS {"has_more": true} +C: PULL { "n": 2 } +S: RECORD ["Daisy"] + RECORD ["Clive"] + SUCCESS {} \ No newline at end of file diff --git a/driver/src/test/resources/streaming_records_v4_list_async.script b/driver/src/test/resources/streaming_records_v4_list_async.script new file mode 100644 index 0000000000..95ea02159f --- /dev/null +++ b/driver/src/test/resources/streaming_records_v4_list_async.script @@ -0,0 +1,23 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO HELLO +!: AUTO GOODBYE + +C: RUN "MATCH (n) RETURN n.name" {} {} + PULL { "n": 10 } +S: SUCCESS {"fields": ["n.name"]} + RECORD ["A"] + RECORD ["B"] + RECORD ["C"] + RECORD ["D"] + RECORD ["E"] + RECORD ["F"] + RECORD ["G"] + RECORD ["H"] + RECORD ["I"] + RECORD ["J"] + SUCCESS {"has_more": true} +C: PULL { "n": -1 } +S: RECORD ["K"] + RECORD ["L"] + SUCCESS {} \ No newline at end of file