From f6222c7d3d0b4c72734a750e37cc88fc0cf6b73b Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 4 Jan 2017 23:38:45 +0100 Subject: [PATCH] Wait for bookmark when starting a transaction It is possible to specify bookmark when starting a transaction. First `BEGIN` statement would then contain the given bookmark in params. Database will wait when processing such `BEGIN` for the state described by the bookmark to appear. Sending `BEGIN` and bookmark was previously lazy. So it was possible to start a transaction with an invalid/unreachable bookmark. Error would in such case pop up only after the next sync (commit or consume) which is too late and can be quite confusing. This commit makes transaction eagerly send `BEGIN` and `PULL_ALL` messages when non-null bookmark is given. It makes `session#beginTransaction("bookmark")` call blocking but the API less surprising. --- .../driver/internal/ExplicitTransaction.java | 36 ++++++----- .../internal/ExplicitTransactionTest.java | 36 ++++++++++- .../driver/v1/integration/BookmarkIT.java | 51 ++++++++++++++-- .../v1/integration/CausalClusteringIT.java | 60 +++++++++++++++++++ 4 files changed, 162 insertions(+), 21 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index e7a8dc620b..29056d9238 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -21,8 +21,8 @@ import java.util.Collections; import java.util.Map; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; @@ -36,7 +36,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; - import static org.neo4j.driver.v1.Values.ofValue; import static org.neo4j.driver.v1.Values.value; @@ -81,18 +80,7 @@ private enum State { this.conn = conn; this.cleanup = cleanup; - - final Map parameters; - if ( bookmark == null ) - { - parameters = emptyMap(); - } - else - { - parameters = singletonMap( "bookmark", value( bookmark ) ); - } - conn.run( "BEGIN", parameters, Collector.NO_OP ); - conn.pullAll( Collector.NO_OP ); + runBeginStatement( conn, bookmark ); } @Override @@ -253,4 +241,24 @@ void setBookmark( String bookmark ) this.bookmark = bookmark; } + private static void runBeginStatement( Connection connection, String bookmark ) + { + Map parameters; + if ( bookmark != null ) + { + parameters = singletonMap( "bookmark", value( bookmark ) ); + } + else + { + parameters = emptyMap(); + } + + connection.run( "BEGIN", parameters, Collector.NO_OP ); + connection.pullAll( Collector.NO_OP ); + + if ( bookmark != null ) + { + connection.sync(); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index caa9a7f10e..13b199cf72 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -18,11 +18,12 @@ */ package org.neo4j.driver.internal; -import java.util.Collections; - import org.junit.Test; import org.mockito.InOrder; +import java.util.Collections; +import java.util.Map; + import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.v1.Value; @@ -30,9 +31,11 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.Values.value; public class ExplicitTransactionTest { @@ -111,4 +114,33 @@ public void shouldCommitOnSuccess() throws Throwable verify( cleanup ).run(); verifyNoMoreInteractions( conn, cleanup ); } + + @Test + public void shouldOnlyQueueMessagesWhenNoBookmarkGiven() + { + Connection connection = mock( Connection.class ); + + new ExplicitTransaction( connection, mock( Runnable.class ), null ); + + InOrder inOrder = inOrder( connection ); + inOrder.verify( connection ).run( "BEGIN", Collections.emptyMap(), Collector.NO_OP ); + inOrder.verify( connection ).pullAll( Collector.NO_OP ); + inOrder.verify( connection, never() ).sync(); + } + + @Test + public void shouldSyncWhenBookmarkGiven() + { + String bookmark = "hi, I'm bookmark"; + Connection connection = mock( Connection.class ); + + new ExplicitTransaction( connection, mock( Runnable.class ), bookmark ); + + Map expectedParams = Collections.singletonMap( "bookmark", value( bookmark ) ); + + InOrder inOrder = inOrder( connection ); + inOrder.verify( connection ).run( "BEGIN", expectedParams, Collector.NO_OP ); + inOrder.verify( connection ).pullAll( Collector.NO_OP ); + inOrder.verify( connection ).sync(); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java index 82dba3598c..e3be0c8b28 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/BookmarkIT.java @@ -23,12 +23,14 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.util.TestNeo4jSession; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -58,15 +60,54 @@ public void shouldReceiveBookmarkOnSuccessfulCommit() throws Throwable assertNull( session.lastBookmark() ); // When + createNodeInTx( session ); + + // Then + assertNotNull( session.lastBookmark() ); + assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) ); + } + + @Test + public void shouldThrowForInvalidBookmark() + { + String invalidBookmark = "hi, this is an invalid bookmark"; + + try + { + session.beginTransaction( invalidBookmark ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + } + + @Test + public void shouldThrowForUnreachableBookmark() + { + createNodeInTx( session ); + + try + { + // todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this + session.beginTransaction( session.lastBookmark() + 42 ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( TransientException.class ) ); + assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) ); + } + } + + private static void createNodeInTx( Session session ) + { try ( Transaction tx = session.beginTransaction() ) { tx.run( "CREATE (a:Person)" ); tx.success(); } - - // Then - assertNotNull( session.lastBookmark() ); - assertThat( session.lastBookmark(), startsWith( "neo4j:bookmark:v1:tx" ) ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index 2f8985e15c..64e6dd771e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -45,15 +45,21 @@ import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.Values; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.cc.Cluster; import org.neo4j.driver.v1.util.cc.ClusterMember; import org.neo4j.driver.v1.util.cc.ClusterRule; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -233,6 +239,60 @@ public void shouldDropBrokenOldSessions() throws Exception } } + @Test + public void beginTransactionThrowsForInvalidBookmark() + { + String invalidBookmark = "hi, this is an invalid bookmark"; + ClusterMember leader = clusterRule.getCluster().leader(); + + try ( Driver driver = createDriver( leader.getBoltUri() ); + Session session = driver.session() ) + { + try + { + session.beginTransaction( invalidBookmark ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + assertThat( e.getMessage(), containsString( invalidBookmark ) ); + } + } + } + + @Test + public void beginTransactionThrowsForUnreachableBookmark() + { + ClusterMember leader = clusterRule.getCluster().leader(); + + try ( Driver driver = createDriver( leader.getBoltUri() ); + Session session = driver.session() ) + { + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "CREATE ()" ); + tx.success(); + } + + String bookmark = session.lastBookmark(); + assertNotNull( bookmark ); + String newBookmark = bookmark + "0"; + + try + { + // todo: configure bookmark wait timeout to be lower than default 30sec when neo4j supports this + session.beginTransaction( newBookmark ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( TransientException.class ) ); + assertThat( e.getMessage(), startsWith( "Database not up to the requested version" ) ); + } + } + } + private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException { try ( Driver driver = createDriver( member.getRoutingUri() ) )