Skip to content

Commit 49b26a4

Browse files
committed
Added authorization expired response handling
This update adds support for authorization expired response handling. When such error arrives the driver will mark all connections that were created on or before the creation timestamp of the receiving connection for disposal. The connections will be allowed to finish what they are doing and released back to the pool and will be disposed on the next acquire call. This update also makes sure that we always register the HELLO response handler so that we can handle the response appropriately. In addition, it adds the ROLLBACK message support to the driver backend.
1 parent 573e890 commit 49b26a4

19 files changed

+423
-54
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
/**
22+
* The authorization info maintained on the server has expired. The client should reconnect.
23+
* <p>
24+
* Error code: Neo.ClientError.Security.AuthorizationExpired
25+
*/
26+
public class AuthorizationExpiredException extends SecurityException
27+
{
28+
public AuthorizationExpiredException( String code, String message )
29+
{
30+
super( code, message );
31+
}
32+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.connection;
20+
21+
import io.netty.channel.Channel;
22+
23+
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
24+
25+
/**
26+
* Listener for authorization info state maintained on the server side.
27+
*/
28+
public interface AuthorizationStateListener
29+
{
30+
/**
31+
* Notifies the listener that the credentials stored on the server side have expired.
32+
*
33+
* @param e the {@link AuthorizationExpiredException} exception.
34+
* @param channel the channel that received the error.
35+
*/
36+
void onExpired( AuthorizationExpiredException e, Channel channel );
37+
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class ChannelAttributes
4040
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
4141
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
4242
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );
43+
private static final AttributeKey<AuthorizationStateListener> AUTHORIZATION_STATE_LISTENER = newInstance( "authorizationStateListener" );
4344

4445
private ChannelAttributes()
4546
{
@@ -145,6 +146,16 @@ public static void setTerminationReason( Channel channel, String reason )
145146
setOnce( channel, TERMINATION_REASON, reason );
146147
}
147148

149+
public static AuthorizationStateListener authorizationStateListener( Channel channel )
150+
{
151+
return get( channel, AUTHORIZATION_STATE_LISTENER );
152+
}
153+
154+
public static void setAuthorizationStateListener( Channel channel, AuthorizationStateListener authorizationStateListener )
155+
{
156+
set( channel, AUTHORIZATION_STATE_LISTENER, authorizationStateListener );
157+
}
158+
148159
private static <T> T get( Channel channel, AttributeKey<T> key )
149160
{
150161
return channel.attr( key ).get();

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,19 @@
2525
import java.util.Map;
2626
import java.util.Queue;
2727

28-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
28+
import org.neo4j.driver.Logger;
29+
import org.neo4j.driver.Logging;
30+
import org.neo4j.driver.Value;
31+
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
32+
import org.neo4j.driver.exceptions.ClientException;
2933
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3034
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
3135
import org.neo4j.driver.internal.messaging.ResponseMessageHandler;
3236
import org.neo4j.driver.internal.spi.ResponseHandler;
3337
import org.neo4j.driver.internal.util.ErrorUtil;
34-
import org.neo4j.driver.Logger;
35-
import org.neo4j.driver.Logging;
36-
import org.neo4j.driver.Value;
37-
import org.neo4j.driver.exceptions.ClientException;
3838

3939
import static java.util.Objects.requireNonNull;
40+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authorizationStateListener;
4041
import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET;
4142
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
4243

@@ -114,9 +115,17 @@ public void handleFailureMessage( String code, String message )
114115
return;
115116
}
116117

117-
// write a RESET to "acknowledge" the failure
118-
enqueue( new ResetResponseHandler( this ) );
119-
channel.writeAndFlush( RESET, channel.voidPromise() );
118+
Throwable currentError = this.currentError;
119+
if ( currentError instanceof AuthorizationExpiredException )
120+
{
121+
authorizationStateListener( channel ).onExpired( (AuthorizationExpiredException) currentError, channel );
122+
}
123+
else
124+
{
125+
// write a RESET to "acknowledge" the failure
126+
enqueue( new ResetResponseHandler( this ) );
127+
channel.writeAndFlush( RESET, channel.voidPromise() );
128+
}
120129

121130
ResponseHandler handler = removeHandler();
122131
handler.onFailure( currentError );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.neo4j.driver.internal.util.Futures;
4747

4848
import static java.lang.String.format;
49+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener;
4950
import static org.neo4j.driver.internal.util.Futures.combineErrors;
5051
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;
5152

@@ -66,19 +67,22 @@ public class ConnectionPoolImpl implements ConnectionPool
6667
private final ConnectionFactory connectionFactory;
6768

6869
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
69-
Clock clock, boolean ownsEventLoopGroup )
70+
Clock clock, boolean ownsEventLoopGroup )
7071
{
71-
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging,
72-
clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener ) );
72+
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ),
73+
new NettyChannelHealthChecker( settings, clock, logging ), settings, metricsListener, logging,
74+
clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener ) );
7375
}
7476

