Skip to content

Commit 3ca3290

Browse files
author
Zhen Li
authored
Merge pull request #303 from neo4j/1.1-block-for-bookmark
Wait for bookmark when starting a transaction
2 parents e24d07d + f6222c7 commit 3ca3290

File tree

4 files changed

+162
-21
lines changed

4 files changed

+162
-21
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import java.util.Collections;
2222
import java.util.Map;
2323

24-
import org.neo4j.driver.internal.spi.Connection;
2524
import org.neo4j.driver.internal.spi.Collector;
25+
import org.neo4j.driver.internal.spi.Connection;
2626
import org.neo4j.driver.internal.types.InternalTypeSystem;
2727
import org.neo4j.driver.v1.Record;
2828
import org.neo4j.driver.v1.Statement;
@@ -36,7 +36,6 @@
3636

3737
import static java.util.Collections.emptyMap;
3838
import static java.util.Collections.singletonMap;
39-
4039
import static org.neo4j.driver.v1.Values.ofValue;
4140
import static org.neo4j.driver.v1.Values.value;
4241

@@ -81,18 +80,7 @@ private enum State
8180
{
8281
this.conn = conn;
8382
this.cleanup = cleanup;
84-
85-
final Map<String, Value> parameters;
86-
if ( bookmark == null )
87-
{
88-
parameters = emptyMap();
89-
}
90-
else
91-
{
92-
parameters = singletonMap( "bookmark", value( bookmark ) );
93-
}
94-
conn.run( "BEGIN", parameters, Collector.NO_OP );
95-
conn.pullAll( Collector.NO_OP );
83+
runBeginStatement( conn, bookmark );
9684
}
9785

9886
@Override
@@ -253,4 +241,24 @@ void setBookmark( String bookmark )
253241
this.bookmark = bookmark;
254242
}
255243

244+
private static void runBeginStatement( Connection connection, String bookmark )
245+
{
246+
Map<String,Value> parameters;
247+
if ( bookmark != null )
248+
{
249+
parameters = singletonMap( "bookmark", value( bookmark ) );
250+
}
251+
else
252+
{
253+
parameters = emptyMap();
254+
}
255+
256+
connection.run( "BEGIN", parameters, Collector.NO_OP );
257+
connection.pullAll( Collector.NO_OP );
258+
259+
if ( bookmark != null )
260+
{
261+
connection.sync();
262+
}
263+
}
256264
}

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,24 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.Collections;
22-
2321
import org.junit.Test;
2422
import org.mockito.InOrder;
2523

24+
import java.util.Collections;
25+
import java.util.Map;
26+
2627
import org.neo4j.driver.internal.spi.Collector;
2728
import org.neo4j.driver.internal.spi.Connection;
2829
import org.neo4j.driver.v1.Value;
2930

3031
import static org.mockito.Matchers.any;
3132
import static org.mockito.Mockito.inOrder;
3233
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.never;
3335
import static org.mockito.Mockito.verify;
3436
import static org.mockito.Mockito.verifyNoMoreInteractions;
3537
import static org.mockito.Mockito.when;
38+
import static org.neo4j.driver.v1.Values.value;
3639

3740
public class ExplicitTransactionTest
3841
{
@@ -111,4 +114,33 @@ public void shouldCommitOnSuccess() throws Throwable
111114
verify( cleanup ).run();
112115
verifyNoMoreInteractions( conn, cleanup );
113116
}
117+
118+
@Test
119+
public void shouldOnlyQueueMessagesWhenNoBookmarkGiven()
120+
{
121+
Connection connection = mock( Connection.class );
122+
123+
new ExplicitTransaction( connection, mock( Runnable.class ), null );
124+
125+
InOrder inOrder = inOrder( connection );
126+
inOrder.verify( connection ).run( "BEGIN", Collections.<String,Value>emptyMap(), Collector.NO_OP );
127+
inOrder.verify( connection ).pullAll( Collector.NO_OP );
128+
inOrder.verify( connection, never() ).sync();
129+
}
130+
131+
@Test
132+
public void shouldSyncWhenBookmarkGiven()
133+
{
134+
String bookmark = "hi, I'm bookmark";
135+
Connection connection = mock( Connection.class );
136+
137+
new ExplicitTransaction( connection, mock( Runnable.class ), bookmark );
138+
139+
Map<String,Value> expectedParams = Collections.singletonMap( "bookmark", value( bookmark ) );
140+
141+
InOrder inOrder = inOrder( connection );
142+
inOrder.verify( connection ).run( "BEGIN", expectedParams, Collector.NO_OP );
143+
inOrder.verify( connection ).pullAll( Collector.NO_OP );
144+
inOrder.verify( connection ).sync();
145+
}
114146
}

driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import org.junit.Test;
2424
import org.junit.rules.ExpectedException;
2525

26+
import org.neo4j.driver.v1.Session;
2627
import org.neo4j.driver.v1.Transaction;
2728
import org.neo4j.driver.v1.exceptions.ClientException;
29+
import org.neo4j.driver.v1.exceptions.TransientException;
2830
import org.neo4j.driver.v1.util.TestNeo4jSession;
2931

