Skip to content

Commit 5c968ca

Browse files
committed
Ensure a single handler manages channel's auto-read
`PullAllResponseHandler` is responsible for receiving a stream of records. Stream might be very large and handler tries to not fetch if fully into memory. It applies a network-level backpressure by disabling/enabling auto-read property of the underlying Netty channel. This property tells channel to not read from the network. Auto-read is disabled when number of queued records goes beyond the threshold. Management of auto-read turned out to be problematic for nested queries within a single transaction. Nested queries result in multiple `PullAllResponseHandler`s being added to the queue of handlers. They will try to manage auto-read concurrently and can sometimes disable it completely. Callers would then be blocked and unable to proceed. This is not a problem for `Session#run()` because every such call ensures there was a logical SYNC and the previous query has completed and its result is buffered. This commit fixes a problem by making only a single installed handler manage the channel's auto-read property. Making sure there only exists a single such handler is the responsibility of `InboundMessageDispatcher`. Auto-read is enabled when new auto-read managing handler is installed. Previous such handler is disabled.
1 parent 47dc7fa commit 5c968ca

File tree

6 files changed

+297
-6
lines changed

6 files changed

+297
-6
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
2929
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
3030
import org.neo4j.driver.internal.messaging.MessageHandler;
31+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3132
import org.neo4j.driver.internal.spi.ResponseHandler;
3233
import org.neo4j.driver.internal.util.ErrorUtil;
3334
import org.neo4j.driver.v1.Logger;
@@ -48,6 +49,8 @@ public class InboundMessageDispatcher implements MessageHandler
4849
private boolean fatalErrorOccurred;
4950
private boolean ackFailureMuted;
5051

52+
private AutoReadManagingResponseHandler autoReadManagingHandler;
53+
5154
public InboundMessageDispatcher( Channel channel, Logging logging )
5255
{
5356
this.channel = requireNonNull( channel );
@@ -63,6 +66,7 @@ public void enqueue( ResponseHandler handler )
6366
else
6467
{
6568
handlers.add( handler );
69+
updateAutoReadManagingHandlerIfNeeded( handler );
6670
}
6771
}
6872

@@ -249,4 +253,20 @@ private void ackFailureIfNeeded()
249253
channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() );
250254
}
251255
}
256+
257+
private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
258+
{
259+
if ( handler instanceof AutoReadManagingResponseHandler )
260+
{
261+
if ( autoReadManagingHandler != null )
262+
{
263+
// there already exists a handler that manages channel's auto-read
264+
// make it stop because new managing handler is being added and there should only be a single such handler
265+
autoReadManagingHandler.disableAutoReadManagement();
266+
// restore the original auto-read value
267+
channel.config().setAutoRead( true );
268+
}
269+
autoReadManagingHandler = (AutoReadManagingResponseHandler) handler;
270+
}
271+
}
252272
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.concurrent.CompletionStage;
2828

2929
import org.neo4j.driver.internal.InternalRecord;
30+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3031
import org.neo4j.driver.internal.spi.Connection;
31-
import org.neo4j.driver.internal.spi.ResponseHandler;
3232
import org.neo4j.driver.internal.util.Futures;
3333
import org.neo4j.driver.internal.util.Iterables;
3434
import org.neo4j.driver.internal.util.MetadataUtil;
@@ -44,7 +44,7 @@
4444
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4545
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4646

47-
public abstract class PullAllResponseHandler implements ResponseHandler
47+
public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
4848
{
4949
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5050

@@ -58,6 +58,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5858
// initialized lazily when first record arrives
5959
private Queue<Record> records = UNINITIALIZED_RECORDS;
6060

61+
private boolean autoReadManagementEnabled = true;
6162
private boolean finished;
6263
private Throwable failure;
6364
private ResultSummary summary;
@@ -129,6 +130,12 @@ public synchronized void onRecord( Value[] fields )
129130
}
130131
}
131132

133+
@Override
134+
public synchronized void disableAutoReadManagement()
135+
{
136+
autoReadManagementEnabled = false;
137+
}
138+
132139
public synchronized CompletionStage<Record> peekAsync()
133140
{
134141
Record record = records.peek();
@@ -209,7 +216,7 @@ else if ( finished )
209216
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
210217
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
211218
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
212-
connection.enableAutoRead();
219+
enableAutoRead();
213220
failureFuture = new CompletableFuture<>();
214221
}
215222
return failureFuture;
@@ -234,7 +241,7 @@ private void enqueueRecord( Record record )
234241
// more than high watermark records are already queued, tell connection to stop auto-reading from network
235242
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
236243
// fetched from network faster than consumed
237-
connection.disableAutoRead();
244+
disableAutoRead();
238245
}
239246
}
240247

