Skip to content

Commit f6222c7

Browse files
committed
Wait for bookmark when starting a transaction
It is possible to specify bookmark when starting a transaction. First `BEGIN` statement would then contain the given bookmark in params. Database will wait when processing such `BEGIN` for the state described by the bookmark to appear. Sending `BEGIN` and bookmark was previously lazy. So it was possible to start a transaction with an invalid/unreachable bookmark. Error would in such case pop up only after the next sync (commit or consume) which is too late and can be quite confusing. This commit makes transaction eagerly send `BEGIN` and `PULL_ALL` messages when non-null bookmark is given. It makes `session#beginTransaction("bookmark")` call blocking but the API less surprising.
1 parent e24d07d commit f6222c7

File tree

4 files changed

+162
-21
lines changed

4 files changed

+162
-21
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import java.util.Collections;
2222
import java.util.Map;
2323

24-
import org.neo4j.driver.internal.spi.Connection;
2524
import org.neo4j.driver.internal.spi.Collector;
25+
import org.neo4j.driver.internal.spi.Connection;
2626
import org.neo4j.driver.internal.types.InternalTypeSystem;
2727
import org.neo4j.driver.v1.Record;
2828
import org.neo4j.driver.v1.Statement;
@@ -36,7 +36,6 @@
3636

3737
import static java.util.Collections.emptyMap;
3838
import static java.util.Collections.singletonMap;
39-
4039
import static org.neo4j.driver.v1.Values.ofValue;
4140
import static org.neo4j.driver.v1.Values.value;
4241

@@ -81,18 +80,7 @@ private enum State
8180
{
8281
this.conn = conn;
8382
this.cleanup = cleanup;
84-
85-
final Map<String, Value> parameters;
86-
if ( bookmark == null )
87-
{
88-
parameters = emptyMap();
89-
}
90-
else
91-
{
92-
parameters = singletonMap( "bookmark", value( bookmark ) );
93-
}
94-
conn.run( "BEGIN", parameters, Collector.NO_OP );
95-
conn.pullAll( Collector.NO_OP );
83+
runBeginStatement( conn, bookmark );
9684
}
9785

9886
@Override
@@ -253,4 +241,24 @@ void setBookmark( String bookmark )
253241
this.bookmark = bookmark;
254242
}
255243

244+
private static void runBeginStatement( Connection connection, String bookmark )
245+
{
246+
Map<String,Value> parameters;
247+
if ( bookmark != null )
248+
{
249+
parameters = singletonMap( "bookmark", value( bookmark ) );
250+
}
251+
else
252+
{
253+
parameters = emptyMap();
254+
}
255+
256+
connection.run( "BEGIN", parameters, Collector.NO_OP );
257+
connection.pullAll( Collector.NO_OP );
258+
259+
if ( bookmark != null )
260+
{
261+
connection.sync();
262+
}
263+
}
256264
}

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,24 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.Collections;
22-
2321
import org.junit.Test;
2422
import org.mockito.InOrder;
2523

24+
import java.util.Collections;
25+
import java.util.Map;
26+
2627
import org.neo4j.driver.internal.spi.Collector;
2728
import org.neo4j.driver.internal.spi.Connection;
2829
import org.neo4j.driver.v1.Value;
2930

3031
import static org.mockito.Matchers.any;
3132
import static org.mockito.Mockito.inOrder;
3233
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.never;
3335
import static org.mockito.Mockito.verify;
3436
import static org.mockito.Mockito.verifyNoMoreInteractions;
3537
import static org.mockito.Mockito.when;
38+
import static org.neo4j.driver.v1.Values.value;
3639

3740
public class ExplicitTransactionTest
3841
{
@@ -111,4 +114,33 @@ public void shouldCommitOnSuccess() throws Throwable
111114
verify( cleanup ).run();
112115
verifyNoMoreInteractions( conn, cleanup );
113116
}
117+
118+
@Test
119+
public void shouldOnlyQueueMessagesWhenNoBookmarkGiven()
120+
{
121+
Connection connection = mock( Connection.class );
122+
123+
new ExplicitTransaction( connection, mock( Runnable.class ), null );
124+
125+
InOrder inOrder = inOrder( connection );
126+
inOrder.verify( connection ).run( "BEGIN", Collections.<String,Value>emptyMap(), Collector.NO_OP );
127+
inOrder.verify( connection ).pullAll( Collector.NO_OP );
128+
inOrder.verify( connection, never() ).sync();
129+
}
130+
131+
@Test
132+
public void shouldSyncWhenBookmarkGiven()
133+
{
134+
String bookmark = "hi, I'm bookmark";
135+
Connection connection = mock( Connection.class );
136+
137+
new ExplicitTransaction( connection, mock( Runnable.class ), bookmark );
138+
139+
Map<String,Value> expectedParams = Collections.singletonMap( "bookmark", value( bookmark ) );
140+
141+
InOrder inOrder = inOrder( connection );
142+
inOrder.verify( connection ).run( "BEGIN", expectedParams, Collector.NO_OP );
143+
inOrder.verify( connection ).pullAll( Collector.NO_OP );
144+
inOrder.verify( connection ).sync();
145+
}
114146
}

driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import org.junit.Test;
2424
import org.junit.rules.ExpectedException;
2525

