Skip to content

Commit 97e5f29

Browse files
committed
Perform writes fully in event loop
`NettyConnection` is the abstraction over netty channel that allows us to write RUN, PULL_ALL and other messages. Currently write consists of two steps: (1) enqueue a response handler (2) perform actual write. Step (1) is executed in the caller thread and step (2) is always executed in the event loop thread (guaranteed by Netty). Step (1) involves modification of a non-synchronized `LinkedList` which is a problem. Later event loop thread will try to pop handlers from the same list meaning we access it from different threads. This commit make both (1) and (2) execute in the event loop to avoid synchronization issues. It basically makes sure `InboundMessageDispatcher` is only accessed by the event loop. Also removed `ConcurrentSet` implementation in favour of the one existing in Netty.
1 parent d27f4f3 commit 97e5f29

File tree

8 files changed

+227
-111
lines changed

8 files changed

+227
-111
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.neo4j.driver.v1.exceptions.Neo4jException;
4646
import org.neo4j.driver.v1.types.TypeSystem;
4747

48+
import static java.util.Collections.emptyMap;
4849
import static java.util.concurrent.CompletableFuture.completedFuture;
4950
import static org.neo4j.driver.internal.async.Futures.failedFuture;
5051
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
@@ -120,19 +121,16 @@ public void begin( Bookmark initialBookmark )
120121

121122
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
122123
{
123-
Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters();
124-
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
125-
126124
if ( initialBookmark.isEmpty() )
127125
{
128-
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
126+
asyncConnection.run( BEGIN_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, NoOpResponseHandler.INSTANCE );
129127
return completedFuture( this );
130128
}
131129
else
132130
{
133131
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
134-
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
135-
asyncConnection.flush();
132+
asyncConnection.runAndFlush( BEGIN_QUERY, initialBookmark.asBeginTransactionParameters(),
133+
NoOpResponseHandler.INSTANCE, new BeginTxResponseHandler<>( beginFuture, this ) );
136134
return beginFuture;
137135
}
138136
}
@@ -258,10 +256,8 @@ private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
258256
private CompletionStage<Void> doCommitAsync()
259257
{
260258
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
261-
262-
asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
263-
asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
264-
asyncConnection.flush();
259+
asyncConnection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
260+
new CommitTxResponseHandler( commitFuture, this ) );
265261

266262
return commitFuture.thenApply( ignore ->
267263
{
@@ -273,9 +269,8 @@ private CompletionStage<Void> doCommitAsync()
273269
private CompletionStage<Void> doRollbackAsync()
274270
{
275271
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
276-
asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
277-
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
278-
asyncConnection.flush();
272+
asyncConnection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
273+
new RollbackTxResponseHandler( rollbackFuture ) );
279274

280275
return rollbackFuture.thenApply( ignore ->
281276
{

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public interface AsyncConnection
3333

3434
void disableAutoRead();
3535

36-
void run( String statement, Map<String,Value> parameters, ResponseHandler handler );
36+
void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
37+
ResponseHandler pullAllHandler );
3738

38-
void pullAll( ResponseHandler handler );
39-
40-
void flush();
39+
void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
40+
ResponseHandler pullAllHandler );
4141

4242
void release();
4343

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22+
import io.netty.channel.EventLoop;
2223
import io.netty.channel.pool.ChannelPool;
2324
import io.netty.util.concurrent.Promise;
2425

@@ -74,7 +75,6 @@ public void enableAutoRead()
7475
{
7576
if ( autoReadEnabled.compareAndSet( false, true ) )
7677
{
77-
System.out.println( "=== enableAutoRead" );
7878
setAutoRead( true );
7979
}
8080
}
@@ -84,35 +84,30 @@ public void disableAutoRead()
8484
{
8585
if ( autoReadEnabled.compareAndSet( true, false ) )
8686
{
87-
System.out.println( "=== disableAutoRead" );
8887
setAutoRead( false );
8988
}
9089
}
9190

9291
@Override
93-
public void run( String statement, Map<String,Value> parameters, ResponseHandler handler )
92+
public void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
93+
ResponseHandler pullAllHandler )
9494
{
95-
write( new RunMessage( statement, parameters ), handler, false );
95+
run( statement, parameters, runHandler, pullAllHandler, false );
9696
}
9797

9898
@Override
99-
public void pullAll( ResponseHandler handler )
99+
public void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
100+
ResponseHandler pullAllHandler )
100101
{
101-
write( PullAllMessage.PULL_ALL, handler, false );
102-
}
103-
104-
@Override
105-
public void flush()
106-
{
107-
channel.flush();
102+
run( statement, parameters, runHandler, pullAllHandler, true );
108103
}
109104

