Skip to content

Commit 6cc322c

Browse files
committed
Backpressure for reading records
Previously driver would read records from network even if client did not consume them. This happened because event loop threads were always in auto-read mode despite the amount of currently buffered records. This could lead to memory problems when handling large results that are slowly consumed. This commit adds simple network-level backpressure for reading records. Auto-read mode is turned off when amount of buffered records reaches high watermark. It is turned on again when bunch of records are consumed and buffer size reaches low watermark. Also added unit tests for PULL_ALL and RUN handlers.
1 parent a8fe2b4 commit 6cc322c

File tree

9 files changed

+1127
-44
lines changed

9 files changed

+1127
-44
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public class NettyConnection implements Connection
5050
private final Clock clock;
5151

5252
private final AtomicBoolean open = new AtomicBoolean( true );
53-
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );
5453

5554
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5655
{
@@ -72,8 +71,7 @@ public boolean isOpen()
7271
@Override
7372
public void enableAutoRead()
7473
{
75-
assertOpen();
76-
if ( autoReadEnabled.compareAndSet( false, true ) )
74+
if ( isOpen() )
7775
{
7876
setAutoRead( true );
7977
}
@@ -82,8 +80,7 @@ public void enableAutoRead()
8280
@Override
8381
public void disableAutoRead()
8482
{
85-
assertOpen();
86-
if ( autoReadEnabled.compareAndSet( true, false ) )
83+
if ( isOpen() )
8784
{
8885
setAutoRead( false );
8986
}
@@ -110,6 +107,9 @@ public CompletionStage<Void> release()
110107
{
111108
if ( open.compareAndSet( true, false ) )
112109
{
110+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
111+
setAutoRead( true );
112+
113113
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
114114
}
115115
return releaseFuture;
@@ -180,7 +180,7 @@ private void setAutoRead( boolean value )
180180

181181
private void assertOpen()
182182
{
183-
if ( !open.get() )
183+
if ( !isOpen() )
184184
{
185185
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
186186
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21-
import java.util.LinkedList;
21+
import java.util.ArrayDeque;
2222
import java.util.Map;
2323
import java.util.Queue;
2424
import java.util.concurrent.CompletableFuture;
@@ -38,16 +38,16 @@
3838
import static java.util.concurrent.CompletableFuture.completedFuture;
3939
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4040

41-
// todo: unit tests
4241
public abstract class PullAllResponseHandler implements ResponseHandler
4342
{
44-
private static final boolean TOUCH_AUTO_READ = false;
43+
static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
44+
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
4545

4646
private final Statement statement;
4747
private final RunResponseHandler runResponseHandler;
4848
protected final Connection connection;
4949

50-
private final Queue<Record> records = new LinkedList<>();
50+
private final Queue<Record> records = new ArrayDeque<>();
5151

5252
private boolean finished;
5353
private Throwable failure;
@@ -199,25 +199,31 @@ else if ( finished )
199199
private void queueRecord( Record record )
200200
{
201201
records.add( record );
202-
if ( TOUCH_AUTO_READ )
202+
203+
boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null;
204+
// when summary or failure is requested we have to buffer all remaining records and then return summary/failure
205+
// do not disable auto-read in this case, otherwise records will not be consumed and trailing
206+
// SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure
207+
if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
203208
{
204-
if ( records.size() > 10_000 )
205-
{
206-
connection.disableAutoRead();
207-
}
209+
// more than high watermark records are already queued, tell connection to stop auto-reading from network
210+
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
211+
// fetched from network faster than consumed
212+
connection.disableAutoRead();
208213
}
209214
}
210215

211216
private Record dequeueRecord()
212217
{
213218
Record record = records.poll();
214-
if ( TOUCH_AUTO_READ )
219+
220+
if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
215221
{
216-
if ( record != null && records.size() < 100 )
217-
{
218-
connection.enableAutoRead();
219-
}
222+
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
223+
// and populate queue with new records from network
224+
connection.enableAutoRead();
220225
}
226+
221227
return record;
222228
}
223229

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030

3131
public class RunResponseHandler implements ResponseHandler
3232
{
33+
private static final int UNKNOWN = -1;
34+
3335
private final CompletableFuture<Void> runCompletedFuture;
3436

3537
private List<String> statementKeys = emptyList();
36-
private long resultAvailableAfter;
38+
private long resultAvailableAfter = UNKNOWN;
3739

3840
public RunResponseHandler( CompletableFuture<Void> runCompletedFuture )
3941
{
@@ -107,6 +109,6 @@ private static long extractResultAvailableAfter( Map<String,Value> metadata )
107109
{
108110
return resultAvailableAfterValue.asLong();
109111
}
110-
return -1;
112+
return UNKNOWN;
111113
}
112114
}

driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,25 +122,15 @@ public void shouldWriteForceReleaseInEventLoopThread() throws Exception
122122
}
123123

124124
@Test
125-
public void shouldNotEnableAutoReadWhenReleased()
125+
public void shouldEnableAutoReadWhenReleased()
126126
{
127127
EmbeddedChannel channel = new EmbeddedChannel();
128128
channel.config().setAutoRead( false );
129129

130130
NettyConnection connection = newConnection( channel );
131131

132132
connection.release();
133-
134-
try
135-
{
136-
connection.enableAutoRead();
137-
fail( "Exception expected" );
138-
}
139-
catch ( IllegalStateException e )
140-
{
141-
assertConnectionReleasedError( e );
142-
}
143-
assertFalse( channel.config().isAutoRead() );
133+
assertTrue( channel.config().isAutoRead() );
144134
}
145135

146136
@Test
@@ -152,16 +142,7 @@ public void shouldNotDisableAutoReadWhenReleased()
152142
NettyConnection connection = newConnection( channel );
153143

154144
connection.release();
155-
156-
try
157-
{
158-
connection.disableAutoRead();
159-
fail( "Exception expected" );
160-
}
161-
catch ( IllegalStateException e )
162-
{
163-
assertConnectionReleasedError( e );
164-
}
145+
connection.disableAutoRead(); // does nothing on released connection
165146
assertTrue( channel.config().isAutoRead() );
166147
}
167148

0 commit comments

Comments
 (0)