Skip to content

Commit a08f81c

Browse files
author
Zhen Li
committed
Merge branch '1.7' into 2.0
2 parents d5a4554 + ff9e61b commit a08f81c

28 files changed

+651
-289
lines changed

driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@
2323
import java.net.SocketAddress;
2424
import java.net.URI;
2525
import java.net.UnknownHostException;
26+
import java.util.List;
2627
import java.util.Objects;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
2730

2831
import org.neo4j.driver.net.ServerAddress;
2932

3033
import static java.util.Objects.requireNonNull;
34+
import static java.util.stream.Collectors.toList;
3135

3236
/**
3337
* Holds a host and port pair that denotes a Bolt server address.
@@ -37,11 +41,12 @@ public class BoltServerAddress implements ServerAddress
3741
public static final int DEFAULT_PORT = 7687;
3842
public static final BoltServerAddress LOCAL_DEFAULT = new BoltServerAddress( "localhost", DEFAULT_PORT );
3943

40-
private final String originalHost; // This keeps the original host name provided by the user.
4144
private final String host; // This could either be the same as originalHost or it is an IP address resolved from the original host.
4245
private final int port;
4346
private final String stringValue;
4447

48+
private InetAddress resolved;
49+
4550
public BoltServerAddress( String address )
4651
{
4752
this( uriFrom( address ) );
@@ -54,15 +59,15 @@ public BoltServerAddress( URI uri )
5459

5560
public BoltServerAddress( String host, int port )
5661
{
57-
this( host, host, port );
62+
this( host, null, port );
5863
}
5964

60-
public BoltServerAddress( String originalHost, String host, int port )
65+
private BoltServerAddress( String host, InetAddress resolved, int port )
6166
{
62-
this.originalHost = requireNonNull( originalHost, "original host" );
6367
this.host = requireNonNull( host, "host" );
68+
this.resolved = resolved;
6469
this.port = requireValidPort( port );
65-
this.stringValue = String.format( "%s:%d", host, port );
70+
this.stringValue = resolved != null ? String.format( "%s(%s):%d", host, resolved.getHostAddress(), port ) : String.format( "%s:%d", host, port );
6671
}
6772

6873
public static BoltServerAddress from( ServerAddress address )
@@ -84,13 +89,13 @@ public boolean equals( Object o )
8489
return false;
8590
}
8691
BoltServerAddress that = (BoltServerAddress) o;
87-
return port == that.port && originalHost.equals( that.originalHost ) && host.equals( that.host );
92+
return port == that.port && host.equals( that.host );
8893
}
8994

9095
@Override
9196
public int hashCode()
9297
{
93-
return Objects.hash( originalHost, host, port );
98+
return Objects.hash( host, port );
9499
}
95100

96101
@Override
@@ -108,38 +113,39 @@ public String toString()
108113
*/
109114
public SocketAddress toSocketAddress()
110115
{
111-
return new InetSocketAddress( host, port );
116+
return resolved == null ? new InetSocketAddress( host, port ) : new InetSocketAddress( resolved, port );
112117
}
113118

114119
/**
115-
* Resolve the host name down to an IP address, if not already resolved.
120+
* Resolve the host name down to an IP address
116121
*
117-
* @return this instance if already resolved, otherwise a new address instance
122+
* @return a new address instance
118123
* @throws UnknownHostException if no IP address for the host could be found
119124
* @see InetAddress#getByName(String)
120125
*/
121126
public BoltServerAddress resolve() throws UnknownHostException
122127
{
123-
String ipAddress = InetAddress.getByName( host ).getHostAddress();
124-
if ( ipAddress.equals( host ) )
125-
{
126-
return this;
127-
}
128-
else
129-
{
130-
return new BoltServerAddress( host, ipAddress, port );
131-
}
128+
return new BoltServerAddress( host, InetAddress.getByName( host ), port );
132129
}
133130

134-
@Override
135-
public String host()
131+
/**
132+
* Resolve the host name down to all IP addresses that can be resolved to
133+
*
134+
* @return an array of new address instances that holds resolved addresses
135+
* @throws UnknownHostException if no IP address for the host could be found
136+
* @see InetAddress#getAllByName(String)
137+
*/
138+
public List<BoltServerAddress> resolveAll() throws UnknownHostException
136139
{
137-
return host;
140+
return Stream.of( InetAddress.getAllByName( host ) )
141+
.map( address -> new BoltServerAddress( host, address, port ) )
142+
.collect( toList() );
138143
}
139144

140-
public String originalHost()
145+
@Override
146+
public String host()
141147
{
142-
return originalHost;
148+
return host;
143149
}
144150

