Skip to content

Commit b33d284

Browse files
authored
Merge pull request #528 from lutovich/1.6-tx-with-nested-queries
Ensure a single handler manages channel's auto-read
2 parents b639b18 + 050a0c3 commit b33d284

File tree

12 files changed

+423
-44
lines changed

12 files changed

+423
-44
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void operationComplete( ChannelFuture future )
5656
InitMessage message = new InitMessage( userAgent, authToken );
5757
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );
5858

59-
messageDispatcher( channel ).queue( handler );
59+
messageDispatcher( channel ).enqueue( handler );
6060
channel.writeAndFlush( message, channel.voidPromise() );
6161
}
6262
else

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
199199
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
200200
boolean flush )
201201
{
202-
messageDispatcher.queue( handler1 );
203-
messageDispatcher.queue( handler2 );
202+
messageDispatcher.enqueue( handler1 );
203+
messageDispatcher.enqueue( handler2 );
204204

205205
channel.write( message1, channel.voidPromise() );
206206

@@ -216,7 +216,7 @@ private void writeMessages( Message message1, ResponseHandler handler1, Message
216216

217217
private void writeAndFlushMessage( Message message, ResponseHandler handler )
218218
{
219-
messageDispatcher.queue( handler );
219+
messageDispatcher.enqueue( handler );
220220
channel.writeAndFlush( message, channel.voidPromise() );
221221
}
222222

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
2929
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
3030
import org.neo4j.driver.internal.messaging.MessageHandler;
31+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3132
import org.neo4j.driver.internal.spi.ResponseHandler;
3233
import org.neo4j.driver.internal.util.ErrorUtil;
3334
import org.neo4j.driver.v1.Logger;
@@ -48,13 +49,15 @@ public class InboundMessageDispatcher implements MessageHandler
4849
private boolean fatalErrorOccurred;
4950
private boolean ackFailureMuted;
5051

52+
private AutoReadManagingResponseHandler autoReadManagingHandler;
53+
5154
public InboundMessageDispatcher( Channel channel, Logging logging )
5255
{
5356
this.channel = requireNonNull( channel );
5457
this.log = new ChannelActivityLogger( channel, logging, getClass() );
5558
}
5659

57-
public void queue( ResponseHandler handler )
60+
public void enqueue( ResponseHandler handler )
5861
{
5962
if ( fatalErrorOccurred )
6063
{
@@ -63,6 +66,7 @@ public void queue( ResponseHandler handler )
6366
else
6467
{
6568
handlers.add( handler );
69+
updateAutoReadManagingHandlerIfNeeded( handler );
6670
}
6771
}
6872

@@ -115,7 +119,7 @@ public void handleAckFailureMessage()
115119
public void handleSuccessMessage( Map<String,Value> meta )
116120
{
117121
log.debug( "S: SUCCESS %s", meta );
118-
ResponseHandler handler = handlers.remove();
122+
ResponseHandler handler = removeHandler();
119123
handler.onSuccess( meta );
120124
}
121125

@@ -148,7 +152,7 @@ public void handleFailureMessage( String code, String message )
148152
// try to write ACK_FAILURE before notifying the next response handler
149153
ackFailureIfNeeded();
150154

151-
ResponseHandler handler = handlers.remove();
155+
ResponseHandler handler = removeHandler();
152156
handler.onFailure( currentError );
153157
}
154158

@@ -157,7 +161,7 @@ public void handleIgnoredMessage()
157161
{
158162
log.debug( "S: IGNORED" );
159163

160-
ResponseHandler handler = handlers.remove();
164+
ResponseHandler handler = removeHandler();
161165

162166
Throwable error;
163167
if ( currentError != null )
@@ -185,7 +189,7 @@ public void handleFatalError( Throwable error )
185189

186190
while ( !handlers.isEmpty() )
187191
{
188-
ResponseHandler handler = handlers.remove();
192+
ResponseHandler handler = removeHandler();
189193
handler.onFailure( currentError );
190194
}
191195
}
@@ -241,12 +245,53 @@ public boolean isAckFailureMuted()
241245
return ackFailureMuted;
242246
}
243247

248+
/**
249+
* <b>Visible for testing</b>
250+
*/
251+
AutoReadManagingResponseHandler autoReadManagingHandler()
252+
{
253+
return autoReadManagingHandler;
254+
}
255+
244256
private void ackFailureIfNeeded()
245257
{
246258
if ( !ackFailureMuted )
247259
{
248-
queue( new AckFailureResponseHandler( this ) );
260+
enqueue( new AckFailureResponseHandler( this ) );
249261
channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() );
250262
}
251263
}
264+
265+
private ResponseHandler removeHandler()
266+
{
267+
ResponseHandler handler = handlers.remove();
268+
if ( handler == autoReadManagingHandler )
269+
{
270+
// the auto-read managing handler is being removed
271+
// make sure this dispatcher does not hold on to a removed handler
272+
updateAutoReadManagingHandler( null );
273+
}
274+
return handler;
275+
}
276+
277+
private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
278+
{
279+
if ( handler instanceof AutoReadManagingResponseHandler )
280+
{
281+
updateAutoReadManagingHandler( (AutoReadManagingResponseHandler) handler );
282+
}
283+
}
284+
285+
private void updateAutoReadManagingHandler( AutoReadManagingResponseHandler newHandler )
286+
{
287+
if ( autoReadManagingHandler != null )
288+
{
289+
// there already exists a handler that manages channel's auto-read
290+
// make it stop because new managing handler is being added and there should only be a single such handler
291+
autoReadManagingHandler.disableAutoReadManagement();
292+
// restore the default value of auto-read
293+
channel.config().setAutoRead( true );
294+
}
295+
autoReadManagingHandler = newHandler;
296+
}
252297
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private boolean hasBeenIdleForTooLong( Channel channel )
103103
private Future<Boolean> ping( Channel channel )
104104
{
105105
Promise<Boolean> result = channel.eventLoop().newPromise();
106-
messageDispatcher( channel ).queue( new PingResponseHandler( result, channel, log ) );
106+
messageDispatcher( channel ).enqueue( new PingResponseHandler( result, channel, log ) );
107107
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
108108
return result;
109109
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.concurrent.CompletionStage;
2828

2929
import org.neo4j.driver.internal.InternalRecord;
30+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3031
import org.neo4j.driver.internal.spi.Connection;
31-
import org.neo4j.driver.internal.spi.ResponseHandler;
3232
import org.neo4j.driver.internal.util.Futures;
3333
import org.neo4j.driver.internal.util.Iterables;
3434
import org.neo4j.driver.internal.util.MetadataUtil;
@@ -44,7 +44,7 @@
4444
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4545
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4646

47-
public abstract class PullAllResponseHandler implements ResponseHandler
47+
public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
4848
{
4949
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5050

@@ -58,6 +58,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5858
// initialized lazily when first record arrives
5959
private Queue<Record> records = UNINITIALIZED_RECORDS;
6060

61+
private boolean autoReadManagementEnabled = true;
6162
private boolean finished;
6263
private Throwable failure;
6364
private ResultSummary summary;
@@ -129,6 +130,12 @@ public synchronized void onRecord( Value[] fields )
129130
}
130131
}
131132