26+
import org.neo4j.driver.v1.Session;
2627
import org.neo4j.driver.v1.Transaction;
2728
import org.neo4j.driver.v1.exceptions.ClientException;
29+
import org.neo4j.driver.v1.exceptions.TransientException;
2830
import org.neo4j.driver.v1.util.TestNeo4jSession;
2931

3032
import static org.hamcrest.Matchers.instanceOf;
31-
import static org.hamcrest.core.StringStartsWith.startsWith;
33+
import static org.hamcrest.Matchers.startsWith;
3234
import static org.junit.Assert.assertNotNull;
3335
import static org.junit.Assert.assertNull;
3436
import static org.junit.Assert.assertThat;
@@ -58,15 +60,54 @@ public void shouldReceiveBookmarkOnSuccessfulCommit() throws Throwable
5860
assertNull( session.lastBookmark() );
5961

6062
// When
63+
createNodeInTx( session );
64+
65+
// Then
66+
assertNotNull( session.lastBookmark() );
67+
assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) );
68+
}
69+
70+
@Test
71+
public void shouldThrowForInvalidBookmark()
72+
{
73+
String invalidBookmark = "hi, this is an invalid bookmark";
74+
75+
try
76+
{
77+
session.beginTransaction( invalidBookmark );
78+
fail( "Exception expected" );
79+
}
80+
catch ( Exception e )
81+
{
82+
assertThat( e, instanceOf( ClientException.class ) );
83+
}
84+
}
85+
86+
@Test
87+
public void shouldThrowForUnreachableBookmark()
88+
{
89+
createNodeInTx( session );
90+
91+
try
92+
{
93+
// todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this
94+
session.beginTransaction( session.lastBookmark() + 42 );
95+
fail( "Exception expected" );
96+
}
97+
catch ( Exception e )
98+
{
99+
assertThat( e, instanceOf( TransientException.class ) );
100+
assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
101+
}
102+
}
103+
104+
private static void createNodeInTx( Session session )
105+
{
61106
try ( Transaction tx = session.beginTransaction() )
62107
{
63108
tx.run( "CREATE (a:Person)" );
64109
tx.success();
65110
}
66-
67-
// Then
68-
assertNotNull( session.lastBookmark() );
69-
assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) );
70111
}
71112

72113
@Test

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,21 @@
4545
import org.neo4j.driver.v1.Session;
4646
import org.neo4j.driver.v1.Transaction;
4747
import org.neo4j.driver.v1.Values;
48+
import org.neo4j.driver.v1.exceptions.ClientException;
4849
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4950
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
51+
import org.neo4j.driver.v1.exceptions.TransientException;
5052
import org.neo4j.driver.v1.util.Function;
5153
import org.neo4j.driver.v1.util.cc.Cluster;
5254
import org.neo4j.driver.v1.util.cc.ClusterMember;
5355
import org.neo4j.driver.v1.util.cc.ClusterRule;
5456

57+
import static org.hamcrest.Matchers.containsString;
58+
import static org.hamcrest.Matchers.instanceOf;
59+
import static org.hamcrest.Matchers.startsWith;
5560
import static org.junit.Assert.assertEquals;
5661
import static org.junit.Assert.assertNotNull;
62+
import static org.junit.Assert.assertThat;
5763
import static org.junit.Assert.assertTrue;
5864
import static org.junit.Assert.fail;
5965

@@ -233,6 +239,60 @@ public void shouldDropBrokenOldSessions() throws Exception
233239
}
234240
}
235241

242+
@Test
243+
public void beginTransactionThrowsForInvalidBookmark()
244+
{
245+
String invalidBookmark = "hi, this is an invalid bookmark";
246+
ClusterMember leader = clusterRule.getCluster().leader();
247+
248+
try ( Driver driver = createDriver( leader.getBoltUri() );
249+
Session session = driver.session() )
250+
{
251+
try
252+
{
253+
session.beginTransaction( invalidBookmark );
254+
fail( "Exception expected" );
255+
}
256+
catch ( Exception e )
257+
{
258+
assertThat( e, instanceOf( ClientException.class ) );
259+
assertThat( e.getMessage(), containsString( invalidBookmark ) );
260+
}
261+
}
262+
}
263+
264+
@Test
265+
public void beginTransactionThrowsForUnreachableBookmark()
266+
{
267+
ClusterMember leader = clusterRule.getCluster().leader();
268+
269+
try ( Driver driver = createDriver( leader.getBoltUri() );
270+
Session session = driver.session() )
271+
{
272+
try ( Transaction tx = session.beginTransaction() )
273+
{
274+
tx.run( "CREATE ()" );
275+
tx.success();
276+
}
277+
278+
String bookmark = session.lastBookmark();
279+
assertNotNull( bookmark );
280+
String newBookmark = bookmark + "0";
281+
282+
try
283+
{
284+
// todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this
285+
session.beginTransaction( newBookmark );
286+
fail( "Exception expected" );
287+
}
288+
catch ( Exception e )
289+
{
290+
assertThat( e, instanceOf( TransientException.class ) );
291+
assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) );
292+
}
293+
}
294+
}
295+
236296
private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException
237297
{
238298
try ( Driver driver = createDriver( member.getRoutingUri() ) )

0 commit comments

Comments
 (0)