Skip to content

Commit 214725d

Browse files
committed
Mute ACK_FAILURE when sending RESET
It's needed because RESET makes server terminate all running messages and then moves state machine to idle READY state. That's why it does not expect clients to ACK_FAILURE for terminated messages. Absence of this muting caused server to sometimes fail saying: "ACK_FAILURE cannot be handled by a session in the READY state". Also removed some unused blocking API classes and better naming for connection methods.
1 parent 1e805d0 commit 214725d

21 files changed

+206
-248
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
5252
@Override
5353
public CompletionStage<Void> verifyConnectivity()
5454
{
55-
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
55+
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
5656
}
5757

5858
@Override

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ private BiConsumer<Void,Throwable> transactionClosed( State newState )
192192
{
193193
// todo: test that this state transition always happens when commit or rollback
194194
state = newState;
195-
connection.release();
195+
connection.releaseInBackground();
196196
session.setBookmark( bookmark );
197197
};
198198
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,11 +492,12 @@ private CompletionStage<Void> forceReleaseConnection()
492492
{
493493
if ( connection != null )
494494
{
495-
return connection.forceRelease();
495+
return connection.releaseNow();
496496
}
497497
return completedFuture( null );
498498
} ).exceptionally( error ->
499499
{
500+
// todo: this log message looks wrong, should it go to #rollbackTransaction() ?
500501
logger.error( "Failed to rollback active transaction", error );
501502
return null;
502503
} );

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import java.util.Map;
2727

28-
import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler;
28+
import org.neo4j.driver.internal.handlers.InitResponseHandler;
2929
import org.neo4j.driver.internal.messaging.InitMessage;
3030
import org.neo4j.driver.v1.Value;
3131

@@ -54,7 +54,7 @@ public void operationComplete( ChannelFuture future )
5454
Channel channel = future.channel();
5555

5656
InitMessage message = new InitMessage( userAgent, authToken );
57-
AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
57+
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );
5858

5959
messageDispatcher( channel ).queue( handler );
6060
channel.writeAndFlush( message, channel.voidPromise() );

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828

2929
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
30+
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3031
import org.neo4j.driver.internal.messaging.Message;
3132
import org.neo4j.driver.internal.messaging.PullAllMessage;
3233
import org.neo4j.driver.internal.messaging.ResetMessage;
@@ -61,6 +62,12 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6162
this.clock = clock;
6263
}
6364

65+
@Override
66+
public boolean isInUse()
67+
{
68+
return state.isInUse();
69+
}
70+
6471
@Override
6572
public boolean tryMarkInUse()
6673
{
@@ -99,29 +106,22 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
99106
run( statement, parameters, runHandler, pullAllHandler, true );
100107
}
101108

102-
// TODO change this to return future or say that it does stuff in the background in the method name?
103109
@Override
104-
public void release()
110+
public void releaseInBackground()
105111
{
106112
if ( state.release() )
107113
{
108-
reset( new ReleaseChannelHandler( channel, channelPool, clock ) );
114+
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
109115
}
110116
}
111117

112118
@Override
113-
public boolean isInUse()
114-
{
115-
return state.isInUse();
116-
}
117-
118-
@Override
119-
public CompletionStage<Void> forceRelease()
119+
public CompletionStage<Void> releaseNow()
120120
{
121121
if ( state.forceRelease() )
122122
{
123123
Promise<Void> releasePromise = channel.eventLoop().newPromise();
124-
reset( new ReleaseChannelHandler( channel, channelPool, clock, releasePromise ) );
124+
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
125125
return asCompletionStage( releasePromise );
126126
}
127127
else
@@ -151,7 +151,11 @@ private void run( String statement, Map<String,Value> parameters, ResponseHandle
151151

152152
private void reset( ResponseHandler resetHandler )
153153
{
154-
writeAndFlushMessageInEventLoop( ResetMessage.RESET, resetHandler );
154+
channel.eventLoop().execute( () ->
155+
{
156+
messageDispatcher.muteAckFailure();
157+
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
158+
} );
155159
}
156160

157161
private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2,
@@ -160,11 +164,6 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
160164
channel.eventLoop().execute( () -> writeMessages( message1, handler1, message2, handler2, flush ) );
161165
}
162166

163-
private void writeAndFlushMessageInEventLoop( Message message, ResponseHandler handler )
164-
{
165-
channel.eventLoop().execute( () -> writeAndFlushMessage( message, handler ) );
166-
}
167-
168167
private void writeMessages( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2,
169168
boolean flush )
170169
{

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
7676
}
7777

7878
@Override
79-
public void release()
79+
public void releaseInBackground()
8080
{
81-
delegate.release();
81+
delegate.releaseInBackground();
8282
}
8383

8484
@Override
@@ -88,9 +88,9 @@ public boolean isInUse()
8888
}
8989

