Skip to content

Commit c5e8da6

Browse files
authored
Merge pull request #504 from lutovich/1.6-failure-on-reset-and-ack-failure
Improve handling of RESET and ACK_FAILURE errors
2 parents 55d6fa4 + 4a4409d commit c5e8da6

File tree

9 files changed

+193
-9
lines changed

9 files changed

+193
-9
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,18 @@ public void unMuteAckFailure()
229229
ackFailureMuted = false;
230230
}
231231

232+
/**
233+
* Check if ACK_FAILURE is muted.
234+
* <p>
235+
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
236+
*
237+
* @return {@code true} if ACK_FAILURE has been muted via {@link #muteAckFailure()}, {@code false} otherwise.
238+
*/
239+
public boolean isAckFailureMuted()
240+
{
241+
return ackFailureMuted;
242+
}
243+
232244
private void ackFailureIfNeeded()
233245
{
234246
if ( !ackFailureMuted )

driver/src/main/java/org/neo4j/driver/internal/handlers/AckFailureResponseHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2424
import org.neo4j.driver.internal.spi.ResponseHandler;
2525
import org.neo4j.driver.v1.Value;
26+
import org.neo4j.driver.v1.exceptions.ClientException;
2627

2728
public class AckFailureResponseHandler implements ResponseHandler
2829
{
@@ -42,10 +43,21 @@ public void onSuccess( Map<String,Value> metadata )
4243
@Override
4344
public void onFailure( Throwable error )
4445
{
46+
if ( messageDispatcher.isAckFailureMuted() )
47+
{
48+
// RESET cancelled this ACK_FAILURE and made the database send an IGNORED message
49+
// this is not a protocol violation and database has all the connection stated cleared now
50+
messageDispatcher.clearCurrentError();
51+
}
52+
else
53+
{
54+
throw new ClientException( "Unable to acknowledge the previous error. Connection will be closed", error );
55+
}
4556
}
4657

4758
@Override
4859
public void onRecord( Value[] fields )
4960
{
61+
throw new UnsupportedOperationException();
5062
}
5163
}

driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,18 @@ public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool,
4545
}
4646

4747
@Override
48-
protected void resetCompleted( CompletableFuture<Void> completionFuture )
48+
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
4949
{
50-
setLastUsedTimestamp( channel, clock.millis() );
50+
if ( success )
51+
{
52+
// update the last-used timestamp before returning the channel back to the pool
53+
setLastUsedTimestamp( channel, clock.millis() );
54+
}
55+
else
56+
{
57+
// close the channel before returning it back to the pool if RESET failed
58+
channel.close();
59+
}
5160

5261
Future<Void> released = pool.release( channel );
5362
released.addListener( ignore -> completionFuture.complete( null ) );

driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, Complet
3939
@Override
4040
public final void onSuccess( Map<String,Value> metadata )
4141
{
42-
resetCompleted();
42+
resetCompleted( true );
4343
}
4444

4545
@Override
4646
public final void onFailure( Throwable error )
4747
{
48-
resetCompleted();
48+
resetCompleted( false );
4949
}
5050

5151
@Override
@@ -54,13 +54,13 @@ public final void onRecord( Value[] fields )
5454
throw new UnsupportedOperationException();
5555
}
5656

57-
private void resetCompleted()
57+
private void resetCompleted( boolean success )
5858
{
5959
messageDispatcher.unMuteAckFailure();
60-
resetCompleted( completionFuture );
60+
resetCompleted( completionFuture, success );
6161
}
6262

