Skip to content

Commit 3acc015

Browse files
committed
Adding and example for how to run multiple transactions within a single async session (neo4j#899)
1 parent 635e404 commit 3acc015

File tree

2 files changed

+107
-0
lines changed

2 files changed

+107
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.docs.driver;
20+
21+
// tag::async-result-consume-import[]
22+
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CompletionStage;
27+
28+
import org.neo4j.driver.async.AsyncSession;
29+
import org.neo4j.driver.async.AsyncTransaction;
30+
import org.neo4j.driver.async.ResultCursor;
31+
import org.neo4j.driver.summary.ResultSummary;
32+
import org.neo4j.driver.summary.SummaryCounters;
33+
34+
import static org.neo4j.driver.Values.parameters;
35+
// end::async-result-consume-import[]
36+
37+
public class AsyncRunMultipleTransactionExample extends BaseApplication
38+
{
39+
public AsyncRunMultipleTransactionExample( String uri, String user, String password )
40+
{
41+
super( uri, user, password );
42+
}
43+
44+
// tag::async-multiple-tx[]
45+
public CompletionStage<Integer> addEmployees( final String companyName )
46+
{
47+
AsyncSession session = driver.asyncSession();
48+
49+
return session.readTransactionAsync( AsyncRunMultipleTransactionExample::matchPersonNodes )
50+
.thenCompose( personNames -> session.writeTransactionAsync( tx -> createNodes( tx, companyName, personNames ) ) );
51+
}
52+
53+
private static CompletionStage<List<String>> matchPersonNodes( AsyncTransaction tx )
54+
{
55+
return tx.runAsync( "MATCH (a:Person) RETURN a.name AS name" )
56+
.thenCompose( cursor -> cursor.listAsync( record -> record.get( "name" ).asString() ) );
57+
}
58+
59+
private static CompletableFuture<Integer> createNodes( AsyncTransaction tx, String companyName, List<String> personNames )
60+
{
61+
CompletableFuture<Integer>[] nodeCreatedCounts = personNames.stream()
62+
.map( personName -> createNode(
63+
tx,
64+
companyName,
65+
personName ) )
66+
.toArray(
67+
size -> new CompletableFuture[size] );
68+
return CompletableFuture.allOf( nodeCreatedCounts )
69+
.thenApply( ignored ->
70+
Arrays.stream(
71+
nodeCreatedCounts )
72+
.map( CompletableFuture::join )
73+
.reduce( 0,
74+
Integer::sum ) );
75+
}
76+
77+
private static CompletionStage<Integer> createNode( AsyncTransaction tx, String companyName, String personName )
78+
{
79+
return tx.runAsync( "MATCH (emp:Person {name: $person_name}) " +
80+
"MERGE (com:Company {name: $company_name}) " +
81+
"MERGE (emp)-[:WORKS_FOR]->(com)",
82+
parameters( "person_name", personName, "company_name", companyName ) )
83+
.thenCompose( ResultCursor::consumeAsync )
84+
.thenApply( ResultSummary::counters )
85+
.thenApply( SummaryCounters::nodesCreated );
86+
}
87+
// end::async-multiple-tx[]
88+
}

examples/src/test/java/org/neo4j/docs/driver/ExamplesIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,25 @@ void testShouldAsyncRunResultConsumeExample() throws Exception
187187
}
188188
}
189189

190+
@Test
191+
void testShouldAsyncRunMultipleTransactionExample() throws Exception
192+
{
193+
// Given
194+
write( "CREATE (a:Person {name: 'Alice'})" );
195+
write( "CREATE (a:Person {name: 'Bob'})" );
196+
try ( AsyncRunMultipleTransactionExample example = new AsyncRunMultipleTransactionExample( uri, USER, PASSWORD ) )
197+
{
198+
// When
199+
Integer nodesCreated = await( example.addEmployees("Acme") );
200+
201+
// Then
202+
int employeeCount = readInt(
203+
"MATCH (emp:Person)-[WORKS_FOR]->(com:Company) WHERE com.name = 'Acme' RETURN count(emp)" );
204+
assertThat( employeeCount, equalTo( 2 ) );
205+
assertThat( nodesCreated, equalTo( 1 ) );
206+
}
207+
}
208+
190209
@Test
191210
void testShouldRunConfigConnectionPoolExample() throws Exception
192211
{

0 commit comments

Comments
 (0)