Skip to content

Commit 3b90f2c

Browse files
author
Zhen Li
authored
Merge pull request #250 from pontusmelke/1.1-tx-close
Proper handling of cluster failures for transactions
2 parents 70fb746 + bb9121c commit 3b90f2c

12 files changed

+1374
-188
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import java.util.Collections;
22+
import java.util.Comparator;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Set;
26+
27+
import org.neo4j.driver.internal.net.BoltServerAddress;
28+
import org.neo4j.driver.internal.util.Clock;
29+
import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet;
30+
import org.neo4j.driver.v1.Logger;
31+
32+
/**
33+
* Defines a snapshot view of the cluster.
34+
*/
35+
class ClusterView
36+
{
37+
private final static Comparator<BoltServerAddress> COMPARATOR = new Comparator<BoltServerAddress>()
38+
{
39+
@Override
40+
public int compare( BoltServerAddress o1, BoltServerAddress o2 )
41+
{
42+
int compare = o1.host().compareTo( o2.host() );
43+
if ( compare == 0 )
44+
{
45+
compare = Integer.compare( o1.port(), o2.port() );
46+
}
47+
48+
return compare;
49+
}
50+
};
51+
52+
private static final int MIN_ROUTERS = 1;
53+
54+
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
55+
new ConcurrentRoundRobinSet<>( COMPARATOR );
56+
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers =
57+
new ConcurrentRoundRobinSet<>( COMPARATOR );
58+
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers =
59+
new ConcurrentRoundRobinSet<>( COMPARATOR );
60+
private final Clock clock;
61+
private final long expires;
62+
private final Logger log;
63+
64+
public ClusterView( long expires, Clock clock, Logger log )
65+
{
66+
this.expires = expires;
67+
this.clock = clock;
68+
this.log = log;
69+
}
70+
71+
public void addRouter( BoltServerAddress router )
72+
{
73+
this.routingServers.add( router );
74+
}
75+
76+
public boolean isStale()
77+
{
78+
return expires < clock.millis() ||
79+
routingServers.size() <= MIN_ROUTERS ||
80+
readServers.isEmpty() ||
81+
writeServers.isEmpty();
82+
}
83+
84+
Set<BoltServerAddress> all()
85+
{
86+
HashSet<BoltServerAddress> all =
87+
new HashSet<>( routingServers.size() + readServers.size() + writeServers.size() );
88+
all.addAll( routingServers );
89+
all.addAll( readServers );
90+
all.addAll( writeServers );
91+
return all;
92+
}
93+
94+
95+
public BoltServerAddress nextRouter()
96+
{
97+
return routingServers.hop();
98+
}
99+
100+
public BoltServerAddress nextReader()
101+
{
102+
return readServers.hop();
103+
}
104+
105+
public BoltServerAddress nextWriter()
106+
{
107+
return writeServers.hop();
108+
}
109+
110+
public void addReaders( List<BoltServerAddress> addresses )
111+
{
112+
readServers.addAll( addresses );
113+
}
114+
115+
public void addWriters( List<BoltServerAddress> addresses )
116+
{
117+
writeServers.addAll( addresses );
118+
}
119+
120+
public void addRouters( List<BoltServerAddress> addresses )
121+
{
122+
routingServers.addAll( addresses );
123+
}
124+
125+
public void remove( BoltServerAddress address )
126+
{
127+
if ( routingServers.remove( address ) )
128+
{
129+
log.debug( "Removing %s from routers", address.toString() );
130+
}
131+
if ( readServers.remove( address ) )
132+
{
133+
log.debug( "Removing %s from readers", address.toString() );
134+
}
135+
if ( writeServers.remove( address ) )
136+
{
137+
log.debug( "Removing %s from writers", address.toString() );
138+
}
139+
}
140+
141+
public boolean removeWriter( BoltServerAddress address )
142+
{
143+
return writeServers.remove( address );
144+
}
145+
146+
public int numberOfRouters()
147+
{
148+
return routingServers.size();
149+
}
150+
151+
public int numberOfReaders()
152+
{
153+
return readServers.size();
154+
}
155+
156+
public int numberOfWriters()
157+
{
158+
return writeServers.size();
159+
}
160+
161+
public Set<BoltServerAddress> routingServers()
162+
{
163+
return Collections.unmodifiableSet( routingServers );
164+
}
165+
166+
public Set<BoltServerAddress> readServers()
167+
{
168+
return Collections.unmodifiableSet( readServers );
169+
}
170+
171+
public Set<BoltServerAddress> writeServers()
172+
{
173+
return Collections.unmodifiableSet( writeServers );
174+
}
175+
}

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 5 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.Collections;
22-
import java.util.Comparator;
23-
import java.util.HashSet;
2421
import java.util.List;
2522
import java.util.Set;
2623

