Skip to content

Commit dcb534f

Browse files
committed
Update
1 parent 89b6991 commit dcb534f

35 files changed

+688
-96
lines changed

driver/src/main/java/org/neo4j/driver/AuthTokenManagers.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static AuthTokenManager temporalAsync(Supplier<CompletionStage<TemporalAu
7676
* {@link AuthToken} and its UTC expiration timestamp.
7777
* @since 5.7
7878
*/
79-
public interface TemporalAuthData {
79+
public sealed interface TemporalAuthData permits TemporalAuthTokenManager.InternalTemporalAuthData {
8080
/**
8181
* Returns a new instance with the provided token and {@link Long#MAX_VALUE} expiration timestamp.
8282
* @param authToken the token
@@ -89,11 +89,11 @@ static TemporalAuthData of(AuthToken authToken) {
8989
/**
9090
* Returns a new instance with the provided token and its expiration timestamp.
9191
* @param authToken the token
92-
* @param expirationTimestamp the expiration timestamp
92+
* @param utcExpirationTimestamp the UTC expiration timestamp
9393
* @return a new instance
9494
*/
95-
static TemporalAuthData of(AuthToken authToken, long expirationTimestamp) {
96-
return new TemporalAuthTokenManager.InternalTemporalAuthData(authToken, expirationTimestamp);
95+
static TemporalAuthData of(AuthToken authToken, long utcExpirationTimestamp) {
96+
return new TemporalAuthTokenManager.InternalTemporalAuthData(authToken, utcExpirationTimestamp);
9797
}
9898

9999
/**

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public boolean verifyAuthentication(AuthToken authToken) {
166166
.withDefaultAccessMode(AccessMode.READ)
167167
.build();
168168
try (var session = session(Session.class, config, authToken)) {
169-
session.run("RETURN 1").consume();
169+
session.run("SHOW DEFAULT DATABASE").consume();
170170
return true;
171171
} catch (RuntimeException e) {
172172
if (e instanceof Neo4jException neo4jException) {

driver/src/main/java/org/neo4j/driver/internal/async/connection/HandshakeCompletedListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public void operationComplete(ChannelFuture future) {
6161
connectionInitializedPromise.setFailure(throwable);
6262
} else {
6363
authContext.initiateAuth(authToken);
64+
authContext.setValidToken(authToken);
6465
protocol.initializeChannel(
6566
userAgent,
6667
authToken,

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void handleFailureMessage(String code, String message) {
128128
tokenExpiredException.code(), tokenExpiredException.getMessage());
129129
}
130130
var authToken = authContext.getAuthToken();
131-
if (authToken != null) {
131+
if (authToken != null && authContext.isManaged()) {
132132
authTokenProvider.onExpired(authToken);
133133
}
134134
} else {

driver/src/main/java/org/neo4j/driver/internal/async/pool/AuthContext.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,25 @@ public class AuthContext {
2828
private AuthToken authToken;
2929
private Long authTimestamp;
3030
private boolean pendingLogoff;
31+
private boolean managed;
32+
private AuthToken validToken;
3133

3234
public AuthContext(AuthTokenManager authTokenManager) {
3335
requireNonNull(authTokenManager, "authTokenProvider must not be null");
3436
this.authTokenManager = authTokenManager;
37+
this.managed = true;
3538
}
3639

3740
public void initiateAuth(AuthToken authToken) {
41+
initiateAuth(authToken, true);
42+
}
43+
44+
public void initiateAuth(AuthToken authToken, boolean managed) {
3845
requireNonNull(authToken, "authToken must not be null");
3946
this.authToken = authToken;
4047
authTimestamp = null;
4148
pendingLogoff = false;
49+
this.managed = managed;
4250
}
4351

4452
public AuthToken getAuthToken() {
@@ -61,6 +69,18 @@ public boolean isPendingLogoff() {
6169
return pendingLogoff;
6270
}
6371

72+
public void setValidToken(AuthToken validToken) {
73+
this.validToken = validToken;
74+
}
75+
76+
public AuthToken getValidToken() {
77+
return validToken;
78+
}
79+
80+
public boolean isManaged() {
81+
return managed;
82+
}
83+
6484
public AuthTokenManager getAuthTokenManager() {
6585
return authTokenManager;
6686
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public Future<Boolean> isHealthy(Channel channel) {
7070
} else {
7171
var authContext = authContext(channel);
7272
if (authContext.getAuthTimestamp() != null) {
73+
authContext.setValidToken(authToken);
7374
var equal = authToken.equals(authContext.getAuthToken());
7475
if (isAuthExpiredByFailure(channel) || !equal) {
7576
// Bolt versions prior to 5.1 do not support auth renewal

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.netty.channel.pool.FixedChannelPool;
3333
import java.time.Clock;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionException;
3536
import java.util.concurrent.CompletionStage;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738
import org.neo4j.driver.AuthToken;
@@ -129,59 +130,94 @@ public NettyChannelHealthChecker healthChecker() {
129130

130131
@Override
131132
public CompletionStage<Channel> acquire(AuthToken overrideAuthToken) {
132-
return asCompletionStage(delegate.acquire()).thenCompose(channel -> pipelineAuth(channel, overrideAuthToken));
133+
return asCompletionStage(delegate.acquire()).thenCompose(channel -> auth(channel, overrideAuthToken));
133134
}
134135

135-
private CompletionStage<Channel> pipelineAuth(Channel channel, AuthToken overrideAuthToken) {
136-
CompletionStage<Channel> pipelinedAuthStage;
136+
private CompletionStage<Channel> auth(Channel channel, AuthToken overrideAuthToken) {
137+
CompletionStage<Channel> authStage;
137138
var authContext = authContext(channel);
138139
if (overrideAuthToken != null) {
139140
// check protocol version
140141
var protocolVersion = protocolVersion(channel);
141142
if (!SessionAuthUtil.supportsSessionAuth(protocolVersion)) {
142-
pipelinedAuthStage = Futures.failedFuture(new UnsupportedFeatureException(String.format(
143+
authStage = Futures.failedFuture(new UnsupportedFeatureException(String.format(
143144
"Detected Bolt %s connection that does not support the auth token override feature, please make sure to have all servers communicating over Bolt 5.1 or above to use the feature",
144145
protocolVersion)));
145146
} else {
146147
// auth or re-auth only if necessary
147148
if (!overrideAuthToken.equals(authContext.getAuthToken())) {
149+
CompletableFuture<Void> logoffFuture;
148150
if (authContext.getAuthTimestamp() != null) {
149-
messageDispatcher(channel).enqueue(new LogoffResponseHandler());
151+
logoffFuture = new CompletableFuture<>();
152+
messageDispatcher(channel).enqueue(new LogoffResponseHandler(logoffFuture));
150153
channel.write(LogoffMessage.INSTANCE);
154+
} else {
155+
logoffFuture = null;
151156
}
152-
messageDispatcher(channel).enqueue(new LogonResponseHandler(channel.voidPromise(), clock));
153-
authContext.initiateAuth(overrideAuthToken);
157+
var logonFuture = new CompletableFuture<Void>();
158+
messageDispatcher(channel).enqueue(new LogonResponseHandler(logonFuture, channel, clock));
159+
authContext.initiateAuth(overrideAuthToken, false);
160+
authContext.setValidToken(null);
154161
channel.write(new LogonMessage(((InternalAuthToken) overrideAuthToken).toMap()));
162+
if (logoffFuture == null) {
163+
authStage = logonFuture.thenApply(ignored -> channel);
164+
channel.flush();
165+
} else {
166+
// do not await for re-login
167+
authStage = CompletableFuture.completedStage(channel);
168+
}
169+
} else {
170+
authStage = CompletableFuture.completedStage(channel);
155171
}
156-
pipelinedAuthStage = CompletableFuture.completedStage(channel);
157172
}
158173
} else {
159-
pipelinedAuthStage = authContext
160-
.getAuthTokenManager()
161-
.getToken()
162-
.thenApplyAsync(
163-
latestAuthToken -> {
164-
if (authContext.getAuthTimestamp() != null) {
165-
if (!authContext.getAuthToken().equals(latestAuthToken)
166-
|| authContext.isPendingLogoff()) {
167-
messageDispatcher(channel).enqueue(new LogoffResponseHandler());
168-
channel.write(LogoffMessage.INSTANCE);
169-
messageDispatcher(channel)
170-
.enqueue(new LogonResponseHandler(channel.voidPromise(), clock));
171-
authContext.initiateAuth(latestAuthToken);
172-
channel.write(new LogonMessage(((InternalAuthToken) latestAuthToken).toMap()));
173-
}
174-
} else {
175-
messageDispatcher(channel)
176-
.enqueue(new LogonResponseHandler(channel.voidPromise(), clock));
177-
authContext.initiateAuth(latestAuthToken);
178-
channel.write(new LogonMessage(((InternalAuthToken) latestAuthToken).toMap()));
179-
}
180-
return channel;
181-
},
182-
channel.eventLoop());
174+
var validToken = authContext.getValidToken();
175+
authContext.setValidToken(null);
176+
var stage = validToken != null
177+
? CompletableFuture.completedStage(validToken)
178+
: authContext.getAuthTokenManager().getToken();
179+
authStage = stage.thenComposeAsync(
180+
latestAuthToken -> {
181+
CompletionStage<Channel> result;
182+
if (authContext.getAuthTimestamp() != null) {
183+
if (!authContext.getAuthToken().equals(latestAuthToken) || authContext.isPendingLogoff()) {
184+
var logoffFuture = new CompletableFuture<Void>();
185+
messageDispatcher(channel).enqueue(new LogoffResponseHandler(logoffFuture));
186+
channel.write(LogoffMessage.INSTANCE);
187+
var logonFuture = new CompletableFuture<Void>();
188+
messageDispatcher(channel)
189+
.enqueue(new LogonResponseHandler(logonFuture, channel, clock));
190+
authContext.initiateAuth(latestAuthToken);
191+
channel.write(new LogonMessage(((InternalAuthToken) latestAuthToken).toMap()));
192+
// do not await for re-login
193+
result = CompletableFuture.completedStage(channel);
194+
} else {
195+
result = CompletableFuture.completedStage(channel);
196+
}
197+
} else {
198+
var logonFuture = new CompletableFuture<Void>();
199+
messageDispatcher(channel).enqueue(new LogonResponseHandler(logonFuture, channel, clock));
200+
result = logonFuture.thenApply(ignored -> channel);
201+
authContext.initiateAuth(latestAuthToken);
202+
channel.writeAndFlush(new LogonMessage(((InternalAuthToken) latestAuthToken).toMap()));
203+
}
204+
return result;
205+
},
206+
channel.eventLoop());
183207
}
184-
return pipelinedAuthStage;
208+
return authStage.handle((ignored, throwable) -> {
209+
if (throwable != null) {
210+
channel.close();
211+
release(channel);
212+
if (throwable instanceof RuntimeException runtimeException) {
213+
throw runtimeException;
214+
} else {
215+
throw new CompletionException(throwable);
216+
}
217+
} else {
218+
return channel;
219+
}
220+
});
185221
}
186222

187223
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/LogoffResponseHandler.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,33 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import static java.util.Objects.requireNonNull;
22+
2123
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
2225
import org.neo4j.driver.Value;
26+
import org.neo4j.driver.exceptions.ProtocolException;
2327
import org.neo4j.driver.internal.spi.ResponseHandler;
2428

2529
public class LogoffResponseHandler implements ResponseHandler {
30+
private final CompletableFuture<?> future;
31+
32+
public LogoffResponseHandler(CompletableFuture<?> future) {
33+
this.future = requireNonNull(future, "future must not be null");
34+
}
35+
2636
@Override
27-
public void onSuccess(Map<String, Value> metadata) {}
37+
public void onSuccess(Map<String, Value> metadata) {
38+
future.complete(null);
39+
}
2840

2941
@Override
30-
public void onFailure(Throwable error) {}
42+
public void onFailure(Throwable error) {
43+
future.completeExceptionally(error);
44+
}
3145

3246
@Override
33-
public void onRecord(Value[] fields) {}
47+
public void onRecord(Value[] fields) {
48+
this.future.completeExceptionally(new ProtocolException("Records are not supported on LOGON"));
49+
}
3450
}

driver/src/main/java/org/neo4j/driver/internal/handlers/LogonResponseHandler.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,37 @@
2222
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authContext;
2323

2424
import io.netty.channel.Channel;
25-
import io.netty.channel.ChannelPromise;
2625
import java.time.Clock;
2726
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
2828
import org.neo4j.driver.Value;
29+
import org.neo4j.driver.exceptions.ProtocolException;
2930
import org.neo4j.driver.internal.spi.ResponseHandler;
3031

3132
public class LogonResponseHandler implements ResponseHandler {
32-
33-
private final ChannelPromise connectionInitializedPromise;
33+
private final CompletableFuture<?> future;
3434
private final Channel channel;
3535
private final Clock clock;
3636

37-
public LogonResponseHandler(ChannelPromise connectionInitializedPromise, Clock clock) {
38-
requireNonNull(clock, "clock must not be null");
39-
this.connectionInitializedPromise = connectionInitializedPromise;
40-
this.channel = connectionInitializedPromise.channel();
41-
this.clock = clock;
37+
public LogonResponseHandler(CompletableFuture<?> future, Channel channel, Clock clock) {
38+
this.future = requireNonNull(future, "future must not be null");
39+
this.channel = requireNonNull(channel, "channel must not be null");
40+
this.clock = requireNonNull(clock, "clock must not be null");
4241
}
4342

4443
@Override
4544
public void onSuccess(Map<String, Value> metadata) {
4645
authContext(channel).finishAuth(clock.millis());
47-
connectionInitializedPromise.setSuccess();
46+
future.complete(null);
4847
}
4948

5049
@Override
5150
public void onFailure(Throwable error) {
52-
channel.close().addListener(future -> connectionInitializedPromise.setFailure(error));
51+
channel.close().addListener(future -> this.future.completeExceptionally(error));
5352
}
5453

5554
@Override
5655
public void onRecord(Value[] fields) {
57-
throw new UnsupportedOperationException();
56+
future.completeExceptionally(new ProtocolException("Records are not supported on LOGON"));
5857
}
5958
}

driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.junit.jupiter.api.Assertions.assertThrows;
3030
import static org.junit.jupiter.api.Assertions.assertTrue;
3131
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
32+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V51;
3233
import static org.neo4j.driver.testutil.TestUtil.await;
3334

3435
import io.netty.bootstrap.Bootstrap;
@@ -64,6 +65,7 @@
6465
import org.neo4j.driver.internal.security.SecurityPlan;
6566
import org.neo4j.driver.internal.security.SecurityPlanImpl;
6667
import org.neo4j.driver.internal.security.StaticAuthTokenManager;
68+
import org.neo4j.driver.internal.util.DisabledOnNeo4jWith;
6769
import org.neo4j.driver.internal.util.FakeClock;
6870
import org.neo4j.driver.testutil.DatabaseExtension;
6971
import org.neo4j.driver.testutil.ParallelizableIT;
@@ -129,6 +131,8 @@ void shouldFailToConnectToWrongAddress() throws Exception {
129131
assertFalse(channel.isActive());
130132
}
131133

134+
// Beginning with Bolt 5.1 auth is not sent on HELLO message.
135+
@DisabledOnNeo4jWith(BOLT_V51)
132136
@Test
133137
void shouldFailToConnectWithWrongCredentials() throws Exception {
134138
AuthToken authToken = AuthTokens.basic("neo4j", "wrong-password");

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ void shouldEmitTokenExpiredRetryableExceptionAndNotifyAuthTokenManager() {
445445
var channel = new EmbeddedChannel();
446446
var authTokenManager = mock(AuthTokenManager.class);
447447
var authContext = mock(AuthContext.class);
448+
given(authContext.isManaged()).willReturn(true);
448449
given(authContext.getAuthTokenManager()).willReturn(authTokenManager);
449450
var authToken = mock(AuthToken.class);
450451
given(authContext.getAuthToken()).willReturn(authToken);
@@ -473,6 +474,7 @@ void shouldEmitTokenExpiredExceptionAndNotifyAuthTokenManager() {
473474
var authToken = mock(AuthToken.class);
474475
var authTokenManager = spy(new StaticAuthTokenManager(authToken));
475476
var authContext = mock(AuthContext.class);
477+
given(authContext.isManaged()).willReturn(true);
476478
given(authContext.getAuthTokenManager()).willReturn(authTokenManager);
477479
given(authContext.getAuthToken()).willReturn(authToken);
478480
setAuthContext(channel, authContext);

0 commit comments

Comments
 (0)