133+
@Override
134+
public synchronized void disableAutoReadManagement()
135+
{
136+
autoReadManagementEnabled = false;
137+
}
138+
132139
public synchronized CompletionStage<Record> peekAsync()
133140
{
134141
Record record = records.peek();
@@ -209,7 +216,7 @@ else if ( finished )
209216
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
210217
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
211218
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
212-
connection.enableAutoRead();
219+
enableAutoRead();
213220
failureFuture = new CompletableFuture<>();
214221
}
215222
return failureFuture;
@@ -234,7 +241,7 @@ private void enqueueRecord( Record record )
234241
// more than high watermark records are already queued, tell connection to stop auto-reading from network
235242
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
236243
// fetched from network faster than consumed
237-
connection.disableAutoRead();
244+
disableAutoRead();
238245
}
239246
}
240247

@@ -246,7 +253,7 @@ private Record dequeueRecord()
246253
{
247254
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
248255
// and populate queue with new records from network
249-
connection.enableAutoRead();
256+
enableAutoRead();
250257
}
251258

252259
return record;
@@ -319,4 +326,20 @@ private ResultSummary extractResultSummary( Map<String,Value> metadata )
319326
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
320327
return MetadataUtil.extractSummary( statement, connection, resultAvailableAfter, metadata );
321328
}
329+
330+
private void enableAutoRead()
331+
{
332+
if ( autoReadManagementEnabled )
333+
{
334+
connection.enableAutoRead();
335+
}
336+
}
337+
338+
private void disableAutoRead()
339+
{
340+
if ( autoReadManagementEnabled )
341+
{
342+
connection.disableAutoRead();
343+
}
344+
}
322345
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.spi;
20+
21+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
22+
23+
/**
24+
* A type of {@link ResponseHandler handler} that manages auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
25+
* {@link Connection#disableAutoRead()}.
26+
* <p>
27+
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
28+
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
29+
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
30+
* a single auto-read managing handler per connection.
31+
*/
32+
public interface AutoReadManagingResponseHandler extends ResponseHandler
33+
{
34+
/**
35+
* Tell this handler that it should stop changing auto-read setting for the connection.
36+
*/
37+
void disableAutoReadManagement();
38+
}

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void shouldWriteInitMessageWhenHandshakeCompleted()
9494
listener.operationComplete( handshakeCompletedPromise );
9595
assertTrue( channel.finish() );
9696

97-
verify( messageDispatcher ).queue( any( InitResponseHandler.class ) );
97+
verify( messageDispatcher ).enqueue( any( InitResponseHandler.class ) );
9898
Object outboundMessage = channel.readOutbound();
9999
assertThat( outboundMessage, instanceOf( InitMessage.class ) );
100100
InitMessage initMessage = (InitMessage) outboundMessage;

driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,10 +521,10 @@ private static class ThreadTrackingInboundMessageDispatcher extends InboundMessa
521521
}
522522

523523
@Override
524-
public void queue( ResponseHandler handler )
524+
public void enqueue( ResponseHandler handler )
525525
{
526526
queueThreadNames.add( Thread.currentThread().getName() );
527-
super.queue( handler );
527+
super.enqueue( handler );
528528
}
529529

530530
}

0 commit comments

Comments
 (0)