@@ -29,9 +26,7 @@
2926
import org.neo4j.driver.internal.spi.Connection;
3027
import org.neo4j.driver.internal.spi.ConnectionPool;
3128
import org.neo4j.driver.internal.util.Clock;
32-
import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet;
3329
import org.neo4j.driver.v1.AccessMode;
34-
import org.neo4j.driver.v1.Logger;
3530
import org.neo4j.driver.v1.Logging;
3631
import org.neo4j.driver.v1.Record;
3732
import org.neo4j.driver.v1.Session;
@@ -48,20 +43,7 @@ public class RoutingDriver extends BaseDriver
4843
{
4944
private static final String GET_SERVERS = "dbms.cluster.routing.getServers";
5045
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
51-
private final static Comparator<BoltServerAddress> COMPARATOR = new Comparator<BoltServerAddress>()
52-
{
53-
@Override
54-
public int compare( BoltServerAddress o1, BoltServerAddress o2 )
55-
{
56-
int compare = o1.host().compareTo( o2.host() );
57-
if ( compare == 0 )
58-
{
59-
compare = Integer.compare( o1.port(), o2.port() );
60-
}
6146

62-
return compare;
63-
}
64-
};
6547
private final ConnectionPool connections;
6648
private final Function<Connection,Session> sessionProvider;
6749
private final Clock clock;
@@ -197,117 +179,6 @@ List<BoltServerAddress> addresses()
197179
}
198180
}
199181

200-
private static class ClusterView
201-
{
202-
private static final int MIN_ROUTERS = 1;
203-
204-
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
205-
new ConcurrentRoundRobinSet<>( COMPARATOR );
206-
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers =
207-
new ConcurrentRoundRobinSet<>( COMPARATOR );
208-
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers =
209-
new ConcurrentRoundRobinSet<>( COMPARATOR );
210-
private final Clock clock;
211-
private final long expires;
212-
private final Logger log;
213-
214-
private ClusterView( long expires, Clock clock, Logger log )
215-
{
216-
this.expires = expires;
217-
this.clock = clock;
218-
this.log = log;
219-
}
220-
221-
public void addRouter( BoltServerAddress router )
222-
{
223-
this.routingServers.add( router );
224-
}
225-
226-
public boolean isStale()
227-
{
228-
return expires < clock.millis() ||
229-
routingServers.size() <= MIN_ROUTERS ||
230-
readServers.isEmpty() ||
231-
writeServers.isEmpty();
232-
}
233-
234-
Set<BoltServerAddress> all()
235-
{
236-
HashSet<BoltServerAddress> all =
237-
new HashSet<>( routingServers.size() + readServers.size() + writeServers.size() );
238-
all.addAll( routingServers );
239-
all.addAll( readServers );
240-
all.addAll( writeServers );
241-
return all;
242-
}
243-
244-
public int numberOfRouters()
245-
{
246-
return routingServers.size();
247-
}
248-
249-
public BoltServerAddress nextRouter()
250-
{
251-
return routingServers.hop();
252-
}
253-
254-
public BoltServerAddress nextReader()
255-
{
256-
return readServers.hop();
257-
}
258-
259-
public BoltServerAddress nextWriter()
260-
{
261-
return writeServers.hop();
262-
}
263-
264-
public void addReaders( List<BoltServerAddress> addresses )
265-
{
266-
readServers.addAll( addresses );
267-
}
268-
269-
public void addWriters( List<BoltServerAddress> addresses )
270-
{
271-
writeServers.addAll( addresses );
272-
}
273-
274-
public void addRouters( List<BoltServerAddress> addresses )
275-
{
276-
routingServers.addAll( addresses );
277-
}
278-
279-
public void remove( BoltServerAddress address )
280-
{
281-
if ( routingServers.remove( address ) )
282-
{
283-
log.debug( "Removing %s from routers", address.toString() );
284-
}
285-
if ( readServers.remove( address ) )
286-
{
287-
log.debug( "Removing %s from readers", address.toString() );
288-
}
289-
if ( writeServers.remove( address ) )
290-
{
291-
log.debug( "Removing %s from writers", address.toString() );
292-
}
293-
}
294-
295-
public boolean removeWriter( BoltServerAddress address )
296-
{
297-
return writeServers.remove( address );
298-
}
299-
300-
public int numberOfReaders()
301-
{
302-
return readServers.size();
303-
}
304-
305-
public int numberOfWriters()
306-
{
307-
return writeServers.size();
308-
}
309-
}
310-
311182
private List<ServerInfo> servers( Record record )
312183
{
313184
return record.get( "servers" ).asList( new Function<Value,ServerInfo>()
@@ -371,7 +242,8 @@ public Session session()
371242
@Override
372243
public Session session( final AccessMode mode )
373244
{
374-
return new RoutingNetworkSession( mode, acquireConnection( mode ),
245+
Connection connection = acquireConnection( mode );
246+
return new RoutingNetworkSession( new NetworkSession( connection ), mode, connection.address(),
375247
new RoutingErrorHandler()
376248
{
377249
@Override
@@ -458,19 +330,19 @@ public void close()
458330
//For testing
459331
public Set<BoltServerAddress> routingServers()
460332
{
461-
return Collections.unmodifiableSet( clusterView.routingServers );
333+
return clusterView.routingServers();
462334
}
463335

464336
//For testing
465337
public Set<BoltServerAddress> readServers()
466338
{
467-
return Collections.unmodifiableSet( clusterView.readServers );
339+
return clusterView.readServers();
468340
}
469341

470342
//For testing
471343
public Set<BoltServerAddress> writeServers()
472344
{
473-
return Collections.unmodifiableSet( clusterView.writeServers );
345+
return clusterView.writeServers( );
474346
}
475347

476348
//For testing

0 commit comments

Comments
 (0)