Skip to content

Commit 0f147ca

Browse files
Gregory WoodsGregory Woods
Gregory Woods
authored and
Gregory Woods
committed
Allow simple/async sessions to request data as needed
1 parent bc105be commit 0f147ca

File tree

6 files changed

+279
-4
lines changed

6 files changed

+279
-4
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,15 @@ public class AutoPullResponseHandler extends BasicPullResponseHandler implements
4949
{
5050
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5151
private final long fetchSize;
52+
private final long LOW_RECORD_WATERMARK;
53+
private final long HIGH_RECORD_WATERMARK;
5254

5355
// initialized lazily when first record arrives
5456
private Queue<Record> records = UNINITIALIZED_RECORDS;
5557

5658
private ResultSummary summary;
5759
private Throwable failure;
60+
private boolean isAutoPullEnabled = true;
5861

5962
private CompletableFuture<Record> recordFuture;
6063
private CompletableFuture<ResultSummary> summaryFuture;
@@ -64,6 +67,19 @@ public AutoPullResponseHandler(Query query, RunResponseHandler runResponseHandle
6467
{
6568
super(query, runResponseHandler, connection, metadataExtractor, completionListener );
6669
this.fetchSize = fetchSize;
70+
71+
//For pull everything ensure conditions for disabling auto pull are never met
72+
if ( fetchSize == UNLIMITED_FETCH_SIZE )
73+
{
74+
this.HIGH_RECORD_WATERMARK = Long.MAX_VALUE;
75+
this.LOW_RECORD_WATERMARK = Long.MAX_VALUE;
76+
}
77+
else
78+
{
79+
this.HIGH_RECORD_WATERMARK = (long) (fetchSize * 0.7);
80+
this.LOW_RECORD_WATERMARK = (long) (fetchSize * 0.3);
81+
}
82+
6783
installRecordAndSummaryConsumers();
6884
}
6985

@@ -96,7 +112,10 @@ private void installRecordAndSummaryConsumers()
96112

97113
if ( error == null && summary == null ) // has_more
98114
{
99-
request( fetchSize );
115+
if ( isAutoPullEnabled )
116+
{
117+
request( fetchSize );
118+
}
100119
}
101120
} );
102121
}
@@ -198,11 +217,27 @@ private void enqueueRecord( Record record )
198217
records = new ArrayDeque<>();
199218
}
200219

220+
// too many records in the queue, pause auto request gathering
221+
if ( records.size() > HIGH_RECORD_WATERMARK )
222+
{
223+
isAutoPullEnabled = false;
224+
}
225+
201226
records.add( record );
202227
}
203228

204229
private Record dequeueRecord()
205230
{
231+
if ( records.size() < LOW_RECORD_WATERMARK )
232+
{
233+
//if not in streaming state we need to restart streaming
234+
if ( state() != State.STREAMING_STATE )
235+
{
236+
request( fetchSize );
237+
}
238+
isAutoPullEnabled = true;
239+
}
240+
206241
return records.poll();
207242
}
208243

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.test.StepVerifier;
2727

2828
import java.net.URI;
29+
import java.util.ArrayList;
2930
import java.util.List;
3031
import java.util.Optional;
3132
import java.util.function.Consumer;
@@ -38,16 +39,18 @@
3839
import org.neo4j.driver.GraphDatabase;
3940
import org.neo4j.driver.Logger;
4041
import org.neo4j.driver.Record;
41-
import org.neo4j.driver.Session;
4242
import org.neo4j.driver.Result;
43+
import org.neo4j.driver.Session;
4344
import org.neo4j.driver.Transaction;
45+
import org.neo4j.driver.async.AsyncSession;
46+
import org.neo4j.driver.async.ResultCursor;
4447
import org.neo4j.driver.exceptions.TransientException;
4548
import org.neo4j.driver.internal.cluster.RoutingSettings;
4649
import org.neo4j.driver.internal.retry.RetrySettings;
4750
import org.neo4j.driver.internal.util.Clock;
4851
import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
49-
import org.neo4j.driver.reactive.RxSession;
5052
import org.neo4j.driver.reactive.RxResult;
53+
import org.neo4j.driver.reactive.RxSession;
5154
import org.neo4j.driver.util.StubServer;
5255

5356
import static java.util.Arrays.asList;
@@ -72,6 +75,7 @@
7275
import static org.neo4j.driver.util.StubServer.INSECURE_CONFIG;
7376
import static org.neo4j.driver.util.StubServer.insecureBuilder;
7477
import static org.neo4j.driver.util.TestUtil.asOrderedSet;
78+
import static org.neo4j.driver.util.TestUtil.await;
7579

7680
class DirectDriverBoltKitTest
7781
{
@@ -306,6 +310,75 @@ void shouldChangeFetchSize() throws Exception
306310
}
307311
}
308312

