Skip to content

Commit c5bce12

Browse files
committed
Streaming
1 parent 06248c8 commit c5bce12

19 files changed

+354
-327
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalRecord.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import java.util.Arrays;
22+
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.NoSuchElementException;
@@ -41,11 +42,19 @@ public class InternalRecord extends InternalRecordAccessor implements Record
4142
private final Value[] values;
4243
private int hashcode = 0;
4344

44-
public InternalRecord( List<String> keys, Map<String, Integer> keyIndexLookup, Value[] values )
45+
public InternalRecord( List<String> keys, Value[] values )
4546
{
4647
this.keys = keys;
47-
this.keyIndexLookup = keyIndexLookup;
4848
this.values = values;
49+
50+
int numFields = keys.size();
51+
Map<String, Integer> fieldLookup = new HashMap<>( numFields );
52+
for ( int i = 0; i < numFields; i++ )
53+
{
54+
String name = keys.get(i);
55+
fieldLookup.put( name, i );
56+
}
57+
this.keyIndexLookup = fieldLookup;
4958
}
5059

5160
@Override

driver/src/main/java/org/neo4j/driver/internal/InternalResultCursor.java

Lines changed: 197 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,153 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import org.neo4j.driver.v1.*;
22-
import org.neo4j.driver.v1.exceptions.ClientException;
23-
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
24-
2521
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.LinkedList;
2624
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Queue;
27+
28+
import org.neo4j.driver.internal.spi.Connection;
29+
import org.neo4j.driver.internal.spi.StreamCollector;
30+
import org.neo4j.driver.internal.summary.SummaryBuilder;
31+
import org.neo4j.driver.v1.Function;
32+
import org.neo4j.driver.v1.Notification;
33+
import org.neo4j.driver.v1.Plan;
34+
import org.neo4j.driver.v1.ProfiledPlan;
35+
import org.neo4j.driver.v1.Record;
36+
import org.neo4j.driver.v1.RecordAccessor;
37+
import org.neo4j.driver.v1.ResultCursor;
38+
import org.neo4j.driver.v1.ResultSummary;
39+
import org.neo4j.driver.v1.Statement;
40+
import org.neo4j.driver.v1.StatementType;
41+
import org.neo4j.driver.v1.UpdateStatistics;
42+
import org.neo4j.driver.v1.Value;
43+
import org.neo4j.driver.v1.exceptions.ClientException;
44+
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
2745

2846
import static java.lang.String.format;
2947
import static java.util.Collections.emptyList;
48+
import static java.util.Collections.unmodifiableMap;
49+
import static org.neo4j.driver.internal.ParameterSupport.NO_PARAMETERS;
3050
import static org.neo4j.driver.v1.Records.recordAsIs;
3151

3252
public class InternalResultCursor extends InternalRecordAccessor implements ResultCursor
3353
{
34-
private final List<String> keys;
35-
private final PeekingIterator<Record> iter;
36-
private final ResultSummary summary;
54+
private final Connection connection;
55+
private final StreamCollector runResponseCollector;
56+
private final StreamCollector pullAllResponseCollector;
57+
private final Queue<Record> recordBuffer = new LinkedList<>();
58+
59+
private List<String> keys = null;
60+
private ResultSummary summary = null;
3761

3862
private boolean open = true;
3963
private Record current = null;
4064
private long position = -1;
4165
private long limit = -1;
66+
private boolean done = false;
4267

43-
public InternalResultCursor( List<String> keys, List<Record> body, ResultSummary summary )
68+
public InternalResultCursor( Connection connection, String statement, Map<String, Value> parameters )
4469
{
45-
this.keys = keys;
46-
this.iter = new PeekingIterator<>( body.iterator() );
47-
this.summary = summary;
70+
this.connection = connection;
71+
72+
Map<String, Value> unmodifiableParameters =
73+
(parameters == null) || (parameters.isEmpty()) ? NO_PARAMETERS : unmodifiableMap( parameters );
74+
final SummaryBuilder summaryBuilder = new SummaryBuilder( new Statement( statement, unmodifiableParameters ) );
75+
76+
this.runResponseCollector = new StreamCollector()
77+
{
78+
@Override
79+
public void keys( String[] names )
80+
{
81+
keys = new ArrayList<>( Arrays.asList( names ) );
82+
}
83+
84+
@Override
85+
public void record( Value[] fields ) {}
86+
87+
@Override
88+
public void statementType( StatementType type ) {}
89+
90+
@Override
91+
public void statementStatistics( UpdateStatistics statistics ) {}
92+
93+
@Override
94+
public void plan( Plan plan ) {}
95+
96+
@Override
97+
public void profile( ProfiledPlan plan ) {}
98+
99+
@Override
100+
public void notifications( List<Notification> notifications ) {}
101+
102+
@Override
103+
public void done()
104+
{
105+
if ( keys == null )
106+
{
107+
keys = new ArrayList<>();
108+
}
109+
}
110+
};
111+
this.pullAllResponseCollector = new StreamCollector()
112+
{
113+
@Override
114+
public void keys( String[] names ) {}
115+
116+
@Override
117+
public void record( Value[] fields )
118+
{
119+
recordBuffer.add( new InternalRecord( keys, fields ) );
120+
}
121+
122+
@Override
123+
public void statementType( StatementType type )
124+
{
125+
summaryBuilder.statementType( type );
126+
}
127+
128+
@Override
129+
public void statementStatistics( UpdateStatistics statistics )
130+
{
131+
summaryBuilder.statementStatistics( statistics );
132+
}
133+
134+
@Override
135+
public void plan( Plan plan )
136+
{
137+
summaryBuilder.plan( plan );
138+
}
139+
140+
@Override
141+
public void profile( ProfiledPlan plan )
142+
{
143+
summaryBuilder.profile( plan );
144+
}
145+
146+
@Override
147+
public void notifications( List<Notification> notifications )
148+
{
149+
summaryBuilder.notifications( notifications );
150+
}
151+
152+
@Override
153+
public void done() {
154+
summary = summaryBuilder.build();
155+
done = true;
156+
}
157+
};
158+
}
159+
160+
StreamCollector runResponseCollector()
161+
{
162+
return runResponseCollector;
163+
}
164+
165+
StreamCollector pullAllResponseCollector()
166+
{
167+
return pullAllResponseCollector;
48168
}
49169

50170
@Override
@@ -77,6 +197,9 @@ public int index( String key )
77197

78198
public List<String> keys()
79199
{
200+
while (keys == null && !done) {
201+
connection.receiveOne();
202+
}
80203
return keys;
81204
}
82205

@@ -96,8 +219,8 @@ public Record record()
96219
else
97220
{
98221
throw new NoSuchRecordException(
99-
"In order to access the fields of a record in a result, " +
100-
"you must first call next() to point the result to the next record in the result stream."
222+
"In order to access the fields of a record in a result, " +
223+
"you must first call next() to point the result to the next record in the result stream."
101224
);
102225
}
103226
}
@@ -113,27 +236,51 @@ public long position()
113236
public boolean atEnd()
114237
{
115238
assertOpen();
116-
return !iter.hasNext();
239+
if (!recordBuffer.isEmpty())
240+
{
241+
return false;
242+
}
243+
else if (done)
244+
{
245+
return true;
246+
}
247+
else
248+
{
249+
while ( recordBuffer.isEmpty() && !done )
250+
{
251+
connection.receiveOne();
252+
}
253+
return recordBuffer.isEmpty() && done;
254+
}
117255
}
118256

119257
@Override
120258
public boolean next()
121259
{
122260
assertOpen();
123-
if ( iter.hasNext() )
261+
Record nextRecord = recordBuffer.poll();
262+
if ( nextRecord != null )
124263
{
125-
current = iter.next();
264+
current = nextRecord;
126265
position += 1;
127266
if ( position == limit )
128267
{
129268
discard();
130269
}
131270
return true;
132271
}
133-
else
272+
else if ( done )
134273
{
135274
return false;
136275
}
276+
else
277+
{
278+
while ( recordBuffer.isEmpty() && !done )
279+
{
280+
connection.receiveOne();
281+
}
282+
return next();
283+
}
137284
}
138285

139286
@Override
@@ -176,8 +323,8 @@ public Record first()
176323
if( position() >= 1 )
177324
{
178325
throw new NoSuchRecordException( "Cannot retrieve the first record, because this result cursor has been moved already. " +
179-
"Please ensure you are not calling `first` multiple times, or are mixing it with calls " +
180-
"to `next`, `single`, `list` or any other method that changes the position of the cursor." );
326+
"Please ensure you are not calling `first` multiple times, or are mixing it with calls " +
327+
"to `next`, `single`, `list` or any other method that changes the position of the cursor." );
181328
}
182329

183330
if( position == 0 )
@@ -209,11 +356,11 @@ public Value first(int index) throws NoSuchRecordException
209356
public Record single()
210357
{
211358
Record first = first();
212-
if( iter.hasNext() )
359+
if( !atEnd() )
213360
{
214361
throw new NoSuchRecordException( "Expected a result with a single record, but this result contains at least one more. " +
215-
"Ensure your query returns only one record, or use `first` instead of `single` if " +
216-
"you do not care about the number of records in the result." );
362+
"Ensure your query returns only one record, or use `first` instead of `single` if " +
363+
"you do not care about the number of records in the result." );
217364
}
218365
return first;
219366
}
@@ -233,7 +380,24 @@ public Value single( int index ) throws NoSuchRecordException
233380
@Override
234381
public Record peek()
235382
{
236-
return iter.peek();
383+
assertOpen();
384+
Record nextRecord = recordBuffer.peek();
385+
if ( nextRecord != null )
386+
{
387+
return nextRecord;
388+
}
389+
else if ( done )
390+
{
391+
return null;
392+
}
393+
else
394+
{
395+
while ( recordBuffer.isEmpty() && !done )
396+
{
397+
connection.receiveOne();
398+
}
399+
return peek();
400+
}
237401
}
238402

239403
@Override
@@ -264,7 +428,7 @@ else if ( position == 0 || ( position == -1 && next() ) )
264428
else
265429
{
266430
throw new ClientException(
267-
format( "Can't retain records when cursor is not pointing at the first record (currently at position %d)", position )
431+
format( "Can't retain records when cursor is not pointing at the first record (currently at position %d)", position )
268432
);
269433
}
270434
}
@@ -302,11 +466,18 @@ private void assertOpen()
302466

303467
private boolean isEmpty()
304468
{
305-
return position == -1 && !iter.hasNext();
469+
return position == -1 && recordBuffer.isEmpty() && done;
306470
}
307471

308472
private void discard()
309473
{
310-
iter.discard();
474+
assertOpen();
475+
recordBuffer.clear();
476+
while ( !done )
477+
{
478+
connection.receiveOne();
479+
recordBuffer.clear();
480+
}
311481
}
482+
312483
}

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import org.neo4j.driver.internal.spi.Connection;
2424
import org.neo4j.driver.internal.spi.Logger;
25-
import org.neo4j.driver.internal.summary.ResultBuilder;
2625
import org.neo4j.driver.internal.types.InternalTypeSystem;
2726
import org.neo4j.driver.v1.ResultCursor;
2827
import org.neo4j.driver.v1.Session;
@@ -61,11 +60,11 @@ public InternalSession( Connection connection, Logger logger )
6160
public ResultCursor run( String statementText, Map<String,Value> statementParameters )
6261
{
6362
ensureConnectionIsValid();
64-
ResultBuilder resultBuilder = new ResultBuilder( statementText, statementParameters );
65-
connection.run( statementText, statementParameters, resultBuilder );
66-
connection.pullAll( resultBuilder );
67-
connection.sync();
68-
return resultBuilder.build();
63+
InternalResultCursor cursor = new InternalResultCursor( connection, statementText, statementParameters );
64+
connection.run( statementText, statementParameters, cursor.runResponseCollector() );
65+
connection.pullAll( cursor.pullAllResponseCollector() );
66+
connection.sendAll();
67+
return cursor;
6968
}
7069

7170
@Override

0 commit comments

Comments
 (0)