Skip to content

Commit cd24162

Browse files
committed
Operation on transactions should work with routing
We weren't checking for the cluster faillures when running on transactions. All operations should check for leader-switches and connection failures and handle them appropriately.
1 parent 25a8f38 commit cd24162

File tree

7 files changed

+614
-44
lines changed

7 files changed

+614
-44
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
import static java.lang.String.format;
4040
import static org.neo4j.driver.v1.Values.value;
4141

42+
/**
43+
* A session that safely handles routing errors.
44+
*/
4245
public class RoutingNetworkSession implements Session
4346
{
4447
protected final Session delegate;
@@ -107,13 +110,13 @@ public TypeSystem typeSystem()
107110
@Override
108111
public Transaction beginTransaction()
109112
{
110-
return delegate.beginTransaction();
113+
return new RoutingTransaction( delegate.beginTransaction(), mode, address, onError);
111114
}
112115

113116
@Override
114117
public Transaction beginTransaction( String bookmark )
115118
{
116-
return delegate.beginTransaction(bookmark);
119+
return new RoutingTransaction( delegate.beginTransaction(bookmark), mode, address, onError);
117120
}
118121

119122
@Override
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
22+
import java.util.Map;
23+
24+
import org.neo4j.driver.internal.net.BoltServerAddress;
25+
import org.neo4j.driver.v1.AccessMode;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.Statement;
28+
import org.neo4j.driver.v1.StatementResult;
29+
import org.neo4j.driver.v1.Transaction;
30+
import org.neo4j.driver.v1.Value;
31+
import org.neo4j.driver.v1.Values;
32+
import org.neo4j.driver.v1.exceptions.ClientException;
33+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
34+
import org.neo4j.driver.v1.types.TypeSystem;
35+
36+
import static org.neo4j.driver.internal.RoutingNetworkSession.filterFailureToWrite;
37+
import static org.neo4j.driver.internal.RoutingNetworkSession.sessionExpired;
38+
import static org.neo4j.driver.v1.Values.value;
39+
40+
/**
41+
* A transaction that safely handles routing errors.
42+
*/
43+
public class RoutingTransaction implements Transaction
44+
{
45+
protected final Transaction delegate;
46+
private final AccessMode mode;
47+
private final BoltServerAddress address;
48+
private final RoutingErrorHandler onError;
49+
50+
RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
51+
RoutingErrorHandler onError )
52+
{
53+
this.delegate = delegate;
54+
this.mode = mode;
55+
this.address = address;
56+
this.onError = onError;
57+
}
58+
59+
@Override
60+
public StatementResult run( String statementText )
61+
{
62+
return run( statementText, Values.EmptyMap );
63+
}
64+
65+
@Override
66+
public StatementResult run( String statementText, Map<String,Object> statementParameters )
67+
{
68+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
69+
return run( statementText, params );
70+
}
71+
72+
@Override
73+
public StatementResult run( String statementTemplate, Record statementParameters )
74+
{
75+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
76+
return run( statementTemplate, params );
77+
}
78+
79+
@Override
80+
public StatementResult run( String statementText, Value statementParameters )
81+
{
82+
return run( new Statement( statementText, statementParameters ) );
83+
}
84+
85+
@Override
86+
public StatementResult run( Statement statement )
87+
{
88+
try
89+
{
90+
return new RoutingStatementResult( delegate.run( statement ), mode, address, onError );
91+
}
92+
catch ( ConnectionFailureException e )
93+
{
94+
throw sessionExpired( e, onError, address );
95+
}
96+
catch ( ClientException e )
97+
{
98+
throw filterFailureToWrite( e, mode, onError, address );
99+
}
100+
}
101+
102+
@Override
103+
public TypeSystem typeSystem()
104+
{
105+
return delegate.typeSystem();
106+
}
107+
108+
109+
@Override
110+
public void success()
111+
{
112+
113+
}
114+
115+
@Override
116+
public void failure()
117+
{
118+
119+
}
120+
121+
@Override
122+
public boolean isOpen()
123+
{
124+
return false;
125+
}
126+
127+
@Override
128+
public void close()
129+
{
130+
try
131+
{
132+
delegate.close();
133+
}
134+
catch ( ConnectionFailureException e )
135+
{
136+
throw sessionExpired(e, onError, address);
137+
}
138+
catch ( ClientException e )
139+
{
140+
throw filterFailureToWrite( e, mode, onError, address );
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)