63-
protected void resetCompleted( CompletableFuture<Void> completionFuture )
63+
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
6464
{
6565
completionFuture.complete( null );
6666
}

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import static java.util.Collections.emptyMap;
3737
import static org.hamcrest.Matchers.instanceOf;
3838
import static org.junit.Assert.assertEquals;
39+
import static org.junit.Assert.assertFalse;
3940
import static org.junit.Assert.assertNull;
4041
import static org.junit.Assert.assertThat;
42+
import static org.junit.Assert.assertTrue;
4143
import static org.junit.Assert.fail;
4244
import static org.mockito.Matchers.any;
4345
import static org.mockito.Matchers.eq;
@@ -428,6 +430,19 @@ public void shouldNotSupportAckFailureMessage()
428430
}
429431
}
430432

433+
@Test
434+
public void shouldMuteAndUnMuteAckFailure()
435+
{
436+
InboundMessageDispatcher dispatcher = newDispatcher();
437+
assertFalse( dispatcher.isAckFailureMuted() );
438+
439+
dispatcher.muteAckFailure();
440+
assertTrue( dispatcher.isAckFailureMuted() );
441+
442+
dispatcher.unMuteAckFailure();
443+
assertFalse( dispatcher.isAckFailureMuted() );
444+
}
445+
431446
private static void verifyFailure( ResponseHandler handler )
432447
{
433448
ArgumentCaptor<Neo4jException> captor = ArgumentCaptor.forClass( Neo4jException.class );
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import org.junit.Test;
22+
23+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
24+
import org.neo4j.driver.v1.Value;
25+
import org.neo4j.driver.v1.exceptions.ClientException;
26+
27+
import static java.util.Collections.emptyMap;
28+
import static org.junit.Assert.assertSame;
29+
import static org.junit.Assert.fail;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.never;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
35+
public class AckFailureResponseHandlerTest
36+
{
37+
private final InboundMessageDispatcher dispatcher = mock( InboundMessageDispatcher.class );
38+
private final AckFailureResponseHandler handler = new AckFailureResponseHandler( dispatcher );
39+
40+
@Test
41+
public void shouldClearCurrentErrorOnSuccess()
42+
{
43+
verify( dispatcher, never() ).clearCurrentError();
44+
handler.onSuccess( emptyMap() );
45+
verify( dispatcher ).clearCurrentError();
46+
}
47+
48+
@Test
49+
public void shouldThrowOnFailure()
50+
{
51+
RuntimeException error = new RuntimeException( "Unable to process ACK_FAILURE" );
52+
53+
try
54+
{
55+
handler.onFailure( error );
56+
fail( "Exception expected" );
57+
}
58+
catch ( ClientException e )
59+
{
60+
assertSame( error, e.getCause() );
61+
}
62+
}
63+
64+
@Test
65+
public void shouldClearCurrentErrorWhenAckFailureMutedAndFailureReceived()
66+
{
67+
RuntimeException error = new RuntimeException( "Some error" );
68+
when( dispatcher.isAckFailureMuted() ).thenReturn( true );
69+
70+
handler.onFailure( error );
71+
72+
verify( dispatcher ).clearCurrentError();
73+
}
74+
75+
@Test
76+
public void shouldThrowOnRecord()
77+
{
78+
try
79+
{
80+
handler.onRecord( new Value[0] );
81+
fail( "Exception expected" );
82+
}
83+
catch ( UnsupportedOperationException ignore )
84+
{
85+
}
86+
}
87+
}

driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void shouldReleaseChannelOnSuccess()
7171
}
7272

