Skip to content

Commit f2f55c2

Browse files
committed
Fixed couple flaky tests
Tests that asserted blocking operations are prohibited in even loop threads were flaky. They assumed lambdas in `CompletionStage` chain are always executed by event loop threads. However, this is not true. There is no guarantee which thread executes the callback. This commit adds additional check to only perform blocking operation when in event loop thread. Exceptions are expected in this case.
1 parent 3eb4c9d commit f2f55c2

File tree

4 files changed

+79
-48
lines changed

4 files changed

+79
-48
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,25 @@ public static EventLoopGroup newEventLoopGroup()
8888
*/
8989
public static void assertNotInEventLoopThread() throws IllegalStateException
9090
{
91-
if ( Thread.currentThread() instanceof DriverThread )
91+
if ( isEventLoopThread( Thread.currentThread() ) )
9292
{
9393
throw new IllegalStateException(
9494
"Blocking operation can't be executed in IO thread because it might result in a deadlock. " +
9595
"Please do not use blocking API when chaining futures returned by async API methods." );
9696
}
9797
}
9898

99+
/**
100+
* Check if given thread is an event loop IO thread.
101+
*
102+
* @param thread the thread to check.
103+
* @return {@code true} when given thread belongs to the event loop, {@code false} otherwise.
104+
*/
105+
public static boolean isEventLoopThread( Thread thread )
106+
{
107+
return thread instanceof DriverThread;
108+
}
109+
99110
/**
100111
* Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of
101112
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.

driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import static org.hamcrest.Matchers.instanceOf;
3232
import static org.hamcrest.Matchers.is;
3333
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertFalse;
3435
import static org.junit.Assert.assertThat;
36+
import static org.junit.Assert.assertTrue;
3537
import static org.junit.Assert.fail;
3638
import static org.neo4j.driver.internal.util.Iterables.count;
3739
import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
@@ -77,7 +79,7 @@ public void shouldAssertNotInEventLoopThread() throws Exception
7779
EventLoopGroupFactory.assertNotInEventLoopThread();
7880

7981
// submit assertion to the event loop thread, it should fail there
80-
Future<?> assertFuture = eventLoopGroup.next().submit( EventLoopGroupFactory::assertNotInEventLoopThread );
82+
Future<?> assertFuture = eventLoopGroup.submit( EventLoopGroupFactory::assertNotInEventLoopThread );
8183
try
8284
{
8385
assertFuture.get( 30, SECONDS );
@@ -89,6 +91,17 @@ public void shouldAssertNotInEventLoopThread() throws Exception
8991
}
9092
}
9193

94+
@Test
95+
public void shouldCheckIfEventLoopThread() throws Exception
96+
{
97+
eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 );
98+
99+
Thread eventLoopThread = getThread( eventLoopGroup );
100+
assertTrue( EventLoopGroupFactory.isEventLoopThread( eventLoopThread ) );
101+
102+
assertFalse( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) );
103+
}
104+
92105
/**
93106
* Test verifies that our event loop group uses same kind of thread as Netty does by default.
94107
* It's needed because default Netty setup has good performance.
@@ -114,7 +127,7 @@ public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Ex
114127

115128
private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception
116129
{
117-
return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS );
130+
return eventLoopGroup.submit( Thread::currentThread ).get( 10, SECONDS );
118131
}
119132

120133
private static void shutdown( EventLoopGroup group )

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.atomic.AtomicInteger;
3939

40+
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
4041
import org.neo4j.driver.internal.util.Futures;
4142
import org.neo4j.driver.internal.util.ServerVersion;
4243
import org.neo4j.driver.v1.Record;
@@ -1045,47 +1046,50 @@ public void shouldBePossibleToMixRunAsyncAndBlockingSessionClose()
10451046
@Test
10461047
public void shouldFailToExecuteBlockingRunInAsyncTransactionFunction()
10471048
{
1048-
TransactionWork<CompletionStage<List<Record>>> completionStageTransactionWork = tx ->
1049+
TransactionWork<CompletionStage<Void>> completionStageTransactionWork = tx ->
10491050
{
1050-
StatementResult result = tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
1051-
List<Record> records = new ArrayList<>();
1052-
while ( result.hasNext() )
1051+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
10531052
{
1054-
records.add( result.next() );
1053+
try
1054+
{
1055+
tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
1056+
fail( "Exception expected" );
1057+
}
1058+
catch ( IllegalStateException e )
1059+
{
1060+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1061+
}
10551062
}
1056-
1057-
return completedFuture( records );
1063+
return completedFuture( null );
10581064
};
10591065

1060-
CompletionStage<List<Record>> result = session.readTransactionAsync( completionStageTransactionWork );
1061-
1062-
try
1063-
{
1064-
await( result );
1065-
fail( "Exception expected" );
1066-
}
1067-
catch ( IllegalStateException e )
1068-
{
1069-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1070-
}
1066+
CompletionStage<Void> result = session.readTransactionAsync( completionStageTransactionWork );
1067+
assertNull( await( result ) );
10711068
}
10721069

10731070
@Test
10741071
public void shouldFailToExecuteBlockingRunChainedWithAsyncRun()
10751072
{
1076-
CompletionStage<StatementResult> result = session.runAsync( "RETURN 1" )
1073+
CompletionStage<Void> result = session.runAsync( "RETURN 1" )
10771074
.thenCompose( StatementResultCursor::singleAsync )
1078-
.thenApply( record -> session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ) );
1075+
.thenApply( record ->
1076+
{
1077+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
1078+
{
1079+
try
1080+
{
1081+
session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) );
1082+
fail( "Exception expected" );
1083+
}
1084+
catch ( IllegalStateException e )
1085+
{
1086+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1087+
}
1088+
}
1089+
return null;
1090+
} );
10791091

1080-
try
1081-
{
1082-
await( result );
1083-
fail( "Exception expected" );
1084-
}
1085-
catch ( IllegalStateException e )
1086-
{
1087-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1088-
}
1092+
assertNull( await( result ) );
10891093
}
10901094

10911095
@Test

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

3838
import org.neo4j.driver.internal.ExplicitTransaction;
39+
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
3940
import org.neo4j.driver.v1.Record;
4041
import org.neo4j.driver.v1.Session;
4142
import org.neo4j.driver.v1.Statement;
@@ -1239,23 +1240,25 @@ public void shouldPropagateFailureFromSummary()
12391240
@Test
12401241
public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
12411242
{
1242-
assumeDatabaseSupportsBookmarks();
1243-
1244-
session.writeTransaction( tx -> tx.run( "CREATE ()" ) );
1245-
assertNotNull( session.lastBookmark() );
1246-
1247-
CompletionStage<StatementResult> result = session.beginTransactionAsync()
1248-
.thenApply( tx -> tx.run( "CREATE ()" ) );
1243+
CompletionStage<Void> result = session.beginTransactionAsync()
1244+
.thenApply( tx ->
1245+
{
1246+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
1247+
{
1248+
try
1249+
{
1250+
tx.run( "CREATE ()" );
1251+
fail( "Exception expected" );
1252+
}
1253+
catch ( IllegalStateException e )
1254+
{
1255+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1256+
}
1257+
}
1258+
return null;
1259+
} );
12491260

1250-
try
1251-
{
1252-
await( result );
1253-
fail( "Exception expected" );
1254-
}
1255-
catch ( IllegalStateException e )
1256-
{
1257-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1258-
}
1261+
assertNull( await( result ) );
12591262
}
12601263

12611264
@Test

0 commit comments

Comments
 (0)