75-
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker, PoolSettings settings,
76-
MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup, ConnectionFactory connectionFactory )
77+
protected ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
78+
NettyChannelHealthChecker nettyChannelHealthChecker, PoolSettings settings,
79+
MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup,
80+
ConnectionFactory connectionFactory )
7781
{
7882
this.connector = connector;
7983
this.bootstrap = bootstrap;
8084
this.nettyChannelTracker = nettyChannelTracker;
81-
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
85+
this.channelHealthChecker = nettyChannelHealthChecker;
8286
this.settings = settings;
8387
this.metricsListener = metricsListener;
8488
this.log = logging.getLog( ConnectionPool.class.getSimpleName() );
@@ -104,6 +108,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
104108
{
105109
processAcquisitionError( pool, address, error );
106110
assertNotClosed( address, channel, pool );
111+
setAuthorizationStateListener( channel, channelHealthChecker );
107112
Connection connection = connectionFactory.createConnection( channel, pool );
108113

109114
metricsListener.afterAcquiredOrCreated( pool.id(), acquireEvent );

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,34 @@
2323
import io.netty.util.concurrent.Future;
2424
import io.netty.util.concurrent.Promise;
2525

26+
import java.util.Optional;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
import org.neo4j.driver.Logger;
30+
import org.neo4j.driver.Logging;
31+
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
32+
import org.neo4j.driver.internal.async.connection.AuthorizationStateListener;
2633
import org.neo4j.driver.internal.handlers.PingResponseHandler;
2734
import org.neo4j.driver.internal.messaging.request.ResetMessage;
2835
import org.neo4j.driver.internal.util.Clock;
29-
import org.neo4j.driver.Logger;
30-
import org.neo4j.driver.Logging;
3136

3237
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.creationTimestamp;
3338
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.lastUsedTimestamp;
3439
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
3540

36-
public class NettyChannelHealthChecker implements ChannelHealthChecker
41+
public class NettyChannelHealthChecker implements ChannelHealthChecker, AuthorizationStateListener
3742
{
3843
private final PoolSettings poolSettings;
3944
private final Clock clock;
4045
private final Logger log;
46+
private final AtomicReference<Optional<Long>> minCreationTimestampMillisOpt;
4147

4248
public NettyChannelHealthChecker( PoolSettings poolSettings, Clock clock, Logging logging )
4349
{
4450
this.poolSettings = poolSettings;
4551
this.clock = clock;
4652
this.log = logging.getLog( getClass().getSimpleName() );
53+
this.minCreationTimestampMillisOpt = new AtomicReference<>( Optional.empty() );
4754
}
4855

4956
@Override
@@ -60,11 +67,27 @@ public Future<Boolean> isHealthy( Channel channel )
6067
return ACTIVE.isHealthy( channel );
6168
}
6269

70+
@Override
71+
public void onExpired( AuthorizationExpiredException e, Channel channel )
72+
{
73+
long ts = creationTimestamp( channel );
74+
// Override current value ONLY if the new one is greater
75+
minCreationTimestampMillisOpt.getAndUpdate( prev -> Optional.of( prev.filter( prevTs -> ts <= prevTs ).orElse( ts ) ) );
76+
}
77+
6378
private boolean isTooOld( Channel channel )
6479
{
65-
if ( poolSettings.maxConnectionLifetimeEnabled() )
80+
long creationTimestampMillis = creationTimestamp( channel );
81+
Optional<Long> minCreationTimestampMillisOpt = this.minCreationTimestampMillisOpt.get();
82+
83+
if ( minCreationTimestampMillisOpt.isPresent() && creationTimestampMillis <= minCreationTimestampMillisOpt.get() )
84+
{
85+
log.trace( "The channel %s is marked for closure as its creation timestamp is older than or equal to the acceptable minimum timestamp: %s <= %s",
86+
channel, creationTimestampMillis, minCreationTimestampMillisOpt.get() );
87+
return true;
88+
}
89+
else if ( poolSettings.maxConnectionLifetimeEnabled() )
6690
{
67-
long creationTimestampMillis = creationTimestamp( channel );
6891
long currentTimestampMillis = clock.millis();
6992

7093
long ageMillis = currentTimestampMillis - creationTimestampMillis;
@@ -74,7 +97,7 @@ private boolean isTooOld( Channel channel )
7497
if ( tooOld )
7598
{
7699
log.trace( "Failed acquire channel %s from the pool because it is too old: %s > %s",
77-
channel, ageMillis, maxAgeMillis );
100+
channel, ageMillis, maxAgeMillis );
78101
}
79102

80103
return tooOld;

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelPromise;
2323

24-
import java.util.Map;
2524
import java.util.concurrent.CompletableFuture;
2625
import java.util.concurrent.CompletionStage;
2726

2827
import org.neo4j.driver.AuthToken;
2928
import org.neo4j.driver.Bookmark;
3029
import org.neo4j.driver.Query;
3130
import org.neo4j.driver.TransactionConfig;
32-
import org.neo4j.driver.Value;
3331
import org.neo4j.driver.internal.BookmarkHolder;
3432
import org.neo4j.driver.internal.DatabaseName;
3533
import org.neo4j.driver.internal.async.UnmanagedTransaction;
@@ -123,19 +121,10 @@ public CompletionStage<Void> beginTransaction( Connection connection, Bookmark b
123121
return Futures.failedFuture( error );
124122
}
125123

124+
CompletableFuture<Void> beginTxFuture = new CompletableFuture<>();
126125
BeginMessage beginMessage = new BeginMessage( bookmark, config, connection.databaseName(), connection.mode() );
127-
128-
if ( bookmark.isEmpty() )
129-
{
130-
connection.write( beginMessage, NoOpResponseHandler.INSTANCE );
131-
return Futures.completedWithNull();
132-
}
133-
else
134-
{
135-
CompletableFuture<Void> beginTxFuture = new CompletableFuture<>();
136-
connection.writeAndFlush( beginMessage, new BeginTxResponseHandler( beginTxFuture ) );
137-
return beginTxFuture;
138-
}
126+
connection.writeAndFlush( beginMessage, new BeginTxResponseHandler( beginTxFuture ) );
127+
return beginTxFuture;
139128
}
140129

141130
@Override

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.neo4j.driver.Logger;
4141
import org.neo4j.driver.Logging;
42+
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
4243
import org.neo4j.driver.exceptions.ClientException;
4344
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4445
import org.neo4j.driver.exceptions.SessionExpiredException;
@@ -155,7 +156,8 @@ protected boolean canRetryOn( Throwable error )
155156
@Experimental
156157
public static boolean isRetryable( Throwable error )
157158
{
158-
return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || isTransientError( error );
159+
return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || error instanceof AuthorizationExpiredException ||
160+
isTransientError( error );
159161
}
160162

161163
/**

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.stream.Stream;
2626

2727
import org.neo4j.driver.exceptions.AuthenticationException;
28+
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
2829
import org.neo4j.driver.exceptions.ClientException;
2930
import org.neo4j.driver.exceptions.DatabaseException;
3031
import org.neo4j.driver.exceptions.FatalDiscoveryException;
@@ -75,6 +76,10 @@ else if ( code.equalsIgnoreCase( "Neo.ClientError.Database.DatabaseNotFound" ) )
7576
{
7677
return new FatalDiscoveryException( code, message );
7778
}
79+
else if ( code.equalsIgnoreCase( "Neo.ClientError.Security.AuthorizationExpired" ) )
80+
{
81+
return new AuthorizationExpiredException( code, message );
82+
}
7883
else
7984
{
8085
return new ClientException( code, message );

driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.junit.jupiter.api.Assertions.assertNull;
3131
import static org.junit.jupiter.api.Assertions.assertThrows;
3232
import static org.mockito.Mockito.mock;
33+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authorizationStateListener;
3334
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionId;
3435
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.creationTimestamp;
3536
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.lastUsedTimestamp;
@@ -38,6 +39,7 @@
3839
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;
3940
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAgent;
4041
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverVersion;
42+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener;
4143
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
4244
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp;
4345
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
@@ -197,4 +199,23 @@ void shouldFailToSetTerminationReasonTwice()
197199

198200
assertThrows( IllegalStateException.class, () -> setTerminationReason( channel, "Reason 2" ) );
199201
}
202+
203+
@Test
204+
void shouldSetAndGetAuthorizationStateListener()
205+
{
206+
AuthorizationStateListener listener = mock( AuthorizationStateListener.class );
207+
setAuthorizationStateListener( channel, listener );
208+
assertEquals( listener, authorizationStateListener( channel ) );
209+
}
210+
211+
@Test
212+
void shouldAllowOverridingAuthorizationStateListener()
213+
{
214+
AuthorizationStateListener listener = mock( AuthorizationStateListener.class );
215+
setAuthorizationStateListener( channel, listener );
216+
assertEquals( listener, authorizationStateListener( channel ) );
217+
AuthorizationStateListener newListener = mock( AuthorizationStateListener.class );
218+
setAuthorizationStateListener( channel, newListener );
219+
assertEquals( newListener, authorizationStateListener( channel ) );
220+
}
200221
}

0 commit comments

Comments
 (0)