Skip to content

Commit 738f37a

Browse files
committed
Added couple unit tests
* to verify that created event loop threads are the same as default Netty event loop threads * to verify that it's possible to execute blocking operations in `ForkJoinPool.commonPool()` when chaining async stages
1 parent 11f735f commit 738f37a

File tree

3 files changed

+81
-4
lines changed

3 files changed

+81
-4
lines changed

driver/src/test/java/org/neo4j/driver/internal/async/EventLoopGroupFactoryTest.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@ public class EventLoopGroupFactoryTest
4343
@After
4444
public void tearDown()
4545
{
46-
if ( eventLoopGroup != null )
47-
{
48-
eventLoopGroup.shutdownGracefully().syncUninterruptibly();
49-
}
46+
shutdown( eventLoopGroup );
5047
}
5148

5249
@Test
@@ -91,4 +88,46 @@ public void shouldAssertNotInEventLoopThread() throws Exception
9188
assertThat( e.getCause(), is( blockingOperationInEventLoopError() ) );
9289
}
9390
}
91+
92+
/**
93+
* Test verifies that our event loop group uses same kind of thread as Netty does by default.
94+
* It's needed because default Netty setup has good performance.
95+
*/
96+
@Test
97+
public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Exception
98+
{
99+
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup( 1 );
100+
eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 );
101+
try
102+
{
103+
Thread defaultThread = getThread( nioEventLoopGroup );
104+
Thread driverThread = getThread( eventLoopGroup );
105+
106+
assertEquals( defaultThread.getClass(), driverThread.getClass().getSuperclass() );
107+
assertEquals( defaultThread.getPriority(), driverThread.getPriority() );
108+
}
109+
finally
110+
{
111+
shutdown( nioEventLoopGroup );
112+
}
113+
}
114+
115+
private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception
116+
{
117+
return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS );
118+
}
119+
120+
private static void shutdown( EventLoopGroup group )
121+
{
122+
if ( group != null )
123+
{
124+
try
125+
{
126+
group.shutdownGracefully().syncUninterruptibly();
127+
}
128+
catch ( Throwable ignore )
129+
{
130+
}
131+
}
132+
}
94133
}

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,22 @@ public void shouldFailToExecuteBlockingRunChainedWithAsyncRun()
10881088
}
10891089
}
10901090

1091+
@Test
1092+
public void shouldAllowBlockingOperationInCommonPoolWhenChaining()
1093+
{
1094+
CompletionStage<Node> nodeStage = session.runAsync( "RETURN 42 AS value" )
1095+
.thenCompose( StatementResultCursor::singleAsync )
1096+
// move execution to ForkJoinPool.commonPool()
1097+
.thenApplyAsync( record -> session.run( "CREATE (n:Node {value: $value}) RETURN n", record ) )
1098+
.thenApply( StatementResult::single )
1099+
.thenApply( record -> record.get( 0 ).asNode() );
1100+
1101+
Node node = await( nodeStage );
1102+
1103+
assertEquals( 42, node.get( "value" ).asInt() );
1104+
assertEquals( 1, countNodesByLabel( "Node" ) );
1105+
}
1106+
10911107
private Future<List<CompletionStage<Record>>> runNestedQueries( StatementResultCursor inputCursor )
10921108
{
10931109
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,28 @@ public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
12581258
}
12591259
}
12601260

1261+
@Test
1262+
public void shouldAllowUsingBlockingApiInCommonPoolWhenChaining()
1263+
{
1264+
CompletionStage<Transaction> txStage = session.beginTransactionAsync()
1265+
// move execution to ForkJoinPool.commonPool()
1266+
.thenApplyAsync( tx ->
1267+
{
1268+
tx.run( "UNWIND [1,1,2] AS x CREATE (:Node {id: x})" );
1269+
tx.run( "CREATE (:Node {id: 42})" );
1270+
tx.success();
1271+
tx.close();
1272+
return tx;
1273+
} );
1274+
1275+
Transaction tx = await( txStage );
1276+
1277+
assertFalse( tx.isOpen() );
1278+
assertEquals( 2, countNodes( 1 ) );
1279+
assertEquals( 1, countNodes( 2 ) );
1280+
assertEquals( 1, countNodes( 42 ) );
1281+
}
1282+
12611283
private int countNodes( Object id )
12621284
{
12631285
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );

0 commit comments

Comments
 (0)