Skip to content

Commit 30ddc26

Browse files
committed
Merge branch '4.3' into feature/runprocessing
2 parents 38ff211 + 463195a commit 30ddc26

File tree

3 files changed

+129
-15
lines changed

3 files changed

+129
-15
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

+22-15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
24+
import io.netty.channel.ChannelPromise;
2425
import io.netty.channel.pool.ChannelHealthChecker;
2526
import io.netty.channel.pool.FixedChannelPool;
2627

@@ -66,21 +67,27 @@ public class NettyChannelPool implements ExtendedChannelPool
6667
protected ChannelFuture connectChannel( Bootstrap bootstrap )
6768
{
6869
ListenerEvent creatingEvent = handler.channelCreating( id );
69-
ChannelFuture channelFuture = connector.connect( address, bootstrap );
70-
channelFuture.addListener( future -> {
71-
if ( future.isSuccess() )
72-
{
73-
// notify pool handler about a successful connection
74-
Channel channel = channelFuture.channel();
75-
setPoolId( channel, id );
76-
handler.channelCreated( channel, creatingEvent );
77-
}
78-
else
79-
{
80-
handler.channelFailedToCreate( id );
81-
}
82-
} );
83-
return channelFuture;
70+
ChannelFuture connectedChannelFuture = connector.connect( address, bootstrap );
71+
Channel channel = connectedChannelFuture.channel();
72+
// This ensures that handler.channelCreated is called before SimpleChannelPool calls handler.channelAcquired
73+
ChannelPromise trackedChannelFuture = channel.newPromise();
74+
connectedChannelFuture.addListener(
75+
future ->
76+
{
77+
if ( future.isSuccess() )
78+
{
79+
// notify pool handler about a successful connection
80+
setPoolId( channel, id );
81+
handler.channelCreated( channel, creatingEvent );
82+
trackedChannelFuture.setSuccess();
83+
}
84+
else
85+
{
86+
handler.channelFailedToCreate( id );
87+
trackedChannelFuture.setFailure( future.cause() );
88+
}
89+
} );
90+
return trackedChannelFuture;
8491
}
8592
};
8693
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.docs.driver;
20+
21+
// tag::async-result-consume-import[]
22+
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionStage;
27+
28+
import org.neo4j.driver.async.AsyncSession;
29+
import org.neo4j.driver.async.AsyncTransaction;
30+
import org.neo4j.driver.async.ResultCursor;
31+
import org.neo4j.driver.summary.ResultSummary;
32+
import org.neo4j.driver.summary.SummaryCounters;
33+
34+
import static org.neo4j.driver.Values.parameters;
35+
// end::async-result-consume-import[]
36+
37+
public class AsyncRunMultipleTransactionExample extends BaseApplication
38+
{
39+
public AsyncRunMultipleTransactionExample( String uri, String user, String password )
40+
{
41+
super( uri, user, password );
42+
}
43+
44+
// tag::async-multiple-tx[]
45+
public CompletionStage<Integer> addEmployees( final String companyName )
46+
{
47+
AsyncSession session = driver.asyncSession();
48+
49+
return session.readTransactionAsync( AsyncRunMultipleTransactionExample::matchPersonNodes )
50+
.thenCompose( personNames -> session.writeTransactionAsync( tx -> createNodes( tx, companyName, personNames ) ) );
51+
}
52+
53+
private static CompletionStage<List<String>> matchPersonNodes( AsyncTransaction tx )
54+
{
55+
return tx.runAsync( "MATCH (a:Person) RETURN a.name AS name" )
56+
.thenCompose( cursor -> cursor.listAsync( record -> record.get( "name" ).asString() ) );
57+
}
58+
59+
private static CompletableFuture<Integer> createNodes( AsyncTransaction tx, String companyName, List<String> personNames )
60+
{
61+
CompletableFuture<Integer>[] nodeCreatedCounts = personNames.stream()
62+
.map( personName -> createNode(
63+
tx,
64+
companyName,
65+
personName ) )
66+
.toArray(
67+
size -> new CompletableFuture[size] );
68+
return CompletableFuture.allOf( nodeCreatedCounts )
69+
.thenApply( ignored ->
70+
Arrays.stream(
71+
nodeCreatedCounts )
72+
.map( CompletableFuture::join )
73+
.reduce( 0,
74+
Integer::sum ) );
75+
}
76+
77+
private static CompletionStage<Integer> createNode( AsyncTransaction tx, String companyName, String personName )
78+
{
79+
return tx.runAsync( "MATCH (emp:Person {name: $person_name}) " +
80+
"MERGE (com:Company {name: $company_name}) " +
81+
"MERGE (emp)-[:WORKS_FOR]->(com)",
82+
parameters( "person_name", personName, "company_name", companyName ) )
83+
.thenCompose( ResultCursor::consumeAsync )
84+
.thenApply( ResultSummary::counters )
85+
.thenApply( SummaryCounters::nodesCreated );
86+
}
87+
// end::async-multiple-tx[]
88+
}

examples/src/test/java/org/neo4j/docs/driver/ExamplesIT.java

+19
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,25 @@ void testShouldAsyncRunResultConsumeExample() throws Exception
187187
}
188188
}
189189

190+
@Test
191+
void testShouldAsyncRunMultipleTransactionExample() throws Exception
192+
{
193+
// Given
194+
write( "CREATE (a:Person {name: 'Alice'})" );
195+
write( "CREATE (a:Person {name: 'Bob'})" );
196+
try ( AsyncRunMultipleTransactionExample example = new AsyncRunMultipleTransactionExample( uri, USER, PASSWORD ) )
197+
{
198+
// When
199+
Integer nodesCreated = await( example.addEmployees("Acme") );
200+
201+
// Then
202+
int employeeCount = readInt(
203+
"MATCH (emp:Person)-[WORKS_FOR]->(com:Company) WHERE com.name = 'Acme' RETURN count(emp)" );
204+
assertThat( employeeCount, equalTo( 2 ) );
205+
assertThat( nodesCreated, equalTo( 1 ) );
206+
}
207+
}
208+
190209
@Test
191210
void testShouldRunConfigConnectionPoolExample() throws Exception
192211
{

0 commit comments

Comments
 (0)