@@ -246,7 +253,7 @@ private Record dequeueRecord()
246253
{
247254
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
248255
// and populate queue with new records from network
249-
connection.enableAutoRead();
256+
enableAutoRead();
250257
}
251258

252259
return record;
@@ -319,4 +326,20 @@ private ResultSummary extractResultSummary( Map<String,Value> metadata )
319326
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
320327
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
321328
}
329+
330+
private void enableAutoRead()
331+
{
332+
if ( autoReadManagementEnabled )
333+
{
334+
connection.enableAutoRead();
335+
}
336+
}
337+
338+
private void disableAutoRead()
339+
{
340+
if ( autoReadManagementEnabled )
341+
{
342+
connection.disableAutoRead();
343+
}
344+
}
322345
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.spi;
20+
21+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
22+
23+
/**
24+
* A type of {@link ResponseHandler handler} that manages auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
25+
* {@link Connection#disableAutoRead()}.
26+
* <p>
27+
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
28+
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
29+
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
30+
* a single auto-read managing handler per connection.
31+
*/
32+
public interface AutoReadManagingResponseHandler extends ResponseHandler
33+
{
34+
/**
35+
* Tell this handler that it should stop changing auto-read setting for the connection.
36+
*/
37+
void disableAutoReadManagement();
38+
}

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.async.inbound;
2020

2121
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelConfig;
2223
import org.junit.Test;
2324
import org.mockito.ArgumentCaptor;
2425
import org.mockito.InOrder;
@@ -27,6 +28,7 @@
2728
import java.util.HashMap;
2829
import java.util.Map;
2930

31+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3032
import org.neo4j.driver.internal.spi.ResponseHandler;
3133
import org.neo4j.driver.internal.value.IntegerValue;
3234
import org.neo4j.driver.v1.Value;
@@ -49,6 +51,7 @@
4951
import static org.mockito.Mockito.times;
5052
import static org.mockito.Mockito.verify;
5153
import static org.mockito.Mockito.verifyZeroInteractions;
54+
import static org.mockito.Mockito.when;
5255
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5356
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
5457
import static org.neo4j.driver.v1.Values.value;
@@ -443,6 +446,25 @@ public void shouldMuteAndUnMuteAckFailure()
443446
assertFalse( dispatcher.isAckFailureMuted() );
444447
}
445448

449+
@Test
450+
public void shouldKeepSingleAutoReadManagingHandler()
451+
{
452+
InboundMessageDispatcher dispatcher = newDispatcher();
453+
454+
AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
455+
AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
456+
AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
457+
458+
dispatcher.enqueue( handler1 );
459+
dispatcher.enqueue( handler2 );
460+
dispatcher.enqueue( handler3 );
461+
462+
InOrder inOrder = inOrder( handler1, handler2, handler3 );
463+
inOrder.verify( handler1 ).disableAutoReadManagement();
464+
inOrder.verify( handler2 ).disableAutoReadManagement();
465+
inOrder.verify( handler3, never() ).disableAutoReadManagement();
466+
}
467+
446468
private static void verifyFailure( ResponseHandler handler )
447469
{
448470
ArgumentCaptor<Neo4jException> captor = ArgumentCaptor.forClass( Neo4jException.class );
@@ -453,7 +475,10 @@ private static void verifyFailure( ResponseHandler handler )
453475

454476
private static InboundMessageDispatcher newDispatcher()
455477
{
456-
return newDispatcher( mock( Channel.class ) );
478+
Channel channel = mock( Channel.class );
479+
ChannelConfig channelConfig = mock( ChannelConfig.class );
480+
when( channel.config() ).thenReturn( channelConfig );
481+
return newDispatcher( channel );
457482
}
458483

459484
private static InboundMessageDispatcher newDispatcher( Channel channel )

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,21 @@ public void shouldEnableAutoReadOnConnectionWhenSummaryRequestedButNotAvailable(
823823
assertNotNull( summaryFuture.get() );
824824
}
825825

826+
@Test
827+
public void shouldNotDisableAutoReadWhenAutoReadManagementDisabled()
828+
{
829+
Connection connection = connectionMock();
830+
PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ), connection );
831+
handler.disableAutoReadManagement();
832+
833+
for ( int i = 0; i < PullAllResponseHandler.RECORD_BUFFER_HIGH_WATERMARK + 1; i++ )
834+
{
835+
handler.onRecord( values( 100, 200 ) );
836+
}
837+
838+
verify( connection, never() ).disableAutoRead();
839+
}
840+
826841
@Test
827842
public void shouldPropagateFailureFromListAsync()
828843
{

0 commit comments

Comments
 (0)