Skip to content

Commit c79fc2d

Browse files
authored
Merge pull request #408 from lutovich/1.5-writes-in-evt-loop
Perform writes fully in event loop
2 parents 31e9046 + 97e5f29 commit c79fc2d

File tree

8 files changed

+227
-111
lines changed

8 files changed

+227
-111
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.neo4j.driver.v1.exceptions.Neo4jException;
4646
import org.neo4j.driver.v1.types.TypeSystem;
4747

48+
import static java.util.Collections.emptyMap;
4849
import static java.util.concurrent.CompletableFuture.completedFuture;
4950
import static org.neo4j.driver.internal.async.Futures.failedFuture;
5051
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
@@ -120,19 +121,16 @@ public void begin( Bookmark initialBookmark )
120121

121122
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
122123
{
123-
Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters();
124-
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
125-
126124
if ( initialBookmark.isEmpty() )
127125
{
128-
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
126+
asyncConnection.run( BEGIN_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, NoOpResponseHandler.INSTANCE );
129127
return completedFuture( this );
130128
}
131129
else
132130
{
133131
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
134-
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
135-
asyncConnection.flush();
132+
asyncConnection.runAndFlush( BEGIN_QUERY, initialBookmark.asBeginTransactionParameters(),
133+
NoOpResponseHandler.INSTANCE, new BeginTxResponseHandler<>( beginFuture, this ) );
136134
return beginFuture;
137135
}
138136
}
@@ -258,10 +256,8 @@ private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
258256
private CompletionStage<Void> doCommitAsync()
259257
{
260258
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
261-
262-
asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
263-
asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
264-
asyncConnection.flush();
259+
asyncConnection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
260+
new CommitTxResponseHandler( commitFuture, this ) );
265261

266262
return commitFuture.thenApply( ignore ->
267263
{
@@ -273,9 +269,8 @@ private CompletionStage<Void> doCommitAsync()
273269
private CompletionStage<Void> doRollbackAsync()
274270
{
275271
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
276-
asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
277-
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
278-
asyncConnection.flush();
272+
asyncConnection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
273+
new RollbackTxResponseHandler( rollbackFuture ) );
279274

280275
return rollbackFuture.thenApply( ignore ->
281276
{

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public interface AsyncConnection
3333

3434
void disableAutoRead();
3535

36-
void run( String statement, Map<String,Value> parameters, ResponseHandler handler );
36+
void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
37+
ResponseHandler pullAllHandler );
3738

38-
void pullAll( ResponseHandler handler );
39-
40-
void flush();
39+
void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
40+
ResponseHandler pullAllHandler );
4141

4242
void release();
4343

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22+
import io.netty.channel.EventLoop;
2223
import io.netty.channel.pool.ChannelPool;
2324
import io.netty.util.concurrent.Promise;
2425

@@ -74,7 +75,6 @@ public void enableAutoRead()
7475
{
7576
if ( autoReadEnabled.compareAndSet( false, true ) )
7677
{
77-
System.out.println( "=== enableAutoRead" );
7878
setAutoRead( true );
7979
}
8080
}
@@ -84,35 +84,30 @@ public void disableAutoRead()
8484
{
8585
if ( autoReadEnabled.compareAndSet( true, false ) )
8686
{
87-
System.out.println( "=== disableAutoRead" );
8887
setAutoRead( false );
8988
}
9089
}
9190

9291
@Override
93-
public void run( String statement, Map<String,Value> parameters, ResponseHandler handler )
92+
public void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
93+
ResponseHandler pullAllHandler )
9494
{
95-
write( new RunMessage( statement, parameters ), handler, false );
95+
run( statement, parameters, runHandler, pullAllHandler, false );
9696
}
9797

9898
@Override
99-
public void pullAll( ResponseHandler handler )
99+
public void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
100+
ResponseHandler pullAllHandler )
100101
{
101-
write( PullAllMessage.PULL_ALL, handler, false );
102-
}
103-
104-
@Override
105-
public void flush()
106-
{
107-
channel.flush();
102+
run( statement, parameters, runHandler, pullAllHandler, true );
108103
}
109104