9090
@Override
91-
public CompletionStage<Void> forceRelease()
91+
public CompletionStage<Void> releaseNow()
9292
{
93-
return delegate.forceRelease();
93+
return delegate.releaseNow();
9494
}
9595

9696
@Override

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Queue;
2727

2828
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
29-
import org.neo4j.driver.internal.messaging.AckFailureMessage;
3029
import org.neo4j.driver.internal.messaging.MessageHandler;
3130
import org.neo4j.driver.internal.spi.ResponseHandler;
3231
import org.neo4j.driver.internal.util.ErrorUtil;
@@ -35,6 +34,7 @@
3534
import org.neo4j.driver.v1.Value;
3635

3736
import static java.util.Objects.requireNonNull;
37+
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
3838

3939
public class InboundMessageDispatcher implements MessageHandler
4040
{
@@ -44,6 +44,7 @@ public class InboundMessageDispatcher implements MessageHandler
4444

4545
private Throwable currentError;
4646
private boolean fatalErrorOccurred;
47+
private boolean ackFailureMuted;
4748

4849
public InboundMessageDispatcher( Channel channel, Logging logging )
4950
{
@@ -133,9 +134,8 @@ public void handleFailureMessage( String code, String message )
133134
log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message );
134135
currentError = ErrorUtil.newNeo4jError( code, message );
135136

136-
// queue ACK_FAILURE before notifying the next response handler
137-
queue( new AckFailureResponseHandler( this ) );
138-
channel.writeAndFlush( AckFailureMessage.ACK_FAILURE, channel.voidPromise() );
137+
// try to write ACK_FAILURE before notifying the next response handler
138+
ackFailureIfNeeded();
139139

140140
ResponseHandler handler = handlers.remove();
141141
handler.onFailure( currentError );
@@ -179,4 +179,39 @@ public Throwable currentError()
179179
{
180180
return currentError;
181181
}
182+
183+
/**
184+
* Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using
185+
* {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed <b>only</b> when sending RESET message. RESET "jumps"
186+
* over all queued messages on server and makes them fail. Received failures do not need to be acknowledge because
187+
* RESET moves server's state machine to READY state.
188+
*/
189+
public void muteAckFailure()
190+
{
191+
ackFailureMuted = true;
192+
}
193+
194+
/**
195+
* Makes this message dispatcher send ACK_FAILURE in response to FAILURE. Should be used in combination with
196+
* {@link #muteAckFailure()} when sending RESET message.
197+
*
198+
* @throws IllegalStateException if ACK_FAILURE is not muted right now.
199+
*/
200+
public void unMuteAckFailure()
201+
{
202+
if ( !ackFailureMuted )
203+
{
204+
throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" );
205+
}
206+
ackFailureMuted = false;
207+
}
208+
209+
private void ackFailureIfNeeded()
210+
{
211+
if ( !ackFailureMuted )
212+
{
213+
queue( new AckFailureResponseHandler( this ) );
214+
channel.writeAndFlush( ACK_FAILURE, channel.voidPromise() );
215+
}
216+
}
182217
}

driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

0 commit comments

Comments
 (0)