Skip to content

Commit d4708c5

Browse files
Gregory WoodsGregory Woods
Gregory Woods
authored and
Gregory Woods
committed
Allow simple/async sessions to request data as needed
1 parent bc105be commit d4708c5

File tree

5 files changed

+233
-4
lines changed

5 files changed

+233
-4
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,15 @@ public class AutoPullResponseHandler extends BasicPullResponseHandler implements
4949
{
5050
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5151
private final long fetchSize;
52+
private final long LOW_RECORD_WATERMARK;
53+
private final long HIGH_RECORD_WATERMARK;
5254

5355
// initialized lazily when first record arrives
5456
private Queue<Record> records = UNINITIALIZED_RECORDS;
5557

5658
private ResultSummary summary;
5759
private Throwable failure;
60+
private boolean isAutoPullEnabled = true;
5861

5962
private CompletableFuture<Record> recordFuture;
6063
private CompletableFuture<ResultSummary> summaryFuture;
@@ -64,6 +67,19 @@ public AutoPullResponseHandler(Query query, RunResponseHandler runResponseHandle
6467
{
6568
super(query, runResponseHandler, connection, metadataExtractor, completionListener );
6669
this.fetchSize = fetchSize;
70+
71+
//For pull everything ensure conditions for disabling auto pull are never met
72+
if ( fetchSize == UNLIMITED_FETCH_SIZE )
73+
{
74+
this.HIGH_RECORD_WATERMARK = Long.MAX_VALUE;
75+
this.LOW_RECORD_WATERMARK = Long.MAX_VALUE;
76+
}
77+
else
78+
{
79+
this.HIGH_RECORD_WATERMARK = (long) (fetchSize * 0.7);
80+
this.LOW_RECORD_WATERMARK = (long) (fetchSize * 0.3);
81+
}
82+
6783
installRecordAndSummaryConsumers();
6884
}
6985

@@ -96,7 +112,10 @@ private void installRecordAndSummaryConsumers()
96112

97113
if ( error == null && summary == null ) // has_more
98114
{
99-
request( fetchSize );
115+
if ( isAutoPullEnabled )
116+
{
117+
request( fetchSize );
118+
}
100119
}
101120
} );
102121
}
@@ -198,11 +217,27 @@ private void enqueueRecord( Record record )
198217
records = new ArrayDeque<>();
199218
}
200219

220+
// too many records in the queue, pause auto request gathering
221+
if ( records.size() > HIGH_RECORD_WATERMARK )
222+
{
223+
isAutoPullEnabled = false;
224+
}
225+
201226
records.add( record );
202227
}
203228

204229
private Record dequeueRecord()
205230
{
231+
if ( records.size() < LOW_RECORD_WATERMARK )
232+
{
233+
//if not in streaming state we need to restart streaming
234+
if ( state() != State.STREAMING_STATE )
235+
{
236+
request( fetchSize );
237+
}
238+
isAutoPullEnabled = true;
239+
}
240+
206241
return records.poll();
207242
}
208243

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.test.StepVerifier;
2727

2828
import java.net.URI;
29+
import java.util.ArrayList;
2930
import java.util.List;
3031
import java.util.Optional;
3132
import java.util.function.Consumer;
@@ -38,16 +39,17 @@
3839
import org.neo4j.driver.GraphDatabase;
3940
import org.neo4j.driver.Logger;
4041
import org.neo4j.driver.Record;
41-
import org.neo4j.driver.Session;
4242
import org.neo4j.driver.Result;
43+
import org.neo4j.driver.Session;
4344
import org.neo4j.driver.Transaction;
45+
import org.neo4j.driver.async.AsyncSession;
4446
import org.neo4j.driver.exceptions.TransientException;
4547
import org.neo4j.driver.internal.cluster.RoutingSettings;
4648
import org.neo4j.driver.internal.retry.RetrySettings;
4749
import org.neo4j.driver.internal.util.Clock;
4850
import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
49-
import org.neo4j.driver.reactive.RxSession;
5051
import org.neo4j.driver.reactive.RxResult;
52+
import org.neo4j.driver.reactive.RxSession;
5153
import org.neo4j.driver.util.StubServer;
5254

5355
import static java.util.Arrays.asList;
@@ -72,6 +74,7 @@
7274
import static org.neo4j.driver.util.StubServer.INSECURE_CONFIG;
7375
import static org.neo4j.driver.util.StubServer.insecureBuilder;
7476
import static org.neo4j.driver.util.TestUtil.asOrderedSet;
77+
import static org.neo4j.driver.util.TestUtil.await;
7578

7679
class DirectDriverBoltKitTest
7780
{
@@ -306,6 +309,53 @@ void shouldChangeFetchSize() throws Exception
306309
}
307310
}
308311