110105
@Override
111106
public void release()
112107
{
113108
if ( state.release() )
114109
{
115-
write( ResetMessage.RESET, new ReleaseChannelHandler( channel, channelPool, clock ), true );
110+
reset( new ReleaseChannelHandler( channel, channelPool, clock ) );
116111
}
117112
}
118113

@@ -122,7 +117,7 @@ public CompletionStage<Void> forceRelease()
122117
if ( state.forceRelease() )
123118
{
124119
Promise<Void> releasePromise = channel.eventLoop().newPromise();
125-
write( ResetMessage.RESET, new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ), true );
120+
reset( new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ) );
126121
return asCompletionStage( releasePromise );
127122
}
128123
else
@@ -137,19 +132,71 @@ public ServerInfo serverInfo()
137132
return new InternalServerInfo( address( channel ), serverVersion( channel ) );
138133
}
139134

140-
private void write( Message message, ResponseHandler handler, boolean flush )
135+
private void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
136+
ResponseHandler pullAllHandler, boolean flush )
141137
{
142-
messageDispatcher.queue( handler );
138+
writeMessagesInEventLoop( new RunMessage( statement, parameters ), runHandler, PullAllMessage.PULL_ALL,
139+
pullAllHandler, flush );
140+
}
141+
142+
private void reset( ResponseHandler resetHandler )
143+
{
144+
writeAndFlushMessageInEventLoop( ResetMessage.RESET, resetHandler );
145+
}
146+
147+
private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2,
148+
ResponseHandler handler2, boolean flush )
149+
{
150+
EventLoop eventLoop = channel.eventLoop();
151+
152+
if ( eventLoop.inEventLoop() )
153+
{
154+
writeMessages( message1, handler1, message2, handler2, flush );
155+
}
156+
else
157+
{
158+
eventLoop.execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
159+
}
160+
}
161+
162+
private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler )
163+
{
164+
EventLoop eventLoop = channel.eventLoop();
165+
166+
if ( eventLoop.inEventLoop() )
167+
{
168+
writeAndFlushMessage( message, handler );
169+
}
170+
else
171+
{
172+
eventLoop.execute( () -> writeAndFlushMessage( message, handler ) );
173+
}
174+
}
175+
176+
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
177+
boolean flush )
178+
{
179+
messageDispatcher.queue( handler1 );
180+
messageDispatcher.queue( handler2 );
181+
182+
channel.write( message1 );
183+
143184
if ( flush )
144185
{
145-
channel.writeAndFlush( message );
186+
channel.writeAndFlush( message2 );
146187
}
147188
else
148189
{
149-
channel.write( message );
190+
channel.write( message2 );
150191
}
151192
}
152193

194+
private void writeAndFlushMessage( Message message, ResponseHandler handler )
195+
{
196+
messageDispatcher.queue( handler );
197+
channel.writeAndFlush( message );
198+
}
199+
153200
private void setAutoRead( boolean value )
154201
{
155202
channel.config().setAutoRead( value );

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ public static CompletionStage<StatementResultCursor> runAsync( AsyncConnection c
5454
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, tx );
5555
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
5656

57-
connection.run( query, params, runHandler );
58-
connection.pullAll( pullAllHandler );
59-
connection.flush();
57+
connection.runAndFlush( query, params, runHandler, pullAllHandler );
6058

6159
return runCompletedFuture.thenApply( ignore ->
6260
new InternalStatementResultCursor( runHandler, pullAllHandler ) );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.pool.ChannelPoolHandler;
23+
import io.netty.util.internal.ConcurrentSet;
2324

2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.ConcurrentMap;
2627

2728
import org.neo4j.driver.internal.net.BoltServerAddress;
28-
import org.neo4j.driver.internal.util.ConcurrentSet;
2929
import org.neo4j.driver.v1.Logger;
3030
import org.neo4j.driver.v1.Logging;
3131

driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentSet.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)