1
1
/**
2
2
* Copyright (c) 2002-2016 "Neo Technology,"
3
3
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4
- *
4
+ * <p>
5
5
* This file is part of Neo4j.
6
- *
6
+ * <p>
7
7
* Licensed under the Apache License, Version 2.0 (the "License");
8
8
* you may not use this file except in compliance with the License.
9
9
* You may obtain a copy of the License at
10
- *
11
- * http://www.apache.org/licenses/LICENSE-2.0
12
- *
10
+ * <p>
11
+ * http://www.apache.org/licenses/LICENSE-2.0
12
+ * <p>
13
13
* Unless required by applicable law or agreed to in writing, software
14
14
* distributed under the License is distributed on an "AS IS" BASIS,
15
15
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21
21
import org .junit .Rule ;
22
22
import org .junit .Test ;
23
23
24
+ import java .net .URI ;
24
25
import java .util .concurrent .TimeoutException ;
25
26
26
- import org .neo4j .driver .internal .util . Consumer ;
27
+ import org .neo4j .driver .internal .logging . DevNullLogger ;
27
28
import org .neo4j .driver .v1 .AccessMode ;
29
+ import org .neo4j .driver .v1 .Config ;
28
30
import org .neo4j .driver .v1 .Driver ;
29
31
import org .neo4j .driver .v1 .GraphDatabase ;
32
+ import org .neo4j .driver .v1 .Logger ;
33
+ import org .neo4j .driver .v1 .Logging ;
30
34
import org .neo4j .driver .v1 .Record ;
31
35
import org .neo4j .driver .v1 .Session ;
36
+ import org .neo4j .driver .v1 .Transaction ;
37
+ import org .neo4j .driver .v1 .Values ;
38
+ import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
32
39
import org .neo4j .driver .v1 .exceptions .SessionExpiredException ;
33
40
import org .neo4j .driver .v1 .util .Function ;
34
41
import org .neo4j .driver .v1 .util .cc .Cluster ;
35
42
import org .neo4j .driver .v1 .util .cc .ClusterMember ;
36
43
import org .neo4j .driver .v1 .util .cc .ClusterRule ;
37
44
45
+ import static org .junit .Assert .assertEquals ;
46
+ import static org .junit .Assert .assertNotNull ;
47
+ import static org .junit .Assert .fail ;
48
+
38
49
public class CausalClusteringIT
39
50
{
40
51
private static final long DEFAULT_TIMEOUT_MS = 15_000 ;
@@ -47,26 +58,149 @@ public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() th
47
58
{
48
59
Cluster cluster = clusterRule .getCluster ();
49
60
50
- ClusterMember leader = cluster .leaderTx ( new Consumer <Session >()
61
+ int count = executeWriteAndReadThroughBolt ( cluster .leader () );
62
+
63
+ assertEquals ( 1 , count );
64
+ }
65
+
66
+ @ Test
67
+ public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfFollower () throws Exception
68
+ {
69
+ Cluster cluster = clusterRule .getCluster ();
70
+
71
+ int count = executeWriteAndReadThroughBolt ( cluster .anyFollower () );
72
+
73
+ assertEquals ( 1 , count );
74
+ }
75
+
76
+ @ Test
77
+ public void sessionCreationShouldFailIfCallingDiscoveryProcedureOnEdgeServer () throws Exception
78
+ {
79
+ Cluster cluster = clusterRule .getCluster ();
80
+
81
+ ClusterMember readReplica = cluster .anyReadReplica ();
82
+ try
51
83
{
52
- @ Override
53
- public void accept ( Session session )
84
+ createDriver ( readReplica .getRoutingUri () );
85
+ fail ( "Should have thrown an exception using a read replica address for routing" );
86
+ }
87
+ catch ( ServiceUnavailableException ex )
88
+ {
89
+ assertEquals ( "Could not perform discovery. No routing servers available." , ex .getMessage () );
90
+ }
91
+ }
92
+
93
+ // Ensure that Bookmarks work with single instances using a driver created using a bolt[not+routing] URI.
94
+ @ Test
95
+ public void bookmarksShouldWorkWithDriverPinnedToSingleServer () throws Exception
96
+ {
97
+ Cluster cluster = clusterRule .getCluster ();
98
+ ClusterMember leader = cluster .leader ();
99
+
100
+ try ( Driver driver = createDriver ( leader .getBoltUri () ) )
101
+ {
102
+ String bookmark = inExpirableSession ( driver , createSession (), new Function <Session ,String >()
103
+ {
104
+ @ Override
105
+ public String apply ( Session session )
106
+ {
107
+ try ( Transaction tx = session .beginTransaction () )
108
+ {
109
+ tx .run ( "CREATE (p:Person {name: {name} })" , Values .parameters ( "name" , "Alistair" ) );
110
+ tx .success ();
111
+ }
112
+
113
+ return session .lastBookmark ();
114
+ }
115
+ } );
116
+
117
+ assertNotNull ( bookmark );
118
+
119
+ try ( Session session = driver .session ();
120
+ Transaction tx = session .beginTransaction ( bookmark ) )
54
121
{
55
- session .run ( "CREATE CONSTRAINT ON (p:Person) ASSERT p.name is UNIQUE" );
122
+ Record record = tx .run ( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next ();
123
+ assertEquals ( 1 , record .get ( "count" ).asInt () );
124
+ tx .success ();
56
125
}
57
- } );
126
+ }
127
+ }
128
+
129
+ @ Test
130
+ public void shouldUseBookmarkFromAReadSessionInAWriteSession () throws Exception
131
+ {
132
+ Cluster cluster = clusterRule .getCluster ();
133
+ ClusterMember leader = cluster .leader ();
58
134
59
- executeWriteAndReadThroughBolt ( leader );
135
+ try ( Driver driver = createDriver ( leader .getBoltUri () ) )
136
+ {
137
+ inExpirableSession ( driver , createWritableSession (), new Function <Session ,Void >()
138
+ {
139
+ @ Override
140
+ public Void apply ( Session session )
141
+ {
142
+ session .run ( "CREATE (p:Person {name: {name} })" , Values .parameters ( "name" , "Jim" ) );
143
+ return null ;
144
+ }
145
+ } );
146
+
147
+ final String bookmark ;
148
+ try ( Session session = driver .session ( AccessMode .READ ) )
149
+ {
150
+ try ( Transaction tx = session .beginTransaction () )
151
+ {
152
+ tx .run ( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next ();
153
+ tx .success ();
154
+ }
155
+
156
+ bookmark = session .lastBookmark ();
157
+ }
158
+
159
+ assertNotNull ( bookmark );
160
+
161
+ inExpirableSession ( driver , createWritableSession (), new Function <Session ,Void >()
162
+ {
163
+ @ Override
164
+ public Void apply ( Session session )
165
+ {
166
+ try ( Transaction tx = session .beginTransaction ( bookmark ) )
167
+ {
168
+ tx .run ( "CREATE (p:Person {name: {name} })" , Values .parameters ( "name" , "Alistair" ) );
169
+ tx .success ();
170
+ }
171
+
172
+ return null ;
173
+ }
174
+ } );
175
+
176
+ try ( Session session = driver .session () )
177
+ {
178
+ Record record = session .run ( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next ();
179
+ assertEquals ( 2 , record .get ( "count" ).asInt () );
180
+ }
181
+ }
60
182
}
61
183
62
184
private int executeWriteAndReadThroughBolt ( ClusterMember member ) throws TimeoutException , InterruptedException
63
185
{
64
- try ( Driver driver = GraphDatabase . driver ( member .getRoutingUri (), clusterRule . getDefaultAuthToken () ) )
186
+ try ( Driver driver = createDriver ( member .getRoutingUri () ) )
65
187
{
66
188
return inExpirableSession ( driver , createWritableSession (), executeWriteAndRead () );
67
189
}
68
190
}
69
191
192
+ private Function <Driver ,Session > createSession ()
193
+ {
194
+ return new Function <Driver ,Session >()
195
+ {
196
+ @ Override
197
+ public Session apply ( Driver driver )
198
+ {
199
+ return driver .session ();
200
+ }
201
+ };
202
+ }
203
+
70
204
private Function <Driver ,Session > createWritableSession ()
71
205
{
72
206
return new Function <Driver ,Session >()
@@ -113,4 +247,22 @@ private <T> T inExpirableSession( Driver driver, Function<Driver,Session> acquir
113
247
114
248
throw new TimeoutException ( "Transaction did not succeed in time" );
115
249
}
250
+
251
+ private Driver createDriver ( URI boltUri )
252
+ {
253
+ Logging devNullLogging = new Logging ()
254
+ {
255
+ @ Override
256
+ public Logger getLog ( String name )
257
+ {
258
+ return DevNullLogger .DEV_NULL_LOGGER ;
259
+ }
260
+ };
261
+
262
+ Config config = Config .build ()
263
+ .withLogging ( devNullLogging )
264
+ .toConfig ();
265
+
266
+ return GraphDatabase .driver ( boltUri , clusterRule .getDefaultAuthToken (), config );
267
+ }
116
268
}
0 commit comments