313+
@Test
314+
void shouldOnlyPullRecordsWhenNeededSimpleSession() throws Exception
315+
{
316+
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
317+
try
318+
{
319+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
320+
{
321+
Session session = driver.session( builder().withFetchSize( 2 ).build() );
322+
Result result = session.run( "MATCH (n) RETURN n.name" );
323+
ArrayList<String> resultList = new ArrayList<>();
324+
result.forEachRemaining( ( rec ) -> resultList.add( rec.get( 0 ).asString() ) );
325+
326+
assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
327+
}
328+
}
329+
finally
330+
{
331+
assertEquals( 0, server.exitStatus() );
332+
}
333+
}
334+
335+
@Test
336+
void shouldOnlyPullRecordsWhenNeededAsyncSession() throws Exception
337+
{
338+
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
339+
try
340+
{
341+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
342+
{
343+
AsyncSession session = driver.asyncSession( builder().withFetchSize( 2 ).build() );
344+
345+
ArrayList<String> resultList = new ArrayList<>();
346+
347+
await( session.runAsync( "MATCH (n) RETURN n.name" )
348+
.thenCompose( resultCursor ->
349+
resultCursor.forEachAsync( record -> resultList.add( record.get( 0 ).asString() ) ) ) );
350+
351+
assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
352+
}
353+
}
354+
finally
355+
{
356+
assertEquals( 0, server.exitStatus() );
357+
}
358+
}
359+
360+
@Test
361+
void shouldPullAllRecordsOnListAsyncWhenOverWatermark() throws Exception
362+
{
363+
StubServer server = StubServer.start( "streaming_records_v4_list_async.script", 9001 );
364+
try
365+
{
366+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
367+
{
368+
AsyncSession session = driver.asyncSession( builder().withFetchSize( 10 ).build() );
369+
370+
ResultCursor cursor = await( session.runAsync( "MATCH (n) RETURN n.name" ) );
371+
List<String> records = await( cursor.listAsync( record -> record.get( 0 ).asString() ) );
372+
373+
assertEquals( records, asList( "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L" ) );
374+
}
375+
}
376+
finally
377+
{
378+
assertEquals( 0, server.exitStatus() );
379+
}
380+
}
381+
309382
@Test
310383
void shouldAllowPullAll() throws Exception
311384
{

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222

2323
import java.io.IOException;
2424
import java.nio.channels.ClosedChannelException;
25+
import java.util.HashMap;
2526
import java.util.List;
27+
import java.util.Map;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.CompletionStage;
2830
import java.util.function.Function;
@@ -35,6 +37,7 @@
3537
import org.neo4j.driver.internal.BoltServerAddress;
3638
import org.neo4j.driver.internal.InternalRecord;
3739
import org.neo4j.driver.internal.spi.Connection;
40+
import org.neo4j.driver.internal.value.BooleanValue;
3841
import org.neo4j.driver.summary.ResultSummary;
3942
import org.neo4j.driver.summary.QueryType;
4043

@@ -679,7 +682,6 @@ void shouldReturnNotTransformedListInListAsync()
679682

680683
assertEquals( expectedRecords, list );
681684
}
682-
683685
protected T newHandler()
684686
{
685687
return newHandler( new Query( "RETURN 1" ) );

driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,31 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers.pulln;
2020

21+
import org.junit.jupiter.api.Test;
22+
import org.mockito.InOrder;
23+
import org.mockito.Mockito;
24+
25+
import java.util.HashMap;
2126
import java.util.List;
27+
import java.util.Map;
2228
import java.util.concurrent.CompletableFuture;
2329

2430
import org.neo4j.driver.Query;
31+
import org.neo4j.driver.Value;
2532
import org.neo4j.driver.internal.handlers.PullAllResponseHandlerTestBase;
2633
import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
2734
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2835
import org.neo4j.driver.internal.spi.Connection;
36+
import org.neo4j.driver.internal.value.BooleanValue;
2937

38+
import static java.util.Collections.emptyMap;
3039
import static java.util.Collections.singletonMap;
40+
import static org.mockito.ArgumentMatchers.any;
3141
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.times;
43+
import static org.mockito.Mockito.verify;
3244
import static org.neo4j.driver.Values.value;
45+
import static org.neo4j.driver.Values.values;
3346
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.DEFAULT_FETCH_SIZE;
3447
import static org.neo4j.driver.internal.messaging.v1.BoltProtocolV1.METADATA_EXTRACTOR;
3548

@@ -46,4 +59,114 @@ protected AutoPullResponseHandler newHandler(Query query, List<String> queryKeys
4659
handler.prePopulateRecords();
4760
return handler;
4861
}
62+
63+
protected AutoPullResponseHandler newHandler(Query query, Connection connection, long fetchSize )
64+
{
65+
RunResponseHandler runResponseHandler = new RunResponseHandler( new CompletableFuture<>(), METADATA_EXTRACTOR );
66+
runResponseHandler.onSuccess( emptyMap() );
67+
AutoPullResponseHandler handler =
68+
new AutoPullResponseHandler(query, runResponseHandler, connection, METADATA_EXTRACTOR, mock( PullResponseCompletionListener.class ),
69+
fetchSize );
70+
handler.prePopulateRecords();
71+
return handler;
72+
}
73+
74+
@Test
75+
void shouldKeepRequestingWhenBetweenRange() {
76+
Connection connection = connectionMock();
77+
InOrder inOrder = Mockito.inOrder( connection );
78+
79+
//highwatermark=2, lowwatermark=1
80+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 4 );
81+
82+
Map<String,Value> metaData = new HashMap<>( 1 );
83+
metaData.put( "has_more", BooleanValue.TRUE );
84+
85+
inOrder.verify( connection ).writeAndFlush( any(), any() );
86+
87+
handler.onRecord( values( 1 ) );
88+
handler.onRecord( values( 2 ) );
89+
handler.onSuccess( metaData ); //2 in the record queue
90+
91+
//should send another pulln request since maxValue not met
92+
inOrder.verify( connection ).writeAndFlush( any(), any() );
93+
}
94+
95+
@Test
96+
void shouldStopRequestingWhenOverMaxWatermark() {
97+
Connection connection = connectionMock();
98+
InOrder inOrder = Mockito.inOrder( connection );
99+
100+
//highWatermark=2, lowWatermark=1
101+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 4 );
102+
103+
Map<String,Value> metaData = new HashMap<>( 1 );
104+
metaData.put( "has_more", BooleanValue.TRUE );
105+
106+
inOrder.verify( connection ).writeAndFlush( any(), any() );
107+
108+
handler.onRecord( values( 1 ) );
109+
handler.onRecord( values( 2 ) );
110+
handler.onRecord( values( 3 ) );
111+
handler.onRecord( values( 4 ) );
112+
handler.onSuccess( metaData );
113+
114+
//only initial writeAndFlush()
115+
verify( connection , times( 1 ) ).writeAndFlush( any(),any() );
116+
}
117+
118+
@Test
119+
void shouldRestartRequestingWhenMinimumWatermarkMet() {
120+
Connection connection = connectionMock();
121+
InOrder inOrder = Mockito.inOrder( connection );
122+
123+
//highwatermark=4, lowwatermark=2
124+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 7);
125+
126+
Map<String,Value> metaData = new HashMap<>( 1 );
127+
metaData.put( "has_more", BooleanValue.TRUE );
128+
129+
inOrder.verify( connection ).writeAndFlush( any(), any() );
130+
131+
handler.onRecord( values( 1 ) );
132+
handler.onRecord( values( 2 ) );
133+
handler.onRecord( values( 3 ) );
134+
handler.onRecord( values( 4 ) );
135+
handler.onRecord( values( 5 ) );
136+
handler.onRecord( values( 6 ) );
137+
handler.onSuccess( metaData );
138+
139+
verify( connection , times( 1 ) ).writeAndFlush( any(),any() );
140+
141+
handler.nextAsync();
142+
handler.nextAsync();
143+
handler.nextAsync();
144+
handler.nextAsync();
145+
handler.nextAsync();
146+
handler.nextAsync();
147+
148+
inOrder.verify( connection ).writeAndFlush( any() , any() );
149+
}
150+
151+
@Test
152+
void shouldKeepRequestingMoreRecordsWhenPullAll() {
153+
Connection connection = connectionMock();
154+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, -1);
155+
156+
Map<String,Value> metaData = new HashMap<>( 1 );
157+
metaData.put( "has_more", BooleanValue.TRUE );
158+
159+
handler.onRecord( values( 1 ) );
160+
handler.onSuccess( metaData );
161+
162+
handler.onRecord( values( 2 ) );
163+
handler.onSuccess( metaData );
164+
165+
handler.onRecord( values( 3 ) );
166+
handler.onSuccess( emptyMap() );
167+
168+
verify( connection , times( 3 ) ).writeAndFlush( any(),any() );
169+
}
170+
171+
49172
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
!: BOLT 4
2+
!: AUTO RESET
3+
!: AUTO HELLO
4+
!: AUTO GOODBYE
5+
6+
C: RUN "MATCH (n) RETURN n.name" {} {}
7+
PULL { "n": 2 }
8+
S: SUCCESS {"fields": ["n.name"]}
9+
RECORD ["Bob"]
10+
RECORD ["Alice"]
11+
SUCCESS {"has_more": true}
12+
C: PULL { "n": 2 }
13+
S: RECORD ["Tina"]
14+
RECORD ["Frank"]
15+
SUCCESS {"has_more": true}
16+
C: PULL { "n": 2 }
17+
S: RECORD ["Daisy"]
18+
RECORD ["Clive"]
19+
SUCCESS {}

0 commit comments

Comments
 (0)