145151
@Override

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
3333
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
3434
import org.neo4j.driver.internal.async.pool.PoolSettings;
35-
import org.neo4j.driver.internal.cluster.DnsResolver;
3635
import org.neo4j.driver.internal.cluster.RoutingContext;
3736
import org.neo4j.driver.internal.cluster.RoutingSettings;
3837
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
@@ -62,7 +61,9 @@
6261

6362
import static java.lang.String.format;
6463
import static org.neo4j.driver.internal.metrics.MetricsProvider.METRICS_DISABLED_PROVIDER;
64+
import static org.neo4j.driver.internal.cluster.IdentityResolver.IDENTITY_RESOLVER;
6565
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
66+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
6667

6768
public class DriverFactory
6869
{
@@ -245,7 +246,7 @@ private static LoadBalancingStrategy createLoadBalancingStrategy( Config config,
245246
private static ServerAddressResolver createResolver( Config config )
246247
{
247248
ServerAddressResolver configuredResolver = config.resolver();
248-
return configuredResolver != null ? configuredResolver : new DnsResolver( config.logging() );
249+
return configuredResolver != null ? configuredResolver : IDENTITY_RESOLVER;
249250
}
250251

251252
/**
@@ -330,12 +331,12 @@ private static SecurityPlan createSecurityPlanImpl( BoltServerAddress address, C
330331
case TRUST_ON_FIRST_USE:
331332
logger.warn(
332333
"Option `TRUST_ON_FIRST_USE` has been deprecated and will be removed in a future " +
333-
"version of the driver. Please switch to use `TRUST_ALL_CERTIFICATES` instead." );
334+
"version of the driver. Please switch to use `TRUST_ALL_CERTIFICATES` instead." );
334335
return SecurityPlan.forTrustOnFirstUse( trustStrategy.certFile(), hostnameVerificationEnabled, address, logger );
335336
case TRUST_SIGNED_CERTIFICATES:
336337
logger.warn(
337338
"Option `TRUST_SIGNED_CERTIFICATE` has been deprecated and will be removed in a future " +
338-
"version of the driver. Please switch to use `TRUST_CUSTOM_CA_SIGNED_CERTIFICATES` instead." );
339+
"version of the driver. Please switch to use `TRUST_CUSTOM_CA_SIGNED_CERTIFICATES` instead." );
339340
// intentional fallthrough
340341

341342
case TRUST_CUSTOM_CA_SIGNED_CERTIFICATES:
@@ -397,10 +398,7 @@ private static void closeConnectionPoolAndSuppressError( ConnectionPool connecti
397398
}
398399
catch ( Throwable closeError )
399400
{
400-
if ( mainError != closeError )
401-
{
402-
mainError.addSuppressed( closeError );
403-
}
401+
addSuppressed( mainError, closeError );
404402
}
405403
}
406404

driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyChannelInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private SslHandler createSslHandler()
7777
private SSLEngine createSslEngine()
7878
{
7979
SSLContext sslContext = securityPlan.sslContext();
80-
SSLEngine sslEngine = sslContext.createSSLEngine( address.originalHost(), address.port() );
80+
SSLEngine sslEngine = sslContext.createSSLEngine( address.host(), address.port() );
8181
sslEngine.setUseClientMode( true );
8282
if ( securityPlan.requiresHostnameVerification() )
8383
{

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
8888
else
8989
{
9090
failed = true;
91-
log.error( "Fatal error occurred in the pipeline", error );
91+
log.warn( "Fatal error occurred in the pipeline", error );
9292
fail( ctx, error );
9393
}
9494
}
9595

9696
private void fail( ChannelHandlerContext ctx, Throwable error )
9797
{
9898
Throwable cause = transformError( error );
99-
messageDispatcher.handleFatalError( cause );
99+
messageDispatcher.handleChannelError( cause );
100100
log.debug( "Closing channel because of a failure '%s'", error );
101101
ctx.close();
102102
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import static java.util.Objects.requireNonNull;
3939
import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET;
40+
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
4041

4142
public class InboundMessageDispatcher implements ResponseMessageHandler
4243
{
@@ -141,9 +142,17 @@ public void handleIgnoredMessage()
141142
handler.onFailure( error );
142143
}
143144

144-
public void handleFatalError( Throwable error )
145+
public void handleChannelError( Throwable error )
145146
{
146-
currentError = error;
147+
if ( currentError != null )
148+
{
149+
// we already have an error, this new error probably is caused by the existing one, thus we chain the new error on this current error
150+
addSuppressed( currentError, error );
151+
}
152+
else
153+
{
154+
currentError = error;
155+
}
147156
fatalErrorOccurred = true;
148157

149158
while ( !handlers.isEmpty() )

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import java.util.concurrent.TimeoutException;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636

37+
import org.neo4j.driver.Logger;
38+
import org.neo4j.driver.Logging;
39+
import org.neo4j.driver.exceptions.ClientException;
40+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3741
import org.neo4j.driver.internal.BoltServerAddress;
3842
import org.neo4j.driver.internal.async.connection.ChannelConnector;
3943
import org.neo4j.driver.internal.async.connection.DirectConnection;
@@ -43,9 +47,8 @@
4347
import org.neo4j.driver.internal.spi.ConnectionPool;
4448
import org.neo4j.driver.internal.util.Clock;
4549
import org.neo4j.driver.internal.util.Futures;
46-
import org.neo4j.driver.Logger;
47-
import org.neo4j.driver.Logging;
48-
import org.neo4j.driver.exceptions.ClientException;
50+
51+
import static java.lang.String.format;
4952

5053
public class ConnectionPoolImpl implements ConnectionPool
5154
{
@@ -59,11 +62,12 @@ public class ConnectionPoolImpl implements ConnectionPool
5962
private final MetricsListener metricsListener;
6063
private final boolean ownsEventLoopGroup;
6164

62-
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
65+
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
6366
private final AtomicBoolean closed = new AtomicBoolean();
6467

6568
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock, boolean ownsEventLoopGroup )
6669
{
70+
6771
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), settings, metricsListener, logging, clock, ownsEventLoopGroup );
6872
}
6973

@@ -87,7 +91,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
8791
log.trace( "Acquiring a connection from pool towards %s", address );
8892

8993
assertNotClosed();
90-
ChannelPool pool = getOrCreatePool( address );
94+
ExtendedChannelPool pool = getOrCreatePool( address );
9195

9296
ListenerEvent acquireEvent = metricsListener.createListenerEvent();
9397
metricsListener.beforeAcquiringOrCreating( address, acquireEvent );
@@ -97,7 +101,7 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
97101
{
98102
try
99103
{
100-
processAcquisitionError( address, error );
104+
processAcquisitionError( pool, address, error );
101105
assertNotClosed( address, channel, pool );
102106
Connection connection = new DirectConnection( channel, pool, clock, metricsListener );
103107

@@ -156,7 +160,7 @@ public CompletionStage<Void> close()
156160
try
157161
{
158162
nettyChannelTracker.prepareToCloseChannels();
159-
for ( Map.Entry<BoltServerAddress,ChannelPool> entry : pools.entrySet() )
163+
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
160164
{
161165
BoltServerAddress address = entry.getKey();
162166
ChannelPool pool = entry.getValue();
@@ -192,9 +196,9 @@ public boolean isOpen( BoltServerAddress address )
192196
return pools.containsKey( address );
193197
}
194198

195-
private ChannelPool getOrCreatePool( BoltServerAddress address )
199+
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
196200
{
197-
ChannelPool pool = pools.get( address );
201+
ExtendedChannelPool pool = pools.get( address );
198202
if ( pool != null )
199203
{
200204
return pool;
@@ -215,7 +219,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
215219
return pool;
216220
}
217221

218-
ChannelPool newPool( BoltServerAddress address )
222+
ExtendedChannelPool newPool( BoltServerAddress address )
219223
{
220224
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker,
221225
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
@@ -226,7 +230,7 @@ private EventLoopGroup eventLoopGroup()
226230
return bootstrap.config().group();
227231
}
228232

229-
private void processAcquisitionError( BoltServerAddress serverAddress, Throwable error )
233+
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
230234
{
231235
Throwable cause = Futures.completionExceptionCause( error );
232236
if ( cause != null )
@@ -240,6 +244,13 @@ private void processAcquisitionError( BoltServerAddress serverAddress, Throwable
240244
"Unable to acquire connection from the pool within configured maximum time of " +
241245
settings.connectionAcquisitionTimeout() + "ms" );
242246
}
247+
else if ( pool.isClosed() )
248+
{
249+
// There is a race condition where a thread tries to acquire a connection while the pool is closed by another concurrent thread.
250+
// Treat as failed to obtain connection for a direct driver. For a routing driver, this error should be retried.
251+
throw new ServiceUnavailableException( format( "Connection pool for server %s is closed while acquiring a connection.", serverAddress ),
252+
cause );
253+
}
243254
else
244255
{
245256
// some unknown error happened during connection acquisition, propagate it
@@ -272,4 +283,10 @@ public String toString()
272283
{
273284
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
274285
}
286+
287+
// for testing only
288+
ExtendedChannelPool getPool( BoltServerAddress address )
289+
{
290+
return pools.get( address );
291+
}
275292
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.pool;
20+
21+
import io.netty.channel.pool.ChannelPool;
22+
23+
public interface ExtendedChannelPool extends ChannelPool
24+
{
25+
boolean isClosed();
26+
}

0 commit comments

Comments
 (0)