From f2f55c2743400d96e8d01a7e803d729be3454017 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 14 Dec 2017 15:47:27 +0100 Subject: [PATCH 1/2] Fixed couple flaky tests Tests that asserted blocking operations are prohibited in even loop threads were flaky. They assumed lambdas in `CompletionStage` chain are always executed by event loop threads. However, this is not true. There is no guarantee which thread executes the callback. This commit adds additional check to only perform blocking operation when in event loop thread. Exceptions are expected in this case. --- .../internal/async/EventLoopGroupFactory.java | 13 +++- .../async/EventLoopGroupFactoryTest.java | 17 ++++- .../driver/v1/integration/SessionAsyncIT.java | 62 ++++++++++--------- .../v1/integration/TransactionAsyncIT.java | 35 ++++++----- 4 files changed, 79 insertions(+), 48 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java index 72438f4ebc..02ace46b88 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java @@ -88,7 +88,7 @@ public static EventLoopGroup newEventLoopGroup() */ public static void assertNotInEventLoopThread() throws IllegalStateException { - if ( Thread.currentThread() instanceof DriverThread ) + if ( isEventLoopThread( Thread.currentThread() ) ) { throw new IllegalStateException( "Blocking operation can't be executed in IO thread because it might result in a deadlock. " + @@ -96,6 +96,17 @@ public static void assertNotInEventLoopThread() throws IllegalStateException } } + /** + * Check if given thread is an event loop IO thread. + * + * @param thread the thread to check. + * @return {@code true} when given thread belongs to the event loop, {@code false} otherwise. + */ + public static boolean isEventLoopThread( Thread thread ) + { + return thread instanceof DriverThread; + } + /** * Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of * {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}. diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java index 7cbdc5e501..14476ba393 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java @@ -31,7 +31,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.util.Iterables.count; import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError; @@ -77,7 +79,7 @@ public void shouldAssertNotInEventLoopThread() throws Exception EventLoopGroupFactory.assertNotInEventLoopThread(); // submit assertion to the event loop thread, it should fail there - Future assertFuture = eventLoopGroup.next().submit( EventLoopGroupFactory::assertNotInEventLoopThread ); + Future assertFuture = eventLoopGroup.submit( EventLoopGroupFactory::assertNotInEventLoopThread ); try { assertFuture.get( 30, SECONDS ); @@ -89,6 +91,17 @@ public void shouldAssertNotInEventLoopThread() throws Exception } } + @Test + public void shouldCheckIfEventLoopThread() throws Exception + { + eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 ); + + Thread eventLoopThread = getThread( eventLoopGroup ); + assertTrue( EventLoopGroupFactory.isEventLoopThread( eventLoopThread ) ); + + assertFalse( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ); + } + /** * Test verifies that our event loop group uses same kind of thread as Netty does by default. * It's needed because default Netty setup has good performance. @@ -114,7 +127,7 @@ public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Ex private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception { - return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS ); + return eventLoopGroup.submit( Thread::currentThread ).get( 10, SECONDS ); } private static void shutdown( EventLoopGroup group ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index d62b0916b0..56d0e1bd0e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -37,6 +37,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.async.EventLoopGroupFactory; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Record; @@ -1045,47 +1046,50 @@ public void shouldBePossibleToMixRunAsyncAndBlockingSessionClose() @Test public void shouldFailToExecuteBlockingRunInAsyncTransactionFunction() { - TransactionWork>> completionStageTransactionWork = tx -> + TransactionWork> completionStageTransactionWork = tx -> { - StatementResult result = tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" ); - List records = new ArrayList<>(); - while ( result.hasNext() ) + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) { - records.add( result.next() ); + try + { + tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } } - - return completedFuture( records ); + return completedFuture( null ); }; - CompletionStage> result = session.readTransactionAsync( completionStageTransactionWork ); - - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + CompletionStage result = session.readTransactionAsync( completionStageTransactionWork ); + assertNull( await( result ) ); } @Test public void shouldFailToExecuteBlockingRunChainedWithAsyncRun() { - CompletionStage result = session.runAsync( "RETURN 1" ) + CompletionStage result = session.runAsync( "RETURN 1" ) .thenCompose( StatementResultCursor::singleAsync ) - .thenApply( record -> session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ) ); + .thenApply( record -> + { + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) + { + try + { + session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } + } + return null; + } ); - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + assertNull( await( result ) ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index eca133acd8..24ed96e85a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.ExplicitTransaction; +import org.neo4j.driver.internal.async.EventLoopGroupFactory; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -1239,23 +1240,25 @@ public void shouldPropagateFailureFromSummary() @Test public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction() { - assumeDatabaseSupportsBookmarks(); - - session.writeTransaction( tx -> tx.run( "CREATE ()" ) ); - assertNotNull( session.lastBookmark() ); - - CompletionStage result = session.beginTransactionAsync() - .thenApply( tx -> tx.run( "CREATE ()" ) ); + CompletionStage result = session.beginTransactionAsync() + .thenApply( tx -> + { + if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) ) + { + try + { + tx.run( "CREATE ()" ); + fail( "Exception expected" ); + } + catch ( IllegalStateException e ) + { + assertThat( e, is( blockingOperationInEventLoopError() ) ); + } + } + return null; + } ); - try - { - await( result ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e, is( blockingOperationInEventLoopError() ) ); - } + assertNull( await( result ) ); } @Test From 992ef0e91631b684c29339c8a226e812363ba8b8 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 14 Dec 2017 15:54:17 +0100 Subject: [PATCH 2/2] Handle nulls returned from async tx functions Previously code failed with NPE when provided async transaction function returned null. This commit fixes the problem by wrapping null in a completed future. --- .../java/org/neo4j/driver/internal/NetworkSession.java | 5 ++++- .../neo4j/driver/v1/integration/SessionAsyncIT.java | 10 ++++++++++ .../org/neo4j/driver/v1/integration/SessionIT.java | 10 ++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 0d3cea6c77..292dc19f43 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -360,7 +360,10 @@ private CompletionStage safeExecuteWork( ExplicitTransaction tx, Transact // sync failure will result in an exception being thrown try { - return work.execute( tx ); + CompletionStage result = work.execute( tx ); + + // protect from given transaction function returning null + return result == null ? completedFuture( null ) : result; } catch ( Throwable workError ) { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 56d0e1bd0e..bf94886e39 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -1157,6 +1157,16 @@ public void shouldAllowAccessingRecordsAfterSessionClosed() } } + @Test + public void shouldAllowReturningNullFromAsyncTransactionFunction() + { + CompletionStage readResult = session.readTransactionAsync( tx -> null ); + assertNull( await( readResult ) ); + + CompletionStage writeResult = session.writeTransactionAsync( tx -> null ); + assertNull( await( writeResult ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 6826369508..9ff8981aae 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1529,6 +1529,16 @@ public void shouldAllowLongRunningQueryWithConnectTimeout() throws Exception } } + @Test + public void shouldAllowReturningNullFromTransactionFunction() + { + try ( Session session = neo4j.driver().session() ) + { + assertNull( session.readTransaction( tx -> null ) ); + assertNull( session.writeTransaction( tx -> null ) ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );