Skip to content

Commit ae43d77

Browse files
authored
Merge pull request #438 from lutovich/1.5-auto-read
Backpressure for reading records
2 parents 0c57215 + 28ad95e commit ae43d77

17 files changed

+1698
-186
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public class NettyConnection implements Connection
5050
private final Clock clock;
5151

5252
private final AtomicBoolean open = new AtomicBoolean( true );
53-
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );
5453

5554
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5655
{
@@ -72,8 +71,7 @@ public boolean isOpen()
7271
@Override
7372
public void enableAutoRead()
7473
{
75-
assertOpen();
76-
if ( autoReadEnabled.compareAndSet( false, true ) )
74+
if ( isOpen() )
7775
{
7876
setAutoRead( true );
7977
}
@@ -82,8 +80,7 @@ public void enableAutoRead()
8280
@Override
8381
public void disableAutoRead()
8482
{
85-
assertOpen();
86-
if ( autoReadEnabled.compareAndSet( true, false ) )
83+
if ( isOpen() )
8784
{
8885
setAutoRead( false );
8986
}
@@ -110,6 +107,9 @@ public CompletionStage<Void> release()
110107
{
111108
if ( open.compareAndSet( true, false ) )
112109
{
110+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
111+
setAutoRead( true );
112+
113113
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
114114
}
115115
return releaseFuture;
@@ -180,7 +180,7 @@ private void setAutoRead( boolean value )
180180

181181
private void assertOpen()
182182
{
183-
if ( !open.get() )
183+
if ( !isOpen() )
184184
{
185185
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
186186
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 23 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21-
import java.util.Collections;
22-
import java.util.LinkedList;
23-
import java.util.List;
21+
import java.util.ArrayDeque;
2422
import java.util.Map;
2523
import java.util.Queue;
2624
import java.util.concurrent.CompletableFuture;
@@ -29,36 +27,27 @@
2927
import org.neo4j.driver.internal.InternalRecord;
3028
import org.neo4j.driver.internal.spi.Connection;
3129
import org.neo4j.driver.internal.spi.ResponseHandler;
32-
import org.neo4j.driver.internal.summary.InternalNotification;
33-
import org.neo4j.driver.internal.summary.InternalPlan;
34-
import org.neo4j.driver.internal.summary.InternalProfiledPlan;
35-
import org.neo4j.driver.internal.summary.InternalResultSummary;
36-
import org.neo4j.driver.internal.summary.InternalServerInfo;
37-
import org.neo4j.driver.internal.summary.InternalSummaryCounters;
30+
import org.neo4j.driver.internal.util.MetadataUtil;
3831
import org.neo4j.driver.v1.Record;
3932
import org.neo4j.driver.v1.Statement;
4033
import org.neo4j.driver.v1.Value;
41-
import org.neo4j.driver.v1.summary.Notification;
42-
import org.neo4j.driver.v1.summary.Plan;
43-
import org.neo4j.driver.v1.summary.ProfiledPlan;
4434
import org.neo4j.driver.v1.summary.ResultSummary;
45-
import org.neo4j.driver.v1.summary.StatementType;
4635

4736
import static java.util.Collections.emptyMap;
4837
import static java.util.Objects.requireNonNull;
4938
import static java.util.concurrent.CompletableFuture.completedFuture;
5039
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5140

52-
// todo: unit tests
5341
public abstract class PullAllResponseHandler implements ResponseHandler
5442
{
55-
private static final boolean TOUCH_AUTO_READ = false;
43+
static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
44+
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
5645

5746
private final Statement statement;
5847
private final RunResponseHandler runResponseHandler;
5948
protected final Connection connection;
6049

61-
private final Queue<Record> records = new LinkedList<>();
50+
private final Queue<Record> records = new ArrayDeque<>();
6251

6352
private boolean finished;
6453
private Throwable failure;
@@ -210,25 +199,31 @@ else if ( finished )
210199
private void queueRecord( Record record )
211200
{
212201
records.add( record );
213-
if ( TOUCH_AUTO_READ )
202+
203+
boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null;
204+
// when summary or failure is requested we have to buffer all remaining records and then return summary/failure
205+
// do not disable auto-read in this case, otherwise records will not be consumed and trailing
206+
// SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure
207+
if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
214208
{
215-
if ( records.size() > 10_000 )
216-
{
217-
connection.disableAutoRead();
218-
}
209+
// more than high watermark records are already queued, tell connection to stop auto-reading from network
210+
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
211+
// fetched from network faster than consumed
212+
connection.disableAutoRead();
219213
}
220214
}
221215

222216
private Record dequeueRecord()
223217
{
224218
Record record = records.poll();
225-
if ( TOUCH_AUTO_READ )
219+
220+
if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
226221
{
227-
if ( record != null && records.size() < 100 )
228-
{
229-
connection.enableAutoRead();
230-
}
222+
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
223+
// and populate queue with new records from network
224+
connection.enableAutoRead();
231225
}
226+
232227
return record;
233228
}
234229

@@ -302,89 +297,7 @@ private boolean completeFailureFuture( Throwable error )
302297

303298
private ResultSummary extractResultSummary( Map<String,Value> metadata )
304299
{
305-
InternalServerInfo serverInfo = new InternalServerInfo( connection.serverAddress(),
306-
connection.serverVersion() );
307-
return new InternalResultSummary( statement, serverInfo, extractStatementType( metadata ),
308-
extractCounters( metadata ), extractPlan( metadata ), extractProfiledPlan( metadata ),
309-
extractNotifications( metadata ), runResponseHandler.resultAvailableAfter(),
310-
extractResultConsumedAfter( metadata ) );
311-
}
312-
313-
private static StatementType extractStatementType( Map<String,Value> metadata )
314-
{
315-
Value typeValue = metadata.get( "type" );
316-
if ( typeValue != null )
317-
{
318-
return StatementType.fromCode( typeValue.asString() );
319-
}
320-
return null;
321-
}
322-
323-
private static InternalSummaryCounters extractCounters( Map<String,Value> metadata )
324-
{
325-
Value countersValue = metadata.get( "stats" );
326-
if ( countersValue != null )
327-
{
328-
return new InternalSummaryCounters(
329-
counterValue( countersValue, "nodes-created" ),
330-
counterValue( countersValue, "nodes-deleted" ),
331-
counterValue( countersValue, "relationships-created" ),
332-
counterValue( countersValue, "relationships-deleted" ),
333-
counterValue( countersValue, "properties-set" ),
334-
counterValue( countersValue, "labels-added" ),
335-
counterValue( countersValue, "labels-removed" ),
336-
counterValue( countersValue, "indexes-added" ),
337-
counterValue( countersValue, "indexes-removed" ),
338-
counterValue( countersValue, "constraints-added" ),
339-
counterValue( countersValue, "constraints-removed" )
340-
);
341-
}
342-
return null;
343-
}
344-
345-
private static int counterValue( Value countersValue, String name )
346-
{
347-
Value value = countersValue.get( name );
348-
return value.isNull() ? 0 : value.asInt();
349-
}
350-
351-
private static Plan extractPlan( Map<String,Value> metadata )
352-
{
353-
Value planValue = metadata.get( "plan" );
354-
if ( planValue != null )
355-
{
356-
return InternalPlan.EXPLAIN_PLAN_FROM_VALUE.apply( planValue );
357-
}
358-
return null;
359-
}
360-
361-
private static ProfiledPlan extractProfiledPlan( Map<String,Value> metadata )
362-
{
363-
Value profiledPlanValue = metadata.get( "profile" );
364-
if ( profiledPlanValue != null )
365-
{
366-
return InternalProfiledPlan.PROFILED_PLAN_FROM_VALUE.apply( profiledPlanValue );
367-
}
368-
return null;
369-
}
370-
371-
private static List<Notification> extractNotifications( Map<String,Value> metadata )
372-
{
373-
Value notificationsValue = metadata.get( "notifications" );
374-
if ( notificationsValue != null )
375-
{
376-
return notificationsValue.asList( InternalNotification.VALUE_TO_NOTIFICATION );
377-
}
378-
return Collections.emptyList();
379-
}
380-
381-
private static long extractResultConsumedAfter( Map<String,Value> metadata )
382-
{
383-
Value resultConsumedAfterValue = metadata.get( "result_consumed_after" );
384-
if ( resultConsumedAfterValue != null )
385-
{
386-
return resultConsumedAfterValue.asLong();
387-
}
388-
return -1;
300+
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
301+
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
389302
}
390303
}

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21-
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.concurrent.CompletableFuture;
@@ -27,13 +26,15 @@
2726
import org.neo4j.driver.v1.Value;
2827

2928
import static java.util.Collections.emptyList;
29+
import static org.neo4j.driver.internal.util.MetadataUtil.extractResultAvailableAfter;
30+
import static org.neo4j.driver.internal.util.MetadataUtil.extractStatementKeys;
3031

3132
public class RunResponseHandler implements ResponseHandler
3233
{
3334
private final CompletableFuture<Void> runCompletedFuture;
3435

3536
private List<String> statementKeys = emptyList();
36-
private long resultAvailableAfter;
37+
private long resultAvailableAfter = -1;
3738

3839
public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
3940
{
@@ -43,7 +44,7 @@ public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
4344
@Override
4445
public void onSuccess( Map<String,Value> metadata )
4546
{
46-
statementKeys = extractKeys( metadata );
47+
statementKeys = extractStatementKeys( metadata );
4748
resultAvailableAfter = extractResultAvailableAfter( metadata );
4849

4950
completeRunFuture();
@@ -80,33 +81,4 @@ private void completeRunFuture()
8081
{
8182
runCompletedFuture.complete( null );
8283
}
83-
84-
private static List<String> extractKeys( Map<String,Value> metadata )
85-
{
86-
Value keysValue = metadata.get( "fields" );
87-
if ( keysValue != null )
88-
{
89-
if ( !keysValue.isEmpty() )
90-
{
91-
List<String> keys = new ArrayList<>( keysValue.size() );
92-
for ( Value value : keysValue.values() )
93-
{
94-
keys.add( value.asString() );
95-
}
96-
97-
return keys;
98-
}
99-
}
100-
return emptyList();
101-
}
102-
103-
private static long extractResultAvailableAfter( Map<String,Value> metadata )
104-
{
105-
Value resultAvailableAfterValue = metadata.get( "result_available_after" );
106-
if ( resultAvailableAfterValue != null )
107-
{
108-
return resultAvailableAfterValue.asLong();
109-
}
110-
return -1;
111-
}
11284
}

driver/src/main/java/org/neo4j/driver/internal/summary/InternalInputPosition.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.summary;
2020

21+
import java.util.Objects;
22+
2123
import org.neo4j.driver.v1.summary.InputPosition;
2224

2325
/**
@@ -61,6 +63,29 @@ public int column()
6163
return column;
6264
}
6365

66+
@Override
67+
public boolean equals( Object o )
68+
{
69+
if ( this == o )
70+
{
71+
return true;
72+
}
73+
if ( o == null || getClass() != o.getClass() )
74+
{
75+
return false;
76+
}
77+
InternalInputPosition that = (InternalInputPosition) o;
78+
return offset == that.offset &&
79+
line == that.line &&
80+
column == that.column;
81+
}
82+
83+
@Override
84+
public int hashCode()
85+
{
86+
return Objects.hash( offset, line, column );
87+
}
88+
6489
@Override
6590
public String toString()
6691
{

driver/src/main/java/org/neo4j/driver/internal/summary/InternalResultSummary.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,15 @@ public List<Notification> notifications()
110110
@Override
111111
public long resultAvailableAfter( TimeUnit unit )
112112
{
113-
return unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS );
113+
return resultAvailableAfter == -1 ? resultAvailableAfter
114+
: unit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS );
114115
}
115116

116117
@Override
117118
public long resultConsumedAfter( TimeUnit unit )
118119
{
119-
return unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS );
120+
return resultConsumedAfter == -1 ? resultConsumedAfter
121+
: unit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS );
120122
}
121123

122124
@Override

driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,19 @@
2525

2626
public class InternalServerInfo implements ServerInfo
2727
{
28-
private final BoltServerAddress address;
28+
private final String address;
2929
private final String version;
3030

3131
public InternalServerInfo( BoltServerAddress address, ServerVersion version )
3232
{
33-
this( address, version.toString() );
34-
}
35-
36-
public InternalServerInfo( BoltServerAddress address, String version )
37-
{
38-
this.address = address;
39-
this.version = version;
40-
}
41-
42-
public BoltServerAddress boltServerAddress()
43-
{
44-
return this.address;
33+
this.address = address.toString();
34+
this.version = version.toString();
4535
}
4636

4737
@Override
4838
public String address()
4939
{
50-
return this.address.toString();
40+
return address;
5141
}
5242

5343
@Override

0 commit comments

Comments
 (0)