7373
@Test
74-
public void shouldReleaseChannelOnFailure()
74+
public void shouldCloseAndReleaseChannelOnFailure()
7575
{
7676
ChannelPool pool = newChannelPoolMock();
7777
FakeClock clock = new FakeClock();
@@ -81,7 +81,7 @@ public void shouldReleaseChannelOnFailure()
8181

8282
handler.onFailure( new RuntimeException() );
8383

84-
verifyLastUsedTimestamp( 100 );
84+
assertTrue( channel.closeFuture().isDone() );
8585
verify( pool ).release( eq( channel ) );
8686
assertTrue( releaseFuture.isDone() );
8787
assertFalse( releaseFuture.isCompletedExceptionally() );

driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
package org.neo4j.driver.v1.integration;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.Channel;
2223
import org.junit.After;
2324
import org.junit.Before;
2425
import org.junit.ClassRule;
2526
import org.junit.Test;
2627
import org.mockito.Mockito;
2728

29+
import java.net.URI;
2830
import java.util.List;
2931
import java.util.concurrent.CompletableFuture;
3032
import java.util.concurrent.CompletionStage;
@@ -41,8 +43,10 @@
4143
import org.neo4j.driver.internal.security.SecurityPlan;
4244
import org.neo4j.driver.internal.spi.Connection;
4345
import org.neo4j.driver.internal.spi.ConnectionPool;
46+
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
4447
import org.neo4j.driver.internal.util.Clock;
4548
import org.neo4j.driver.v1.AuthToken;
49+
import org.neo4j.driver.v1.AuthTokens;
4650
import org.neo4j.driver.v1.Config;
4751
import org.neo4j.driver.v1.Driver;
4852
import org.neo4j.driver.v1.Logging;
@@ -53,18 +57,22 @@
5357
import org.neo4j.driver.v1.Transaction;
5458
import org.neo4j.driver.v1.exceptions.ClientException;
5559
import org.neo4j.driver.v1.summary.ResultSummary;
60+
import org.neo4j.driver.v1.util.StubServer;
5661
import org.neo4j.driver.v1.util.TestNeo4j;
5762

63+
import static java.util.concurrent.TimeUnit.SECONDS;
5864
import static org.hamcrest.Matchers.instanceOf;
5965
import static org.junit.Assert.assertEquals;
6066
import static org.junit.Assert.assertNotNull;
67+
import static org.junit.Assert.assertNull;
6168
import static org.junit.Assert.assertSame;
6269
import static org.junit.Assert.assertThat;
6370
import static org.junit.Assert.fail;
6471
import static org.mockito.Mockito.atLeastOnce;
6572
import static org.mockito.Mockito.never;
6673
import static org.mockito.Mockito.spy;
6774
import static org.mockito.Mockito.verify;
75+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
6876
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
6977
import static org.neo4j.driver.v1.Config.defaultConfig;
7078
import static org.neo4j.driver.v1.Values.parameters;
@@ -280,6 +288,36 @@ public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToC
280288
verify( connection2 ).release();
281289
}
282290

291+
@Test
292+
public void shouldCloseChannelWhenResetFails() throws Exception
293+
{
294+
StubServer server = StubServer.start( "reset_error.script", 9001 );
295+
try
296+
{
297+
URI uri = URI.create( "bolt://localhost:9001" );
298+
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withoutEncryption().toConfig();
299+
ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );
300+
301+
try ( Driver driver = driverFactory.newInstance( uri, AuthTokens.none(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) )
302+
{
303+
try ( Session session = driver.session() )
304+
{
305+
assertEquals( 42, session.run( "RETURN 42 AS answer" ).single().get( 0 ).asInt() );
306+
}
307+
308+
List<Channel> channels = driverFactory.pollChannels();
309+
// there should be a single channel
310+
assertEquals( 1, channels.size() );
311+
// and it should be closed because it failed to RESET
312+
assertNull( channels.get( 0 ).closeFuture().get( 30, SECONDS ) );
313+
}
314+
}
315+
finally
316+
{
317+
assertEquals( 0, server.exitStatus() );
318+
}
319+
}
320+
283321
private StatementResult createNodesInNewSession( int nodesToCreate )
284322
{
285323
return createNodes( nodesToCreate, driver.session() );
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: AUTO INIT
2+
3+
C: RESET
4+
S: SUCCESS {}
5+
C: RUN "RETURN 42 AS answer" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["answer"]}
8+
RECORD [42]
9+
SUCCESS {}
10+
C: RESET
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to reset"}

0 commit comments

Comments
 (0)