Skip to content

Commit e908801

Browse files
committed
ClusterDriver implementation
1 parent 1ffe696 commit e908801

File tree

69 files changed

+721
-504
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+721
-504
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.neo4j.driver.internal;
21+
22+
import org.neo4j.driver.internal.net.BoltServerAddress;
23+
import org.neo4j.driver.internal.security.SecurityPlan;
24+
import org.neo4j.driver.v1.Driver;
25+
import org.neo4j.driver.v1.Logger;
26+
import org.neo4j.driver.v1.Logging;
27+
import org.neo4j.driver.v1.Session;
28+
29+
import java.util.*;
30+
import java.util.concurrent.ThreadLocalRandom;
31+
32+
abstract class BaseDriver implements Driver
33+
{
34+
private final SessionParameters sessionParameters;
35+
private final SecurityPlan securityPlan;
36+
protected final List<BoltServerAddress> servers = new LinkedList<>();
37+
protected final Logger log;
38+
39+
BaseDriver( BoltServerAddress address, SessionParameters sessionParameters, SecurityPlan securityPlan, Logging logging )
40+
{
41+
this.servers.add( address );
42+
this.sessionParameters = sessionParameters;
43+
this.securityPlan = securityPlan;
44+
this.log = logging.getLog( Session.LOG_NAME );
45+
}
46+
47+
@Override
48+
public boolean isEncrypted()
49+
{
50+
return securityPlan.requiresEncryption();
51+
}
52+
53+
@Override
54+
public List<BoltServerAddress> servers()
55+
{
56+
return Collections.unmodifiableList( servers );
57+
}
58+
59+
protected BoltServerAddress randomServer()
60+
{
61+
return servers.get( ThreadLocalRandom.current().nextInt( 0, servers.size() ) );
62+
}
63+
64+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.neo4j.driver.internal;
21+
22+
import org.neo4j.driver.internal.net.BoltServerAddress;
23+
import org.neo4j.driver.internal.net.pooling.PoolSettings;
24+
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
25+
import org.neo4j.driver.internal.security.SecurityPlan;
26+
import org.neo4j.driver.internal.spi.ConnectionPool;
27+
import org.neo4j.driver.v1.Logging;
28+
import org.neo4j.driver.v1.Record;
29+
import org.neo4j.driver.v1.Session;
30+
import org.neo4j.driver.v1.StatementResult;
31+
import org.neo4j.driver.v1.exceptions.ClientException;
32+
import org.neo4j.driver.v1.util.Function;
33+
34+
import java.util.LinkedList;
35+
import java.util.List;
36+
37+
import static java.lang.String.format;
38+
39+
public class ClusterDriver extends BaseDriver
40+
{
41+
private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverMembers";
42+
private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints";
43+
44+
private final ConnectionPool connections;
45+
46+
public ClusterDriver( BoltServerAddress seedAddress, SessionParameters sessionParameters, SecurityPlan securityPlan,
47+
PoolSettings poolSettings, Logging logging )
48+
{
49+
super( seedAddress, sessionParameters, securityPlan, logging );
50+
this.connections = new SocketConnectionPool( sessionParameters, securityPlan, poolSettings, logging );
51+
discover();
52+
}
53+
54+
public void discover()
55+
{
56+
final List<BoltServerAddress> newServers = new LinkedList<>( );
57+
try
58+
{
59+
call( DISCOVER_MEMBERS, new Function<Record, Integer>()
60+
{
61+
@Override
62+
public Integer apply( Record record )
63+
{
64+
newServers.add( new BoltServerAddress( record.get( "address" ).asString() ) );
65+
return 0;
66+
}
67+
} );
68+
this.servers.clear();
69+
this.servers.addAll( newServers );
70+
log.debug( "~~ [MEMBERS] -> %s", newServers );
71+
}
72+
catch ( ClientException ex )
73+
{
74+
if ( ex.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
75+
{
76+
// no discovery available; keep servers as they are
77+
log.warn( "C: Discovery failed; could not find procedure %s", DISCOVER_MEMBERS );
78+
}
79+
else
80+
{
81+
throw ex;
82+
}
83+
}
84+
}
85+
86+
// public void acquire(final String role)
87+
// {
88+
// BoltServerAddress server;
89+
// try
90+
// {
91+
// call( ACQUIRE_ENDPOINTS, new Function<Record, Integer>()
92+
// {
93+
// @Override
94+
// public Integer apply( Record record )
95+
// {
96+
// if (record.get( "role" ).asString().equals( role ))
97+
// {
98+
// server = new BoltServerAddress( record.get( "address" ).asString() );
99+
// }
100+
// return 0;
101+
// }
102+
// } );
103+
// this.servers.clear();
104+
// this.servers.addAll( newServers );
105+
// log.debug( "~~ [MEMBERS] -> %s", newServers );
106+
// }
107+
// catch ( ClientException ex )
108+
// {
109+
// if ( ex.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
110+
// {
111+
// // no discovery available; keep servers as they are
112+
// log.warn( "C: Discovery failed; could not find procedure %s", DISCOVER_MEMBERS );
113+
// }
114+
// else
115+
// {
116+
// throw ex;
117+
// }
118+
// }
119+
// }
120+
121+
// public void
122+
123+
void call( String procedureName, Function<Record, Integer> recorder )
124+
{
125+
try ( Session session = new InternalSession( connections.acquire( randomServer() ), log ) )
126+
{
127+
StatementResult records = session.run( format( "CALL %s", procedureName ) );
128+
while ( records.hasNext() )
129+
{
130+
recorder.apply( records.next() );
131+
}
132+
}
133+
}
134+
135+
@Override
136+
public Session session()
137+
{
138+
throw new UnsupportedOperationException();
139+
}
140+
141+
@Override
142+
public void close()
143+
{
144+
try
145+
{
146+
connections.close();
147+
}
148+
catch ( Exception ex )
149+
{
150+
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
151+
}
152+
}
153+
154+
}

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java

Lines changed: 16 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,76 +16,47 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19+
1920
package org.neo4j.driver.internal;
2021

21-
import org.neo4j.driver.internal.pool.SocketConnectionPool;
22-
import org.neo4j.driver.internal.pool.PoolSettings;
22+
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
23+
import org.neo4j.driver.internal.net.pooling.PoolSettings;
2324
import org.neo4j.driver.internal.security.SecurityPlan;
2425
import org.neo4j.driver.internal.spi.ConnectionPool;
25-
import org.neo4j.driver.internal.util.BoltServerAddress;
26-
import org.neo4j.driver.v1.Driver;
26+
import org.neo4j.driver.internal.net.BoltServerAddress;
2727
import org.neo4j.driver.v1.Logging;
2828
import org.neo4j.driver.v1.Session;
29-
import org.neo4j.driver.v1.exceptions.ClientException;
30-
import org.neo4j.driver.v1.exceptions.Neo4jException;
3129

32-
import java.util.*;
30+
import static java.lang.String.format;
3331

34-
public class DirectDriver implements Driver
32+
public class DirectDriver extends BaseDriver
3533
{
36-
private final Set<BoltServerAddress> servers;
37-
private final BoltServerAddress address;
38-
private final SecurityPlan securityPlan;
39-
private final Logging logging;
4034
private final ConnectionPool connections;
4135

42-
public DirectDriver( BoltServerAddress address, SecurityPlan securityPlan, PoolSettings poolSettings, Logging logging )
43-
{
44-
this.address = address;
45-
Set<BoltServerAddress> servers = new HashSet<>();
46-
servers.add( this.address );
47-
this.servers = Collections.unmodifiableSet( servers );
48-
this.securityPlan = securityPlan;
49-
this.logging = logging;
50-
this.connections = new SocketConnectionPool( securityPlan, poolSettings, logging );
51-
}
52-
53-
@Override
54-
public Set<BoltServerAddress> servers()
36+
public DirectDriver( BoltServerAddress address, SessionParameters sessionParameters, SecurityPlan securityPlan,
37+
PoolSettings poolSettings, Logging logging )
5538
{
56-
return servers;
39+
super( address, sessionParameters, securityPlan, logging );
40+
this.connections = new SocketConnectionPool( sessionParameters, securityPlan, poolSettings, logging );
5741
}
5842

59-
@Override
60-
public boolean isEncrypted()
61-
{
62-
return securityPlan.requiresEncryption();
63-
}
64-
65-
/**
66-
* Establish a session
67-
* @return a session that could be used to run {@link Session#run(String) a statement} or
68-
* {@link Session#beginTransaction() a transaction }.
69-
*/
7043
@Override
7144
public Session session()
7245
{
73-
return new InternalSession( connections.acquire( address ), logging.getLog( "session" ) );
46+
return new InternalSession( connections.acquire( randomServer() ), log );
7447
}
7548

76-
/**
77-
* Close all the resources assigned to this driver
78-
* @throws Neo4jException any error that might happen when releasing all resources
79-
*/
80-
public void close() throws Neo4jException
49+
@Override
50+
public void close()
8151
{
8252
try
8353
{
8454
connections.close();
8555
}
86-
catch ( Exception e )
56+
catch ( Exception ex )
8757
{
88-
throw new ClientException( "Failed to close driver.", e );
58+
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
8959
}
9060
}
61+
9162
}

driver/src/main/java/org/neo4j/driver/internal/ParameterSupport.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)