Skip to content

Commit 9654cdb

Browse files
committed
all connections state
1 parent 6d42a8f commit 9654cdb

File tree

8 files changed

+396
-13
lines changed

8 files changed

+396
-13
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.neo4j.driver.internal.bolt.pooledimpl.PooledBoltConnectionProvider;
5454
import org.neo4j.driver.internal.bolt.routedimpl.Rediscovery;
5555
import org.neo4j.driver.internal.bolt.routedimpl.RoutedBoltConnectionProvider;
56+
import org.neo4j.driver.internal.boltlistener.BoltConnectionListener;
5657
import org.neo4j.driver.internal.homedb.HomeDatabaseCache;
5758
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
5859
import org.neo4j.driver.internal.metrics.InternalMetricsProvider;
@@ -165,8 +166,10 @@ private InternalDriver createDriver(
165166
Supplier<Rediscovery> rediscoverySupplier) {
166167
DriverBoltConnectionProvider boltConnectionProvider = null;
167168
try {
168-
boltConnectionProvider =
169-
createBoltConnectionProvider(uri, config, eventLoopGroup, routingSettings, rediscoverySupplier);
169+
var homeDatabaseCache =
170+
HomeDatabaseCache.newInstance(uri.getScheme().startsWith("neo4j"));
171+
boltConnectionProvider = createBoltConnectionProvider(
172+
uri, config, eventLoopGroup, routingSettings, rediscoverySupplier, homeDatabaseCache);
170173
boltConnectionProvider.init(
171174
new BoltServerAddress(address.host(), address.port()),
172175
new RoutingContext(uri),
@@ -180,7 +183,7 @@ private InternalDriver createDriver(
180183
retryLogic,
181184
config,
182185
authTokenManager,
183-
HomeDatabaseCache.newInstance(uri.getScheme().startsWith("neo4j")));
186+
homeDatabaseCache);
184187
Supplier<CompletionStage<Void>> shutdownSupplier = ownsEventLoopGroup
185188
? () -> {
186189
var closeFuture = new CompletableFuture<Void>();
@@ -219,12 +222,14 @@ private DriverBoltConnectionProvider createBoltConnectionProvider(
219222
Config config,
220223
EventLoopGroup eventLoopGroup,
221224
RoutingSettings routingSettings,
222-
Supplier<Rediscovery> rediscoverySupplier) {
225+
Supplier<Rediscovery> rediscoverySupplier,
226+
BoltConnectionListener boltConnectionListener) {
223227
DriverBoltConnectionProvider boltConnectionProvider;
224228
var clock = createClock();
225229
var loggingProvider = new BoltLoggingProvider(config.logging());
226230
Supplier<BoltConnectionProvider> pooledBoltConnectionProviderSupplier =
227-
() -> createPooledBoltConnectionProvider(config, eventLoopGroup, clock, loggingProvider);
231+
() -> createPooledBoltConnectionProvider(
232+
config, eventLoopGroup, clock, loggingProvider, boltConnectionListener);
228233
var errorMapper = ErrorMapper.getInstance();
229234
if (uri.getScheme().startsWith("bolt")) {
230235
assertNoRoutingContext(uri, routingSettings);
@@ -266,8 +271,14 @@ private BoltConnectionProvider createRoutedBoltConnectionProvider(
266271
}
267272

268273
private BoltConnectionProvider createPooledBoltConnectionProvider(
269-
Config config, EventLoopGroup eventLoopGroup, Clock clock, LoggingProvider loggingProvider) {
274+
Config config,
275+
EventLoopGroup eventLoopGroup,
276+
Clock clock,
277+
LoggingProvider loggingProvider,
278+
BoltConnectionListener boltConnectionListener) {
270279
var nettyBoltConnectionProvider = createNettyBoltConnectionProvider(eventLoopGroup, clock, loggingProvider);
280+
nettyBoltConnectionProvider = BoltConnectionListener.listeningBoltConnectionProvider(
281+
nettyBoltConnectionProvider, boltConnectionListener);
271282
return new PooledBoltConnectionProvider(
272283
nettyBoltConnectionProvider,
273284
config.maxConnectionPoolSize(),

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,6 @@ private CompletionStage<BoltConnectionWithCloseTracking> acquireConnection(Acces
496496
additionalParams)
497497
.thenApply(connection -> {
498498
if (connection.serverSideRoutingEnabled()) {
499-
homeDatabaseCache.enable();
500499
if (databaseName == null) {
501500
// home database was requested
502501
connectionContext
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.boltlistener;
18+
19+
import org.neo4j.driver.internal.bolt.api.BoltConnection;
20+
import org.neo4j.driver.internal.bolt.api.BoltConnectionProvider;
21+
22+
public interface BoltConnectionListener {
23+
void onOpen(BoltConnection boltConnection);
24+
25+
void onClose(BoltConnection boltConnection);
26+
27+
static BoltConnectionProvider listeningBoltConnectionProvider(
28+
BoltConnectionProvider provider, BoltConnectionListener boltConnectionListener) {
29+
return new ListeningBoltConnectionProvider(provider, boltConnectionListener);
30+
}
31+
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [https://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.driver.internal.boltlistener;
18+
19+
import java.time.Duration;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.Set;
23+
import java.util.concurrent.CompletionStage;
24+
import org.neo4j.driver.internal.bolt.api.AccessMode;
25+
import org.neo4j.driver.internal.bolt.api.AuthData;
26+
import org.neo4j.driver.internal.bolt.api.BoltConnection;
27+
import org.neo4j.driver.internal.bolt.api.BoltConnectionState;
28+
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
29+
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
30+
import org.neo4j.driver.internal.bolt.api.DatabaseName;
31+
import org.neo4j.driver.internal.bolt.api.NotificationConfig;
32+
import org.neo4j.driver.internal.bolt.api.ResponseHandler;
33+
import org.neo4j.driver.internal.bolt.api.TelemetryApi;
34+
import org.neo4j.driver.internal.bolt.api.TransactionType;
35+
import org.neo4j.driver.internal.bolt.api.values.Value;
36+
37+
final class ListeningBoltConnection implements BoltConnection {
38+
private final BoltConnection delegate;
39+
private final BoltConnectionListener boltConnectionListener;
40+
41+
public ListeningBoltConnection(BoltConnection delegate, BoltConnectionListener boltConnectionListener) {
42+
this.delegate = Objects.requireNonNull(delegate);
43+
this.boltConnectionListener = Objects.requireNonNull(boltConnectionListener);
44+
}
45+
46+
@Override
47+
public CompletionStage<BoltConnection> onLoop() {
48+
return delegate.onLoop().thenApply(ignored -> this);
49+
}
50+
51+
@Override
52+
public CompletionStage<BoltConnection> route(
53+
DatabaseName databaseName, String impersonatedUser, Set<String> bookmarks) {
54+
return delegate.route(databaseName, impersonatedUser, bookmarks).thenApply(ignored -> this);
55+
}
56+
57+
@Override
58+
public CompletionStage<BoltConnection> beginTransaction(
59+
DatabaseName databaseName,
60+
AccessMode accessMode,
61+
String impersonatedUser,
62+
Set<String> bookmarks,
63+
TransactionType transactionType,
64+
Duration txTimeout,
65+
Map<String, Value> txMetadata,
66+
String txType,
67+
NotificationConfig notificationConfig) {
68+
return delegate.beginTransaction(
69+
databaseName,
70+
accessMode,
71+
impersonatedUser,
72+
bookmarks,
73+
transactionType,
74+
txTimeout,
75+
txMetadata,
76+
txType,
77+
notificationConfig)
78+
.thenApply(ignored -> this);
79+
}
80+
81+
@Override
82+
public CompletionStage<BoltConnection> runInAutoCommitTransaction(
83+
DatabaseName databaseName,
84+
AccessMode accessMode,
85+
String impersonatedUser,
86+
Set<String> bookmarks,
87+
String query,
88+
Map<String, Value> parameters,
89+
Duration txTimeout,
90+
Map<String, Value> txMetadata,
91+
NotificationConfig notificationConfig) {
92+
return delegate.runInAutoCommitTransaction(
93+
databaseName,
94+
accessMode,
95+
impersonatedUser,
96+
bookmarks,
97+
query,
98+
parameters,
99+
txTimeout,
100+
txMetadata,
101+
notificationConfig)
102+
.thenApply(ignored -> this);
103+
}
104+
105+
@Override
106+
public CompletionStage<BoltConnection> run(String query, Map<String, Value> parameters) {
107+
return delegate.run(query, parameters).thenApply(ignored -> this);
108+
}
109+
110+
@Override
111+
public CompletionStage<BoltConnection> pull(long qid, long request) {
112+
return delegate.pull(qid, request).thenApply(ignored -> this);
113+
}
114+
115+
@Override
116+
public CompletionStage<BoltConnection> discard(long qid, long number) {
117+
return delegate.discard(qid, number).thenApply(ignored -> this);
118+
}
119+
120+
@Override
121+
public CompletionStage<BoltConnection> commit() {
122+
return delegate.commit().thenApply(ignored -> this);
123+
}
124+
125+
@Override
126+
public CompletionStage<BoltConnection> rollback() {
127+
return delegate.rollback().thenApply(ignored -> this);
128+
}
129+
130+
@Override
131+
public CompletionStage<BoltConnection> reset() {
132+
return delegate.reset().thenApply(ignored -> this);
133+
}
134+
135+
@Override
136+
public CompletionStage<BoltConnection> logoff() {
137+
return delegate.logoff().thenApply(ignored -> this);
138+
}
139+
140+
@Override
141+
public CompletionStage<BoltConnection> logon(Map<String, Value> authMap) {
142+
return delegate.logon(authMap).thenApply(ignored -> this);
143+
}
144+
145+
@Override
146+
public CompletionStage<BoltConnection> telemetry(TelemetryApi telemetryApi) {
147+
return delegate.telemetry(telemetryApi).thenApply(ignored -> this);
148+
}
149+
150+
@Override
151+
public CompletionStage<BoltConnection> clear() {
152+
return delegate.clear().thenApply(ignored -> this);
153+
}
154+
155+
@Override
156+
public CompletionStage<Void> flush(ResponseHandler handler) {
157+
return delegate.flush(handler);
158+
}
159+
160+
@Override
161+
public CompletionStage<Void> forceClose(String reason) {
162+
return delegate.forceClose(reason);
163+
}
164+
165+
@Override
166+
public CompletionStage<Void> close() {
167+
return delegate.close().whenComplete((ignored, throwable) -> boltConnectionListener.onClose(this));
168+
}
169+
170+
@Override
171+
public BoltConnectionState state() {
172+
return delegate.state();
173+
}
174+
175+
@Override
176+
public CompletionStage<AuthData> authData() {
177+
return delegate.authData();
178+
}
179+
180+
@Override
181+
public String serverAgent() {
182+
return delegate.serverAgent();
183+
}
184+
185+
@Override
186+
public BoltServerAddress serverAddress() {
187+
return delegate.serverAddress();
188+
}
189+
190+
@Override
191+
public BoltProtocolVersion protocolVersion() {
192+
return delegate.protocolVersion();
193+
}
194+
195+
@Override
196+
public boolean telemetrySupported() {
197+
return delegate.telemetrySupported();
198+
}
199+
200+
@Override
201+
public boolean serverSideRoutingEnabled() {
202+
return delegate.serverSideRoutingEnabled();
203+
}
204+
}

0 commit comments

Comments
 (0)