Skip to content

Commit fd4ec00

Browse files
committed
WIP handling read sessions
1 parent d106218 commit fd4ec00

19 files changed

+423
-103
lines changed

driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@
1919

2020
package org.neo4j.driver.internal;
2121

22-
import java.util.Collections;
23-
import java.util.HashSet;
2422
import java.util.Set;
25-
import java.util.concurrent.ThreadLocalRandom;
2623

2724
import org.neo4j.driver.internal.net.BoltServerAddress;
2825
import org.neo4j.driver.internal.security.SecurityPlan;
26+
import org.neo4j.driver.internal.spi.ConnectionPool;
2927
import org.neo4j.driver.v1.Driver;
3028
import org.neo4j.driver.v1.Logger;
3129
import org.neo4j.driver.v1.Logging;
@@ -35,11 +33,12 @@ abstract class BaseDriver implements Driver
3533
{
3634
private final SecurityPlan securityPlan;
3735
protected final Logger log;
38-
protected final Set<BoltServerAddress> servers = new HashSet<>();
36+
protected final ConnectionPool connections;
3937

40-
BaseDriver( BoltServerAddress address, SecurityPlan securityPlan, Logging logging )
38+
BaseDriver( ConnectionPool connections, BoltServerAddress address, SecurityPlan securityPlan, Logging logging )
4139
{
42-
this.servers.add( address );
40+
this.connections = connections;
41+
this.connections.add( address );
4342
this.securityPlan = securityPlan;
4443
this.log = logging.getLog( Session.LOG_NAME );
4544
}
@@ -52,24 +51,7 @@ public boolean isEncrypted()
5251

5352
Set<BoltServerAddress> servers()
5453
{
55-
return Collections.unmodifiableSet( servers );
56-
}
57-
58-
//This is somewhat silly and has O(n) complexity
59-
protected BoltServerAddress randomServer()
60-
{
61-
ThreadLocalRandom random = ThreadLocalRandom.current();
62-
int item = random.nextInt(servers.size());
63-
int i = 0;
64-
for ( BoltServerAddress server : servers )
65-
{
66-
if (i == item)
67-
{
68-
return server;
69-
}
70-
}
71-
72-
throw new IllegalStateException( "This cannot happen" );
54+
return connections.keys();
7355
}
7456

7557
}

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 153 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,15 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
2019
package org.neo4j.driver.internal;
2120

22-
import java.util.LinkedList;
2321
import java.util.List;
2422

2523
import org.neo4j.driver.internal.net.BoltServerAddress;
2624
import org.neo4j.driver.internal.net.pooling.PoolSettings;
2725
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
2826
import org.neo4j.driver.internal.security.SecurityPlan;
2927
import org.neo4j.driver.internal.spi.Connection;
30-
import org.neo4j.driver.internal.spi.ConnectionPool;
3128
import org.neo4j.driver.internal.util.Consumer;
3229
import org.neo4j.driver.internal.util.Supplier;
3330
import org.neo4j.driver.v1.Logging;
@@ -36,8 +33,8 @@
3633
import org.neo4j.driver.v1.SessionMode;
3734
import org.neo4j.driver.v1.StatementResult;
3835
import org.neo4j.driver.v1.exceptions.ClientException;
39-
import org.neo4j.driver.v1.exceptions.ClusterUnavailableException;
4036
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
37+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4138

4239
import static java.lang.String.format;
4340

@@ -47,50 +44,44 @@ public class ClusterDriver extends BaseDriver
4744
private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints";
4845
private static final int MINIMUM_NUMBER_OF_SERVERS = 3;
4946

50-
private final ConnectionPool connections;
47+
private final Endpoints endpoints = new Endpoints();
5148

52-
public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings, SecurityPlan securityPlan,
53-
PoolSettings poolSettings, Logging logging )
49+
public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings,
50+
SecurityPlan securityPlan,
51+
PoolSettings poolSettings, Logging logging )
5452
{
55-
super( seedAddress, securityPlan, logging );
56-
this.connections = new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging );
53+
super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ),seedAddress, securityPlan, logging );
5754
discover();
5855
}
5956

