Skip to content

Commit 91f5f06

Browse files
authored
Merge pull request #448 from lutovich/1.5-flaky-tests
Fixed couple flaky tests and improved null handling
2 parents b3876ea + 992ef0e commit 91f5f06

File tree

6 files changed

+103
-49
lines changed

6 files changed

+103
-49
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,10 @@ private <T> CompletionStage<T> safeExecuteWork( ExplicitTransaction tx, Transact
360360
// sync failure will result in an exception being thrown
361361
try
362362
{
363-
return work.execute( tx );
363+
CompletionStage<T> result = work.execute( tx );
364+
365+
// protect from given transaction function returning null
366+
return result == null ? completedFuture( null ) : result;
364367
}
365368
catch ( Throwable workError )
366369
{

driver/src/main/java/org/neo4j/driver/internal/async/EventLoopGroupFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,25 @@ public static EventLoopGroup newEventLoopGroup()
8888
*/
8989
public static void assertNotInEventLoopThread() throws IllegalStateException
9090
{
91-
if ( Thread.currentThread() instanceof DriverThread )
91+
if ( isEventLoopThread( Thread.currentThread() ) )
9292
{
9393
throw new IllegalStateException(
9494
"Blocking operation can't be executed in IO thread because it might result in a deadlock. " +
9595
"Please do not use blocking API when chaining futures returned by async API methods." );
9696
}
9797
}
9898

99+
/**
100+
* Check if given thread is an event loop IO thread.
101+
*
102+
* @param thread the thread to check.
103+
* @return {@code true} when given thread belongs to the event loop, {@code false} otherwise.
104+
*/
105+
public static boolean isEventLoopThread( Thread thread )
106+
{
107+
return thread instanceof DriverThread;
108+
}
109+
99110
/**
100111
* Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of
101112
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.

driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import static org.hamcrest.Matchers.instanceOf;
3232
import static org.hamcrest.Matchers.is;
3333
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertFalse;
3435
import static org.junit.Assert.assertThat;
36+
import static org.junit.Assert.assertTrue;
3537
import static org.junit.Assert.fail;
3638
import static org.neo4j.driver.internal.util.Iterables.count;
3739
import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
@@ -77,7 +79,7 @@ public void shouldAssertNotInEventLoopThread() throws Exception
7779
EventLoopGroupFactory.assertNotInEventLoopThread();
7880

7981
// submit assertion to the event loop thread, it should fail there
80-
Future<?> assertFuture = eventLoopGroup.next().submit( EventLoopGroupFactory::assertNotInEventLoopThread );
82+
Future<?> assertFuture = eventLoopGroup.submit( EventLoopGroupFactory::assertNotInEventLoopThread );
8183
try
8284
{
8385
assertFuture.get( 30, SECONDS );
@@ -89,6 +91,17 @@ public void shouldAssertNotInEventLoopThread() throws Exception
8991
}
9092
}
9193

94+
@Test
95+
public void shouldCheckIfEventLoopThread() throws Exception
96+
{
97+
eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 );
98+
99+
Thread eventLoopThread = getThread( eventLoopGroup );
100+
assertTrue( EventLoopGroupFactory.isEventLoopThread( eventLoopThread ) );
101+
102+
assertFalse( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) );
103+
}
104+
92105
/**
93106
* Test verifies that our event loop group uses same kind of thread as Netty does by default.
94107
* It's needed because default Netty setup has good performance.
@@ -114,7 +127,7 @@ public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Ex
114127

115128
private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception
116129
{
117-
return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS );
130+
return eventLoopGroup.submit( Thread::currentThread ).get( 10, SECONDS );
118131
}
119132

120133
private static void shutdown( EventLoopGroup group )

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.atomic.AtomicInteger;
3939

40+
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
4041
import org.neo4j.driver.internal.util.Futures;
4142
import org.neo4j.driver.internal.util.ServerVersion;
4243
import org.neo4j.driver.v1.Record;
@@ -1045,47 +1046,50 @@ public void shouldBePossibleToMixRunAsyncAndBlockingSessionClose()
10451046
@Test
10461047
public void shouldFailToExecuteBlockingRunInAsyncTransactionFunction()
10471048
{
1048-
TransactionWork<CompletionStage<List<Record>>> completionStageTransactionWork = tx ->
1049+
TransactionWork<CompletionStage<Void>> completionStageTransactionWork = tx ->
10491050
{
1050-
StatementResult result = tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
1051-
List<Record> records = new ArrayList<>();
1052-
while ( result.hasNext() )
1051+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
10531052
{
1054-
records.add( result.next() );
1053+
try
1054+
{
1055+
tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
1056+
fail( "Exception expected" );
1057+
}
1058+
catch ( IllegalStateException e )
1059+
{
1060+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1061+
}
10551062
}
1056-
1057-
return completedFuture( records );
1063+
return completedFuture( null );
10581064
};
10591065

1060-
CompletionStage<List<Record>> result = session.readTransactionAsync( completionStageTransactionWork );
1061-
1062-
try
1063-
{
1064-
await( result );
1065-
fail( "Exception expected" );
1066-
}
1067-
catch ( IllegalStateException e )
1068-
{
1069-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1070-
}
1066+
CompletionStage<Void> result = session.readTransactionAsync( completionStageTransactionWork );
1067+
assertNull( await( result ) );
10711068
}
10721069

10731070
@Test
10741071
public void shouldFailToExecuteBlockingRunChainedWithAsyncRun()
10751072
{
1076-
CompletionStage<StatementResult> result = session.runAsync( "RETURN 1" )
1073+
CompletionStage<Void> result = session.runAsync( "RETURN 1" )
10771074
.thenCompose( StatementResultCursor::singleAsync )
1078-
.thenApply( record -> session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ) );
1075+
.thenApply( record ->
1076+
{
1077+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
1078+
{
1079+
try
1080+
{
1081+
session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) );
1082+
fail( "Exception expected" );
1083+
}
1084+
catch ( IllegalStateException e )
1085+
{
1086+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1087+
}
1088+
}
1089+
return null;
1090+
} );
10791091

1080-
try
1081-
{
1082-
await( result );
1083-
fail( "Exception expected" );
1084-
}
1085-
catch ( IllegalStateException e )
1086-
{
1087-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1088-
}
1092+
assertNull( await( result ) );
10891093
}
10901094

10911095
@Test
@@ -1153,6 +1157,16 @@ public void shouldAllowAccessingRecordsAfterSessionClosed()
11531157
}
11541158
}
11551159

1160+
@Test
1161+
public void shouldAllowReturningNullFromAsyncTransactionFunction()
1162+
{
1163+
CompletionStage<Object> readResult = session.readTransactionAsync( tx -> null );
1164+
assertNull( await( readResult ) );
1165+
1166+
CompletionStage<Object> writeResult = session.writeTransactionAsync( tx -> null );
1167+
assertNull( await( writeResult ) );
1168+
}
1169+
11561170
private Future<List<CompletionStage<Record>>> runNestedQueries( StatementResultCursor inputCursor )
11571171
{
11581172
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,6 +1529,16 @@ public void shouldAllowLongRunningQueryWithConnectTimeout() throws Exception
15291529
}
15301530
}
15311531

1532+
@Test
1533+
public void shouldAllowReturningNullFromTransactionFunction()
1534+
{
1535+
try ( Session session = neo4j.driver().session() )
1536+
{
1537+
assertNull( session.readTransaction( tx -> null ) );
1538+
assertNull( session.writeTransaction( tx -> null ) );
1539+
}
1540+
}
1541+
15321542
private void assumeServerIs31OrLater()
15331543
{
15341544
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

3838
import org.neo4j.driver.internal.ExplicitTransaction;
39+
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
3940
import org.neo4j.driver.v1.Record;
4041
import org.neo4j.driver.v1.Session;
4142
import org.neo4j.driver.v1.Statement;
@@ -1239,23 +1240,25 @@ public void shouldPropagateFailureFromSummary()
12391240
@Test
12401241
public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
12411242
{
1242-
assumeDatabaseSupportsBookmarks();
1243-
1244-
session.writeTransaction( tx -> tx.run( "CREATE ()" ) );
1245-
assertNotNull( session.lastBookmark() );
1246-
1247-
CompletionStage<StatementResult> result = session.beginTransactionAsync()
1248-
.thenApply( tx -> tx.run( "CREATE ()" ) );
1243+
CompletionStage<Void> result = session.beginTransactionAsync()
1244+
.thenApply( tx ->
1245+
{
1246+
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
1247+
{
1248+
try
1249+
{
1250+
tx.run( "CREATE ()" );
1251+
fail( "Exception expected" );
1252+
}
1253+
catch ( IllegalStateException e )
1254+
{
1255+
assertThat( e, is( blockingOperationInEventLoopError() ) );
1256+
}
1257+
}
1258+
return null;
1259+
} );
12491260

1250-
try
1251-
{
1252-
await( result );
1253-
fail( "Exception expected" );
1254-
}
1255-
catch ( IllegalStateException e )
1256-
{
1257-
assertThat( e, is( blockingOperationInEventLoopError() ) );
1258-
}
1261+
assertNull( await( result ) );
12591262
}
12601263

12611264
@Test

0 commit comments

Comments
 (0)