Skip to content

Commit 0af5895

Browse files
committed
Backpressure for reading records
Previously driver would read records from network even if client did not consume them. This happened because event loop threads were always in auto-read mode despite the amount of currently buffered records. This could lead to memory problems when handling large results that are slowly consumed. This commit adds simple network-level backpressure for reading records. Auto-read mode is turned off when amount of buffered records reaches high watermark. It is turned on again when bunch of records are consumed and buffer size reaches low watermark. Also added unit tests for PULL_ALL and RUN handlers.
1 parent d582ccc commit 0af5895

File tree

9 files changed

+1127
-44
lines changed

9 files changed

+1127
-44
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public class NettyConnection implements Connection
5252
private final Clock clock;
5353

5454
private final AtomicBoolean open = new AtomicBoolean( true );
55-
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );
5655

5756
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5857
{
@@ -73,8 +72,7 @@ public boolean isOpen()
7372
@Override
7473
public void enableAutoRead()
7574
{
76-
assertOpen();
77-
if ( autoReadEnabled.compareAndSet( false, true ) )
75+
if ( isOpen() )
7876
{
7977
setAutoRead( true );
8078
}
@@ -83,8 +81,7 @@ public void enableAutoRead()
8381
@Override
8482
public void disableAutoRead()
8583
{
86-
assertOpen();
87-
if ( autoReadEnabled.compareAndSet( true, false ) )
84+
if ( isOpen() )
8885
{
8986
setAutoRead( false );
9087
}
@@ -111,6 +108,9 @@ public CompletionStage<Void> release()
111108
{
112109
if ( open.compareAndSet( true, false ) )
113110
{
111+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
112+
setAutoRead( true );
113+
114114
Promise<Void> releasePromise = channel.eventLoop().newPromise();
115115
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
116116
return asCompletionStage( releasePromise );
@@ -186,7 +186,7 @@ private void setAutoRead( boolean value )
186186

187187
private void assertOpen()
188188
{
189-
if ( !open.get() )
189+
if ( !isOpen() )
190190
{
191191
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
192192
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21-
import java.util.LinkedList;
21+
import java.util.ArrayDeque;
2222
import java.util.Map;
2323
import java.util.Queue;
2424
import java.util.concurrent.CompletableFuture;
@@ -38,16 +38,16 @@
3838
import static java.util.concurrent.CompletableFuture.completedFuture;
3939
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4040

41-
// todo: unit tests
4241
public abstract class PullAllResponseHandler implements ResponseHandler
4342
{
44-
private static final boolean TOUCH_AUTO_READ = false;
43+
static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
44+
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
4545

4646
private final Statement statement;
4747
private final RunResponseHandler runResponseHandler;
4848
protected final Connection connection;
4949

50-
private final Queue<Record> records = new LinkedList<>();
50+
private final Queue<Record> records = new ArrayDeque<>();
5151

5252
private boolean finished;
5353
private Throwable failure;
@@ -199,25 +199,31 @@ else if ( finished )
199199
private void queueRecord( Record record )
200200
{
201201
records.add( record );
202-
if ( TOUCH_AUTO_READ )
202+
203+
boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null;
204+
// when summary or failure is requested we have to buffer all remaining records and then return summary/failure
205+
// do not disable auto-read in this case, otherwise records will not be consumed and trailing
206+
// SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure
207+
if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
203208
{
204-
if ( records.size() > 10_000 )
205-
{
206-
connection.disableAutoRead();
207-
}
209+
// more than high watermark records are already queued, tell connection to stop auto-reading from network
210+
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
211+
// fetched from network faster than consumed
212+
connection.disableAutoRead();
208213
}
209214
}
210215

211216
private Record dequeueRecord()
212217
{
213218
Record record = records.poll();
214-
if ( TOUCH_AUTO_READ )
219+
220+
if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
215221
{
216-
if ( record != null && records.size() < 100 )
217-
{
218-
connection.enableAutoRead();
219-
}
222+
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
223+
// and populate queue with new records from network
224+
connection.enableAutoRead();
220225
}
226+
221227
return record;
222228
}
223229

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030

3131
public class RunResponseHandler implements ResponseHandler
3232
{
33+
private static final int UNKNOWN = -1;
34+
3335
private final CompletableFuture<Void> runCompletedFuture;
3436

3537
private List<String> statementKeys = emptyList();
36-
private long resultAvailableAfter;
38+
private long resultAvailableAfter = UNKNOWN;
3739

3840
public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
3941
{
@@ -107,6 +109,6 @@ private static long extractResultAvailableAfter( Map<String,Value> metadata )
107109
{
108110
return resultAvailableAfterValue.asLong();
109111
}
110-
return -1;
112+
return UNKNOWN;
111113
}
112114
}

driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -121,25 +121,15 @@ public void shouldWriteForceReleaseInEventLoopThread() throws Exception
121121
}
122122

123123
@Test
124-
public void shouldNotEnableAutoReadWhenReleased()
124+
public void shouldEnableAutoReadWhenReleased()
125125
{
126126
EmbeddedChannel channel = new EmbeddedChannel();
127127
channel.config().setAutoRead( false );
128128

129129
NettyConnection connection = newConnection( channel );
130130

131131
connection.release();
132-
133-
try
134-
{
135-
connection.enableAutoRead();
136-
fail( "Exception expected" );
137-
}
138-
catch ( IllegalStateException e )
139-
{
140-
assertConnectionReleasedError( e );
141-
}
142-
assertFalse( channel.config().isAutoRead() );
132+
assertTrue( channel.config().isAutoRead() );
143133
}
144134

145135
@Test
@@ -151,16 +141,7 @@ public void shouldNotDisableAutoReadWhenReleased()
151141
NettyConnection connection = newConnection( channel );
152142

153143
connection.release();
154-
155-
try
156-
{
157-
connection.disableAutoRead();
158-
fail( "Exception expected" );
159-
}
160-
catch ( IllegalStateException e )
161-
{
162-
assertConnectionReleasedError( e );
163-
}
144+
connection.disableAutoRead(); // does nothing on released connection
164145
assertTrue( channel.config().isAutoRead() );
165146
}
166147

0 commit comments

Comments
 (0)