60-
void discover()
57+
synchronized void discover()
6158
{
62-
final List<BoltServerAddress> newServers = new LinkedList<>( );
6359
try
6460
{
6561
boolean success = false;
66-
while ( !servers.isEmpty() && !success )
62+
while ( !connections.isEmpty() && !success )
6763
{
6864
success = call( DISCOVER_MEMBERS, new Consumer<Record>()
6965
{
7066
@Override
7167
public void accept( Record record )
7268
{
73-
newServers.add( new BoltServerAddress( record.get( "address" ).asString() ) );
69+
connections.add(new BoltServerAddress( record.get( "address" ).asString() ));
7470
}
7571
} );
76-
77-
}
78-
if ( success )
79-
{
80-
this.servers.clear();
81-
this.servers.addAll( newServers );
82-
log.debug( "~~ [MEMBERS] -> %s", newServers );
8372
}
84-
else
73+
if ( !success )
8574
{
86-
throw new ClusterUnavailableException( "Run out of servers" );
75+
throw new ServiceUnavailableException( "Run out of servers" );
8776
}
8877
}
8978
catch ( ClientException ex )
9079
{
9180
if ( ex.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
9281
{
93-
throw new ClientException( "Discovery failed: could not find procedure %s", DISCOVER_MEMBERS );
82+
//no procedure there, not much to do, stick with what we've got
83+
//this may happen because server is running in standalone mode
84+
log.warn( "Could not find procedure %s", DISCOVER_MEMBERS );
9485
}
9586
else
9687
{
@@ -99,13 +90,15 @@ public void accept( Record record )
9990
}
10091
}
10192

93+
//must be called from a synchronized method
10294
private boolean call( String procedureName, Consumer<Record> recorder )
10395
{
96+
Connection acquire = null;
97+
Session session = null;
98+
try {
99+
acquire = connections.acquire();
100+
session = new NetworkSession( acquire, log );
104101

105-
BoltServerAddress address = randomServer();
106-
Connection acquire = connections.acquire( address );
107-
try ( Session session = new NetworkSession( acquire, log ) )
108-
{
109102
StatementResult records = session.run( format( "CALL %s", procedureName ) );
110103
while ( records.hasNext() )
111104
{
@@ -114,65 +107,157 @@ private boolean call( String procedureName, Consumer<Record> recorder )
114107
}
115108
catch ( ConnectionFailureException e )
116109
{
117-
forget(address );
110+
if (acquire != null)
111+
{
112+
forget( acquire.address() );
113+
}
118114
return false;
119115
}
116+
finally
117+
{
118+
if (acquire != null)
119+
{
120+
acquire.close();
121+
}
122+
if (session != null)
123+
{
124+
session.close();
125+
}
126+
}
120127
return true;
121128
}
122129

123-
private void forget(BoltServerAddress address)
130+
//must be called from a synchronized method
131+
private void callWithRetry(String procedureName, Consumer<Record> recorder )
132+
{
133+
while ( !connections.isEmpty() )
134+
{
135+
Connection acquire = null;
136+
Session session = null;
137+
try {
138+
acquire = connections.acquire();
139+
session = new NetworkSession( acquire, log );
140+
List<Record> list = session.run( format( "CALL %s", procedureName ) ).list();
141+
for ( Record record : list )
142+
{
143+
recorder.accept( record );
144+
}
145+
//we found results give up
146+
return;
147+
}
148+
catch ( ConnectionFailureException e )
149+
{
150+
if (acquire != null)
151+
{
152+
forget( acquire.address() );
153+
}
154+
}
155+
finally
156+
{
157+
if (acquire != null)
158+
{
159+
acquire.close();
160+
}
161+
if (session != null)
162+
{
163+
session.close();
164+
}
165+
}
166+
}
167+
168+
throw new ServiceUnavailableException( "Failed to communicate with any of the cluster members" );
169+
}
170+
171+
private synchronized void forget( BoltServerAddress address )
124172
{
125-
servers.remove( address );
126-
connections.purge(address);
173+
address.markInvalid();
174+
connections.purge( address );
127175
}
128176

129-
//TODO this could return a WRITE session but that may lead to users using the LEADER too much
130-
//a `ClientException` may be what we want
131177
@Override
132178
public Session session()
133179
{
134-
throw new UnsupportedOperationException();
180+
return session( SessionMode.WRITE );
135181
}
136182

137183
@Override
138184
public Session session( final SessionMode mode )
139185
{
140-
return new ClusteredSession( new Supplier<Connection>()
186+
switch ( mode )
141187
{
142-
@Override
143-
public Connection get()
188+
case READ:
189+
return new ReadNetworkSession( new Supplier<Connection>()
144190
{
145-
return acquireConnection( mode );
146-
}
147-
}, log );
191+
@Override
192+
public Connection get()
193+
{
194+
return acquireConnection( mode );
195+
}
196+
}, new Consumer<Connection>()
197+
{
198+
@Override
199+
public void accept( Connection connection )
200+
{
201+
forget( connection.address() );
202+
}
203+
}, log );
204+
case WRITE:
205+
throw new UnsupportedOperationException();
206+
default:
207+
throw new UnsupportedOperationException();
208+
}
148209
}
149210

150-
private Connection acquireConnection( SessionMode mode )
211+
private synchronized Connection acquireConnection( SessionMode mode )
151212
{
152213
//if we are short on servers, find new ones
153-
if ( servers.size() < MINIMUM_NUMBER_OF_SERVERS )
214+
if ( connections.size() < MINIMUM_NUMBER_OF_SERVERS )
154215
{
155216
discover();
156217
}
157218

158-
final BoltServerAddress[] addresses = new BoltServerAddress[2];
159-
call( ACQUIRE_ENDPOINTS, new Consumer<Record>()
219+
endpoints.clear();
220+
try
221+
{
222+
callWithRetry( ACQUIRE_ENDPOINTS, new Consumer<Record>()
223+
{
224+
@Override
225+
public void accept( Record record )
226+
{
227+
String serverMode = record.get( "mode" ).asString();
228+
if ( serverMode.equals( "READ" ) )
229+
{
230+
endpoints.readServer = new BoltServerAddress( record.get( "address" ).asString() );
231+
}
232+
else if ( serverMode.equals( "WRITE" ) )
233+
{
234+
endpoints.writeServer = new BoltServerAddress( record.get( "address" ).asString() );
235+
}
236+
}
237+
} );
238+
}
239+
catch (ClientException e)
160240
{
161-
@Override
162-
public void accept( Record record )
241+
if ( e.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) )
163242
{
164-
addresses[0] = new BoltServerAddress( record.get( "READ" ).asString() );
165-
addresses[1] = new BoltServerAddress( record.get( "WRITE" ).asString() );
243+
log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS );
244+
return connections.acquire();
166245
}
167-
} );
246+
throw e;
247+
}
248+
249+
if ( !endpoints.valid() )
250+
{
251+
throw new ServiceUnavailableException("Could not establish any endpoints for the call");
252+
}
168253

169254

170255
switch ( mode )
171256
{
172257
case READ:
173-
return connections.acquire( addresses[0] );
258+
return connections.acquire( endpoints.readServer );
174259
case WRITE:
175-
return connections.acquire( addresses[0] );
260+
return connections.acquire( endpoints.writeServer );
176261
default:
177262
throw new ClientException( mode + " is not supported for creating new sessions" );
178263
}
@@ -191,4 +276,21 @@ public void close()
191276
}
192277
}
193278

279+
private static class Endpoints
280+
{
281+
BoltServerAddress readServer;
282+
BoltServerAddress writeServer;
283+
284+
public boolean valid()
285+
{
286+
return readServer != null && writeServer != null;
287+
}
288+
289+
public void clear()
290+
{
291+
readServer = null;
292+
writeServer = null;
293+
}
294+
}
295+
194296
}

0 commit comments

Comments
 (0)