diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 0d3cea6c77..292dc19f43 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -360,7 +360,10 @@ private CompletionStage safeExecuteWork( ExplicitTransaction tx, Transact // sync failure will result in an exception being thrown try { - return work.execute( tx ); + CompletionStage result = work.execute( tx ); + + // protect from given transaction function returning null + return result == null ? completedFuture( null ) : result; } catch ( Throwable workError ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java index 72438f4ebc..02ace46b88 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java @@ -88,7 +88,7 @@ public static EventLoopGroup newEventLoopGroup() */ public static void assertNotInEventLoopThread() throws IllegalStateException { - if ( Thread.currentThread() instanceof DriverThread ) + if ( isEventLoopThread( Thread.currentThread() ) ) { throw new IllegalStateException( "Blocking operation can't be executed in IO thread because it might result in a deadlock. " + @@ -96,6 +96,17 @@ public static void assertNotInEventLoopThread() throws IllegalStateException } } + /** + * Check if given thread is an event loop IO thread. + * + * @param thread the thread to check. + * @return {@code true} when given thread belongs to the event loop, {@code false} otherwise. + */ + public static boolean isEventLoopThread( Thread thread ) + { + return thread instanceof DriverThread; + } + /** * Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of * {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}. diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java index 7cbdc5e501..14476ba393 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java @@ -31,7 +31,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError; @@ -77,7 +79,7 @@ public void shouldAssertNotInEventLoopThread() throws Exception EventLoopGroupFactory.assertNotInEventLoopThread(); // submit assertion to the event loop thread, it should fail there - Future assertFuture = eventLoopGroup.next().submit( EventLoopGroupFactory::assertNotInEventLoopThread ); + Future assertFuture = eventLoopGroup.submit( EventLoopGroupFactory::assertNotInEventLoopThread ); try { assertFuture.get( 30, SECONDS ); @@ -89,6 +91,17 @@ public void shouldAssertNotInEventLoopThread() throws Exception } } + @Test + public void shouldCheckIfEventLoopThread() throws Exception + { + eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 ); + + Thread eventLoopThread = getThread( eventLoopGroup ); + assertTrue( EventLoopGroupFactory.isEventLoopThread( eventLoopThread ) ); + + assertFalse( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ); + } + /** * Test verifies that our event loop group uses same kind of thread as Netty does by default. * It's needed because default Netty setup has good performance. @@ -114,7 +127,7 @@ public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Ex private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception { - return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS ); + return eventLoopGroup.submit( Thread::currentThread ).get( 10, SECONDS ); } private static void shutdown( EventLoopGroup group ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index d62b0916b0..bf94886e39 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -37,6 +37,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.async.EventLoopGroupFactory; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Record; @@ -1045,47 +1046,50 @@ public void shouldBePossibleToMixRunAsyncAndBlockingSessionClose() @Test public void shouldFailToExecuteBlockingRunInAsyncTransactionFunction() { - TransactionWork>> completionStageTransactionWork = tx -> + TransactionWork> completionStageTransactionWork = tx -> { - StatementResult result = tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" ); - List records = new ArrayList<>(); - while ( result.hasNext() ) + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) { - records.add( result.next() ); + try + { + tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } } - - return completedFuture( records ); + return completedFuture( null ); }; - CompletionStage> result = session.readTransactionAsync( completionStageTransactionWork ); - - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + CompletionStage result = session.readTransactionAsync( completionStageTransactionWork ); + assertNull( await( result ) ); } @Test public void shouldFailToExecuteBlockingRunChainedWithAsyncRun() { - CompletionStage result = session.runAsync( "RETURN 1" ) + CompletionStage result = session.runAsync( "RETURN 1" ) .thenCompose( StatementResultCursor::singleAsync ) - .thenApply( record -> session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ) ); + .thenApply( record -> + { + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) + { + try + { + session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } + } + return null; + } ); - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + assertNull( await( result ) ); } @Test @@ -1153,6 +1157,16 @@ public void shouldAllowAccessingRecordsAfterSessionClosed() } } + @Test + public void shouldAllowReturningNullFromAsyncTransactionFunction() + { + CompletionStage readResult = session.readTransactionAsync( tx -> null ); + assertNull( await( readResult ) ); + + CompletionStage writeResult = session.writeTransactionAsync( tx -> null ); + assertNull( await( writeResult ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 6826369508..9ff8981aae 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1529,6 +1529,16 @@ public void shouldAllowLongRunningQueryWithConnectTimeout() throws Exception } } + @Test + public void shouldAllowReturningNullFromTransactionFunction() + { + try ( Session session = neo4j.driver().session() ) + { + assertNull( session.readTransaction( tx -> null ) ); + assertNull( session.writeTransaction( tx -> null ) ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index eca133acd8..24ed96e85a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.ExplicitTransaction; +import org.neo4j.driver.internal.async.EventLoopGroupFactory; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -1239,23 +1240,25 @@ public void shouldPropagateFailureFromSummary() @Test public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction() { - assumeDatabaseSupportsBookmarks(); - - session.writeTransaction( tx -> tx.run( "CREATE ()" ) ); - assertNotNull( session.lastBookmark() ); - - CompletionStage result = session.beginTransactionAsync() - .thenApply( tx -> tx.run( "CREATE ()" ) ); + CompletionStage result = session.beginTransactionAsync() + .thenApply( tx -> + { + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) + { + try + { + tx.run( "CREATE ()" ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } + } + return null; + } ); - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + assertNull( await( result ) ); } @Test