110105
@Override
111106
public void release()
112107
{
113108
if ( state.release() )
114109
{
115-
write( ResetMessage.RESET, new ReleaseChannelHandler( channel, channelPool, clock ), true );
110+
reset( new ReleaseChannelHandler( channel, channelPool, clock ) );
116111
}
117112
}
118113

@@ -122,7 +117,7 @@ public CompletionStage<Void> forceRelease()
122117
if ( state.forceRelease() )
123118
{
124119
Promise<Void> releasePromise = channel.eventLoop().newPromise();
125-
write( ResetMessage.RESET, new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ), true );
120+
reset( new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ) );
126121
return asCompletionStage( releasePromise );
127122
}
128123
else
@@ -137,19 +132,71 @@ public ServerInfo serverInfo()
137132
return new InternalServerInfo( address( channel ), serverVersion( channel ) );
138133
}
139134

140-
private void write( Message message, ResponseHandler handler, boolean flush )
135+
private void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
136+
ResponseHandler pullAllHandler, boolean flush )
141137
{
142-
messageDispatcher.queue( handler );
138+
writeMessagesInEventLoop( new RunMessage( statement, parameters ), runHandler, PullAllMessage.PULL_ALL,
139+
pullAllHandler, flush );
140+
}
141+
142+
private void reset( ResponseHandler resetHandler )
143+
{
144+
writeAndFlushMessageInEventLoop( ResetMessage.RESET, resetHandler );
145+
}
146+
147+
private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2,
148+
ResponseHandler handler2, boolean flush )
149+
{
150+
EventLoop eventLoop = channel.eventLoop();
151+
152+
if ( eventLoop.inEventLoop() )
153+
{
154+
writeMessages( message1, handler1, message2, handler2, flush );
155+
}
156+
else
157+
{
158+
eventLoop.execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
159+
}
160+
}
161+
162+
private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler )
163+
{
164+
EventLoop eventLoop = channel.eventLoop();
165+
166+
if ( eventLoop.inEventLoop() )
167+
{
168+
writeAndFlushMessage( message, handler );
169+
}
170+
else
171+
{
172+
eventLoop.execute( () -> writeAndFlushMessage( message, handler ) );
173+
}
174+
}
175+
176+
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
177+
boolean flush )
178+
{
179+
messageDispatcher.queue( handler1 );
180+
messageDispatcher.queue( handler2 );
181+
182+
channel.write( message1 );
183+
143184
if ( flush )
144185
{
145-
channel.writeAndFlush( message );
186+
channel.writeAndFlush( message2 );
146187
}
147188
else
148189
{
149-
channel.write( message );
190+
channel.write( message2 );
150191
}
151192
}
152193

194+
private void writeAndFlushMessage( Message message, ResponseHandler handler )
195+
{
196+
messageDispatcher.queue( handler );
197+
channel.writeAndFlush( message );
198+
}
199+
153200
private void setAutoRead( boolean value )
154201
{
155202
channel.config().setAutoRead( value );

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ public static CompletionStage<StatementResultCursor> runAsync( AsyncConnection c
5454
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, tx );
5555
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
5656

57-
connection.run( query, params, runHandler );
58-
connection.pullAll( pullAllHandler );
59-
connection.flush();
57+
connection.runAndFlush( query, params, runHandler, pullAllHandler );
6058

6159
return runCompletedFuture.thenApply( ignore ->
6260
new InternalStatementResultCursor( runHandler, pullAllHandler ) );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.pool.ChannelPoolHandler;
23+
import io.netty.util.internal.ConcurrentSet;
2324

2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.ConcurrentMap;
2627

2728
import org.neo4j.driver.internal.net.BoltServerAddress;
28-
import org.neo4j.driver.internal.util.ConcurrentSet;
2929
import org.neo4j.driver.v1.Logger;
3030
import org.neo4j.driver.v1.Logging;
3131

driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentSet.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)