Skip to content

Commit 861acd2

Browse files
author
Zhen Li
committed
Make getRoutingTable procedure aware of multi-databases
1 parent 488e33d commit 861acd2

32 files changed

+1600
-814
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import java.util.concurrent.CompletionStage;
2222

2323
import org.neo4j.driver.AccessMode;
24-
import org.neo4j.driver.internal.async.connection.DecoratedConnection;
24+
import org.neo4j.driver.internal.async.connection.DirectConnection;
2525
import org.neo4j.driver.internal.spi.Connection;
2626
import org.neo4j.driver.internal.spi.ConnectionPool;
2727
import org.neo4j.driver.internal.spi.ConnectionProvider;
2828

2929
import static org.neo4j.driver.AccessMode.READ;
30-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
30+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
3131

3232
/**
3333
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
@@ -47,14 +47,14 @@ public class DirectConnectionProvider implements ConnectionProvider
4747
@Override
4848
public CompletionStage<Connection> acquireConnection( String databaseName, AccessMode mode )
4949
{
50-
return connectionPool.acquire( address ).thenApply( connection -> new DecoratedConnection( connection, databaseName, mode ) );
50+
return connectionPool.acquire( address ).thenApply( connection -> new DirectConnection( connection, databaseName, mode ) );
5151
}
5252

5353
@Override
5454
public CompletionStage<Void> verifyConnectivity()
5555
{
5656
// we verify the connection by establishing the connection to the default database
57-
return acquireConnection( ABSENT_DB_NAME, READ ).thenCompose( Connection::release );
57+
return acquireConnection( SYSTEM_DB_NAME, READ ).thenCompose( Connection::release );
5858
}
5959

6060
@Override
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.pool.ChannelPool;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
import org.neo4j.driver.internal.BoltServerAddress;
29+
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
30+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
31+
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
32+
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
33+
import org.neo4j.driver.internal.messaging.BoltProtocol;
34+
import org.neo4j.driver.internal.messaging.Message;
35+
import org.neo4j.driver.internal.messaging.request.ResetMessage;
36+
import org.neo4j.driver.internal.metrics.ListenerEvent;
37+
import org.neo4j.driver.internal.metrics.MetricsListener;
38+
import org.neo4j.driver.internal.spi.Connection;
39+
import org.neo4j.driver.internal.spi.ResponseHandler;
40+
import org.neo4j.driver.internal.util.Clock;
41+
import org.neo4j.driver.internal.util.ServerVersion;
42+
43+
import static java.util.Collections.emptyMap;
44+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
45+
46+
/**
47+
* This connection represents a simple network connection to a remote server.
48+
* It wraps a channel obtained from a connection pool.
49+
* The life cycle of this connection start from the moment the channel is borrowed out of the pool
50+
* and end at the time the connection is released back to the pool.
51+
*/
52+
public class NetworkConnection implements Connection
53+
{
54+
private final Channel channel;
55+
private final InboundMessageDispatcher messageDispatcher;
56+
private final BoltServerAddress serverAddress;
57+
private final ServerVersion serverVersion;
58+
private final BoltProtocol protocol;
59+
private final ChannelPool channelPool;
60+
private final CompletableFuture<Void> releaseFuture;
61+
private final Clock clock;
62+
63+
private final AtomicReference<Status> status = new AtomicReference<>( Status.OPEN );
64+
private final MetricsListener metricsListener;
65+
private final ListenerEvent inUseEvent;
66+
67+
public NetworkConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener )
68+
{
69+
this.channel = channel;
70+
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
71+
this.serverAddress = ChannelAttributes.serverAddress( channel );
72+
this.serverVersion = ChannelAttributes.serverVersion( channel );
73+
this.protocol = BoltProtocol.forChannel( channel );
74+
this.channelPool = channelPool;
75+
this.releaseFuture = new CompletableFuture<>();
76+
this.clock = clock;
77+
this.metricsListener = metricsListener;
78+
this.inUseEvent = metricsListener.createListenerEvent();
79+
metricsListener.afterConnectionCreated( this.serverAddress, this.inUseEvent );
80+
}
81+
82+
@Override
83+
public boolean isOpen()
84+
{
85+
return status.get() == Status.OPEN;
86+
}
87+
88+
@Override
89+
public void enableAutoRead()
90+
{
91+
if ( isOpen() )
92+
{
93+
setAutoRead( true );
94+
}
95+
}
96+
97+
@Override
98+
public void disableAutoRead()
99+
{
100+
if ( isOpen() )
101+
{
102+
setAutoRead( false );
103+
}
104+
}
105+
106+
@Override
107+
public void flush()
108+
{
109+
if ( verifyOpen( null, null ) )
110+
{
111+
flushInEventLoop();
112+
}
113+
}
114+
115+
@Override
116+
public void write( Message message, ResponseHandler handler )
117+
{
118+
if ( verifyOpen( handler, null ) )
119+
{
120+
writeMessageInEventLoop( message, handler, false );
121+
}
122+
}
123+
124+
@Override
125+
public void write( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 )
126+
{
127+
if ( verifyOpen( handler1, handler2 ) )
128+
{
129+
writeMessagesInEventLoop( message1, handler1, message2, handler2, false );
130+
}
131+
}
132+
133+
@Override
134+
public void writeAndFlush( Message message, ResponseHandler handler )
135+
{
136+
if ( verifyOpen( handler, null ) )
137+
{
138+
writeMessageInEventLoop( message, handler, true );
139+
}
140+
}
141+
142+
@Override
143+
public void writeAndFlush( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 )
144+
{
145+
if ( verifyOpen( handler1, handler2 ) )
146+
{
147+
writeMessagesInEventLoop( message1, handler1, message2, handler2, true );
148+
}
149+
}
150+
151+
@Override
152+
public CompletionStage<Void> reset()
153+
{
154+
CompletableFuture<Void> result = new CompletableFuture<>();
155+
ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result );
156+
writeResetMessageIfNeeded( handler, true );
157+
return result;
158+
}
159+
160+
@Override
161+
public CompletionStage<Void> release()
162+
{
163+
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
164+
{
165+
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
166+
channelPool, messageDispatcher, clock, releaseFuture );
167+
168+
writeResetMessageIfNeeded( handler, false );
169+
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
170+
}
171+
return releaseFuture;
172+
}
173+
174+
@Override
175+
public void terminateAndRelease( String reason )
176+
{
177+
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
178+
{
179+
setTerminationReason( channel, reason );
180+
channel.close();
181+
channelPool.release( channel );
182+
releaseFuture.complete( null );
183+
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
184+
}
185+
}
186+
187+
@Override
188+
public BoltServerAddress serverAddress()
189+
{
190+
return serverAddress;
191+
}
192+
193+
@Override
194+
public ServerVersion serverVersion()
195+
{
196+
return serverVersion;
197+
}
198+
199+
@Override
200+
public BoltProtocol protocol()
201+
{
202+
return protocol;
203+
}
204+
205+
private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean isSessionReset )
206+
{
207+
channel.eventLoop().execute( () ->
208+
{
209+
if ( isSessionReset && !isOpen() )
210+
{
211+
resetHandler.onSuccess( emptyMap() );
212+
}
213+
else
214+
{
215+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
216+
setAutoRead( true );
217+
218+
messageDispatcher.enqueue( resetHandler );
219+
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
220+
}
221+
} );
222+
}
223+
224+
private void flushInEventLoop()
225+
{
226+
channel.eventLoop().execute( channel::flush );
227+
}
228+
229+
private void writeMessageInEventLoop( Message message, ResponseHandler handler, boolean flush )
230+
{
231+
channel.eventLoop().execute( () ->
232+
{
233+
messageDispatcher.enqueue( handler );
234+
235+
if ( flush )
236+
{
237+
channel.writeAndFlush( message, channel.voidPromise() );
238+
}
239+
else
240+
{
241+
channel.write( message, channel.voidPromise() );
242+
}
243+
} );
244+
}
245+
246+
private void writeMessagesInEventLoop( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2, boolean flush )
247+
{
248+
channel.eventLoop().execute( () ->
249+
{
250+
messageDispatcher.enqueue( handler1 );
251+
messageDispatcher.enqueue( handler2 );
252+
253+
channel.write( message1, channel.voidPromise() );
254+
255+
if ( flush )
256+
{
257+
channel.writeAndFlush( message2, channel.voidPromise() );
258+
}
259+
else
260+
{
261+
channel.write( message2, channel.voidPromise() );
262+
}
263+
} );
264+
}
265+
266+
private void setAutoRead( boolean value )
267+
{
268+
channel.config().setAutoRead( value );
269+
}
270+
271+
private boolean verifyOpen( ResponseHandler handler1, ResponseHandler handler2 )
272+
{
273+
Status connectionStatus = this.status.get();
274+
switch ( connectionStatus )
275+
{
276+
case OPEN:
277+
return true;
278+
case RELEASED:
279+
Exception error = new IllegalStateException( "Connection has been released to the pool and can't be used" );
280+
if ( handler1 != null )
281+
{
282+
handler1.onFailure( error );
283+
}
284+
if ( handler2 != null )
285+
{
286+
handler2.onFailure( error );
287+
}
288+
return false;
289+
case TERMINATED:
290+
Exception terminatedError = new IllegalStateException( "Connection has been terminated and can't be used" );
291+
if ( handler1 != null )
292+
{
293+
handler1.onFailure( terminatedError );
294+
}
295+
if ( handler2 != null )
296+
{
297+
handler2.onFailure( terminatedError );
298+
}
299+
return false;
300+
default:
301+
throw new IllegalStateException( "Unknown status: " + connectionStatus );
302+
}
303+
}
304+
305+
private enum Status
306+
{
307+
OPEN,
308+
RELEASED,
309+
TERMINATED
310+
}
311+
}

0 commit comments

Comments
 (0)