3032
import static org.hamcrest.Matchers.instanceOf;
31-
import static org.hamcrest.core.StringStartsWith.startsWith;
33+
import static org.hamcrest.Matchers.startsWith;
3234
import static org.junit.Assert.assertNotNull;
3335
import static org.junit.Assert.assertNull;
3436
import static org.junit.Assert.assertThat;
@@ -58,15 +60,54 @@ public void shouldReceiveBookmarkOnSuccessfulCommit() throws Throwable
5860
assertNull( session.lastBookmark() );
5961

6062
// When
63+
createNodeInTx( session );
64+
65+
// Then
66+
assertNotNull( session.lastBookmark() );
67+
assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) );
68+
}
69+
70+
@Test
71+
public void shouldThrowForInvalidBookmark()
72+
{
73+
String invalidBookmark = "hi, this is an invalid bookmark";
74+
75+
try
76+
{
77+
session.beginTransaction( invalidBookmark );
78+
fail( "Exception expected" );
79+
}
80+
catch ( Exception e )
81+
{
82+
assertThat( e, instanceOf( ClientException.class ) );
83+
}
84+
}
85+
86+
@Test
87+
public void shouldThrowForUnreachableBookmark()
88+
{
89+
createNodeInTx( session );
90+
91+
try
92+
{
93+
// todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this
94+
session.beginTransaction( session.lastBookmark() + 42 );
95+
fail( "Exception expected" );
96+
}
97+
catch ( Exception e )
98+
{
99+
assertThat( e, instanceOf( TransientException.class ) );
100+
assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
101+
}
102+
}
103+
104+
private static void createNodeInTx( Session session )
105+
{
61106
try ( Transaction tx = session.beginTransaction() )
62107
{
63108
tx.run( "CREATE (a:Person)" );
64109
tx.success();
65110
}
66-
67-
// Then
68-
assertNotNull( session.lastBookmark() );
69-
assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) );
70111
}
71112

72113
@Test

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,21 @@
4545
import org.neo4j.driver.v1.Session;
4646
import org.neo4j.driver.v1.Transaction;
4747
import org.neo4j.driver.v1.Values;
48+
import org.neo4j.driver.v1.exceptions.ClientException;
4849
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4950
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
51+
import org.neo4j.driver.v1.exceptions.TransientException;
5052
import org.neo4j.driver.v1.util.Function;
5153
import org.neo4j.driver.v1.util.cc.Cluster;
5254
import org.neo4j.driver.v1.util.cc.ClusterMember;
5355
import org.neo4j.driver.v1.util.cc.ClusterRule;
5456

57+
import static org.hamcrest.Matchers.containsString;
58+
import static org.hamcrest.Matchers.instanceOf;
59+
import static org.hamcrest.Matchers.startsWith;
5560
import static org.junit.Assert.assertEquals;
5661
import static org.junit.Assert.assertNotNull;
62+
import static org.junit.Assert.assertThat;
5763
import static org.junit.Assert.assertTrue;
5864
import static org.junit.Assert.fail;
5965

@@ -233,6 +239,60 @@ public void shouldDropBrokenOldSessions() throws Exception
233239
}
234240
}
235241

242+
@Test
243+
public void beginTransactionThrowsForInvalidBookmark()
244+
{
245+
String invalidBookmark = "hi, this is an invalid bookmark";
246+
ClusterMember leader = clusterRule.getCluster().leader();
247+
248+
try ( Driver driver = createDriver( leader.getBoltUri() );
249+
Session session = driver.session() )
250+
{
251+
try
252+
{
253+
session.beginTransaction( invalidBookmark );
254+
fail( "Exception expected" );
255+
}
256+
catch ( Exception e )
257+
{
258+
assertThat( e, instanceOf( ClientException.class ) );
259+
assertThat( e.getMessage(), containsString( invalidBookmark ) );
260+
}
261+
}
262+
}
263+
264+
@Test
265+
public void beginTransactionThrowsForUnreachableBookmark()
266+
{
267+
ClusterMember leader = clusterRule.getCluster().leader();
268+
269+
try ( Driver driver = createDriver( leader.getBoltUri() );
270+
Session session = driver.session() )
271+
{
272+
try ( Transaction tx = session.beginTransaction() )
273+
{
274+
tx.run( "CREATE ()" );
275+
tx.success();
276+
}
277+
278+
String bookmark = session.lastBookmark();
279+
assertNotNull( bookmark );
280+
String newBookmark = bookmark + "0";
281+
282+
try
283+
{
284+
// todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this
285+
session.beginTransaction( newBookmark );
286+
fail( "Exception expected" );
287+
}
288+
catch ( Exception e )
289+
{
290+
assertThat( e, instanceOf( TransientException.class ) );
291+
assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
292+
}
293+
}
294+
}
295+
236296
private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException
237297
{
238298
try ( Driver driver = createDriver( member.getRoutingUri() ) )

0 commit comments

Comments
 (0)