312+
@Test
313+
void shouldOnlyPullRecordsWhenNeededSimpleSession() throws Exception
314+
{
315+
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
316+
try
317+
{
318+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
319+
{
320+
Session session = driver.session( builder().withFetchSize( 2 ).build() );
321+
Result result = session.run( "MATCH (n) RETURN n.name" );
322+
ArrayList<String> resultList = new ArrayList<>();
323+
result.forEachRemaining( ( rec ) -> resultList.add( rec.get( 0 ).asString() ) );
324+
325+
assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
326+
}
327+
}
328+
finally
329+
{
330+
assertEquals( 0, server.exitStatus() );
331+
}
332+
}
333+
334+
@Test
335+
void shouldOnlyPullRecordsWhenNeededAsyncSession() throws Exception
336+
{
337+
StubServer server = StubServer.start( "streaming_records_v4_buffering.script", 9001 );
338+
try
339+
{
340+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
341+
{
342+
AsyncSession session = driver.asyncSession( builder().withFetchSize( 2 ).build() );
343+
344+
ArrayList<String> resultList = new ArrayList<>();
345+
346+
await( session.runAsync( "MATCH (n) RETURN n.name" )
347+
.thenCompose( resultCursor ->
348+
resultCursor.forEachAsync( record -> resultList.add( record.get( 0 ).asString() ) ) ) );
349+
350+
assertEquals( resultList, asList( "Bob", "Alice", "Tina", "Frank", "Daisy", "Clive" ) );
351+
}
352+
}
353+
finally
354+
{
355+
assertEquals( 0, server.exitStatus() );
356+
}
357+
}
358+
309359
@Test
310360
void shouldAllowPullAll() throws Exception
311361
{

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222

2323
import java.io.IOException;
2424
import java.nio.channels.ClosedChannelException;
25+
import java.util.HashMap;
2526
import java.util.List;
27+
import java.util.Map;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.CompletionStage;
2830
import java.util.function.Function;
@@ -35,6 +37,7 @@
3537
import org.neo4j.driver.internal.BoltServerAddress;
3638
import org.neo4j.driver.internal.InternalRecord;
3739
import org.neo4j.driver.internal.spi.Connection;
40+
import org.neo4j.driver.internal.value.BooleanValue;
3841
import org.neo4j.driver.summary.ResultSummary;
3942
import org.neo4j.driver.summary.QueryType;
4043

@@ -679,7 +682,6 @@ void shouldReturnNotTransformedListInListAsync()
679682

680683
assertEquals( expectedRecords, list );
681684
}
682-
683685
protected T newHandler()
684686
{
685687
return newHandler( new Query( "RETURN 1" ) );

driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandlerTest.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,31 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers.pulln;
2020

21+
import org.junit.jupiter.api.Test;
22+
import org.mockito.InOrder;
23+
import org.mockito.Mockito;
24+
25+
import java.util.HashMap;
2126
import java.util.List;
27+
import java.util.Map;
2228
import java.util.concurrent.CompletableFuture;
2329

2430
import org.neo4j.driver.Query;
31+
import org.neo4j.driver.Value;
2532
import org.neo4j.driver.internal.handlers.PullAllResponseHandlerTestBase;
2633
import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
2734
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2835
import org.neo4j.driver.internal.spi.Connection;
36+
import org.neo4j.driver.internal.value.BooleanValue;
2937

38+
import static java.util.Collections.emptyMap;
3039
import static java.util.Collections.singletonMap;
40+
import static org.mockito.ArgumentMatchers.any;
3141
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.times;
43+
import static org.mockito.Mockito.verify;
3244
import static org.neo4j.driver.Values.value;
45+
import static org.neo4j.driver.Values.values;
3346
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.DEFAULT_FETCH_SIZE;
3447
import static org.neo4j.driver.internal.messaging.v1.BoltProtocolV1.METADATA_EXTRACTOR;
3548

@@ -46,4 +59,114 @@ protected AutoPullResponseHandler newHandler(Query query, List<String> queryKeys
4659
handler.prePopulateRecords();
4760
return handler;
4861
}
62+
63+
protected AutoPullResponseHandler newHandler(Query query, Connection connection, long fetchSize )
64+
{
65+
RunResponseHandler runResponseHandler = new RunResponseHandler( new CompletableFuture<>(), METADATA_EXTRACTOR );
66+
runResponseHandler.onSuccess( emptyMap() );
67+
AutoPullResponseHandler handler =
68+
new AutoPullResponseHandler(query, runResponseHandler, connection, METADATA_EXTRACTOR, mock( PullResponseCompletionListener.class ),
69+
fetchSize );
70+
handler.prePopulateRecords();
71+
return handler;
72+
}
73+
74+
@Test
75+
void shouldKeepRequestingWhenBetweenRange() {
76+
Connection connection = connectionMock();
77+
InOrder inOrder = Mockito.inOrder( connection );
78+
79+
//highwatermark=2, lowwatermark=1
80+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 4 );
81+
82+
Map<String,Value> metaData = new HashMap<>( 1 );
83+
metaData.put( "has_more", BooleanValue.TRUE );
84+
85+
inOrder.verify( connection ).writeAndFlush( any(), any() );
86+
87+
handler.onRecord( values( 1 ) );
88+
handler.onRecord( values( 2 ) );
89+
handler.onSuccess( metaData ); //2 in the record queue
90+
91+
//should send another pulln request since maxValue not met
92+
inOrder.verify( connection ).writeAndFlush( any(), any() );
93+
}
94+
95+
@Test
96+
void shouldStopRequestingWhenOverMaxWatermark() {
97+
Connection connection = connectionMock();
98+
InOrder inOrder = Mockito.inOrder( connection );
99+
100+
//highWatermark=2, lowWatermark=1
101+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 4 );
102+
103+
Map<String,Value> metaData = new HashMap<>( 1 );
104+
metaData.put( "has_more", BooleanValue.TRUE );
105+
106+
inOrder.verify( connection ).writeAndFlush( any(), any() );
107+
108+
handler.onRecord( values( 1 ) );
109+
handler.onRecord( values( 2 ) );
110+
handler.onRecord( values( 3 ) );
111+
handler.onRecord( values( 4 ) );
112+
handler.onSuccess( metaData );
113+
114+
//only initial writeAndFlush()
115+
verify( connection , times( 1 ) ).writeAndFlush( any(),any() );
116+
}
117+
118+
@Test
119+
void shouldRestartRequestingWhenMinimumWatermarkMet() {
120+
Connection connection = connectionMock();
121+
InOrder inOrder = Mockito.inOrder( connection );
122+
123+
//highwatermark=4, lowwatermark=2
124+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, 7);
125+
126+
Map<String,Value> metaData = new HashMap<>( 1 );
127+
metaData.put( "has_more", BooleanValue.TRUE );
128+
129+
inOrder.verify( connection ).writeAndFlush( any(), any() );
130+
131+
handler.onRecord( values( 1 ) );
132+
handler.onRecord( values( 2 ) );
133+
handler.onRecord( values( 3 ) );
134+
handler.onRecord( values( 4 ) );
135+
handler.onRecord( values( 5 ) );
136+
handler.onRecord( values( 6 ) );
137+
handler.onSuccess( metaData );
138+
139+
verify( connection , times( 1 ) ).writeAndFlush( any(),any() );
140+
141+
handler.nextAsync();
142+
handler.nextAsync();
143+
handler.nextAsync();
144+
handler.nextAsync();
145+
handler.nextAsync();
146+
handler.nextAsync();
147+
148+
inOrder.verify( connection ).writeAndFlush( any() , any() );
149+
}
150+
151+
@Test
152+
void shouldKeepRequestingMoreRecordsWhenPullAll() {
153+
Connection connection = connectionMock();
154+
AutoPullResponseHandler handler = newHandler( new Query("RETURN 1"), connection, -1);
155+
156+
Map<String,Value> metaData = new HashMap<>( 1 );
157+
metaData.put( "has_more", BooleanValue.TRUE );
158+
159+
handler.onRecord( values( 1 ) );
160+
handler.onSuccess( metaData );
161+
162+
handler.onRecord( values( 2 ) );
163+
handler.onSuccess( metaData );
164+
165+
handler.onRecord( values( 3 ) );
166+
handler.onSuccess( emptyMap() );
167+
168+
verify( connection , times( 3 ) ).writeAndFlush( any(),any() );
169+
}
170+
171+
49172
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
!: BOLT 4
2+
!: AUTO RESET
3+
!: AUTO HELLO
4+
!: AUTO GOODBYE
5+
6+
C: RUN "MATCH (n) RETURN n.name" {} {}
7+
PULL { "n": 2 }
8+
S: SUCCESS {"fields": ["n.name"]}
9+
RECORD ["Bob"]
10+
RECORD ["Alice"]
11+
SUCCESS {"has_more": true}
12+
C: PULL { "n": 2 }
13+
S: RECORD ["Tina"]
14+
RECORD ["Frank"]
15+
SUCCESS {"has_more": true}
16+
C: PULL { "n": 2 }
17+
S: RECORD ["Daisy"]
18+
RECORD ["Clive"]
19+
SUCCESS {}

0 commit comments

Comments
 (0)