Skip to content

Commit 354384c

Browse files
Use RxJava '.using' for managing the session. (#799) (#810)
This aligns the RxJava examples with the Project Reactor ones and makes sure that the session is closed. Much more important however is the fact that the session is not opened in the first place prior to subscription to the flows. Also take note that I have rewritten the unmanaged variant for Project Reactor. While the previous setup does work in our test rig, I had issues by using a subscriber and not the await testing tool. Co-authored-by: Michael Simons <[email protected]>
1 parent aacb383 commit 354384c

File tree

4 files changed

+72
-37
lines changed

4 files changed

+72
-37
lines changed

examples/src/main/java/org/neo4j/docs/driver/RxAutocommitTransactionExample.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.reactivex.Flowable;
2222
// tag::rx-autocommit-transaction-import[]
23+
import io.reactivex.Observable;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
2526

@@ -54,13 +55,11 @@ public Flowable<String> readProductTitlesRxJava()
5455
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
5556
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
5657

57-
RxSession session = driver.rxSession();
58-
return Flowable.fromPublisher( session.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() )
59-
// It is okay to skip session.close() when publisher is completed successfully or cancelled
60-
.onErrorResumeNext( error -> {
61-
// We still rethrows the original error here. In a real application, you may want to handle the error directly here.
62-
return Flowable.<String>fromPublisher( session.close() ).concatWith( Flowable.error( error ) );
63-
} );
58+
return Flowable.using(
59+
driver::rxSession,
60+
session -> Flowable.fromPublisher( session.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
61+
session -> Observable.fromPublisher(session.close()).subscribe()
62+
);
6463
}
6564
// end::RxJava-autocommit-transaction[]
6665
}

examples/src/main/java/org/neo4j/docs/driver/RxResultConsumeExample.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.reactivex.Flowable;
2222
// tag::rx-result-consume-import[]
23+
import io.reactivex.Observable;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
2526

@@ -58,15 +59,15 @@ public Flowable<String> getPeopleRxJava()
5859
String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";
5960
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
6061

61-
RxSession session = driver.rxSession();
62-
return Flowable.fromPublisher( session.readTransaction( tx -> {
63-
RxResult result = tx.run( query, parameters );
64-
return Flowable.fromPublisher( result.records() )
65-
.map( record -> record.get( 0 ).asString() );
66-
} ) ).onErrorResumeNext( error -> {
67-
// We rollback and rethrow the error. For a real application, you may want to handle the error directly here
68-
return Flowable.<String>fromPublisher( session.close() ).concatWith( Flowable.error( error ) );
69-
} );
62+
return Flowable.using(
63+
driver::rxSession,
64+
session -> session.readTransaction( tx -> {
65+
RxResult result = tx.run( query, parameters );
66+
return Flowable.fromPublisher( result.records() )
67+
.map( record -> record.get( 0 ).asString() );
68+
} ) ,
69+
session -> Observable.fromPublisher(session.close()).subscribe()
70+
);
7071
}
7172
// end::RxJava-result-consume[]
7273
}

examples/src/main/java/org/neo4j/docs/driver/RxTransactionFunctionExample.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.reactivex.Flowable;
2222
// tag::rx-transaction-function-import[]
23+
import io.reactivex.Observable;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
2526

@@ -60,15 +61,15 @@ public Flowable<ResultSummary> printAllProductsRxJava()
6061
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
6162
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
6263

63-
RxSession session = driver.rxSession();
64-
return Flowable.fromPublisher( session.readTransaction( tx -> {
64+
return Flowable.using(
65+
driver::rxSession,
66+
session -> session.readTransaction( tx -> {
6567
RxResult result = tx.run( query, parameters );
6668
return Flowable.fromPublisher( result.records() )
6769
.doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).ignoreElements().andThen( result.consume() );
68-
} ) ).onErrorResumeNext( error -> {
69-
// We rollback and rethrow the error. For a real application, you may want to handle the error directly here
70-
return Flowable.<ResultSummary>fromPublisher( session.close() ).concatWith( Flowable.error( error ) );
71-
} );
70+
} ) ,
71+
session -> Observable.fromPublisher(session.close()).subscribe()
72+
);
7273
}
7374
// end::RxJava-transaction-function[]
7475
}

examples/src/main/java/org/neo4j/docs/driver/RxUnmanagedTransactionExample.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
package org.neo4j.docs.driver;
2020
import io.reactivex.Flowable;
2121
// tag::reactor-unmanaged-transaction-import[]
22+
import io.reactivex.Observable;
23+
import org.reactivestreams.Publisher;
2224
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
2326

2427
import java.util.Collections;
2528
import java.util.Map;
2629

30+
import org.neo4j.driver.reactive.RxQueryRunner;
2731
import org.neo4j.driver.reactive.RxSession;
28-
import org.neo4j.driver.reactive.RxTransaction;
32+
2933
// tag::reactor-unmanaged-transaction-import[]
3034
public class RxUnmanagedTransactionExample extends BaseApplication
3135
{
@@ -35,15 +39,41 @@ public RxUnmanagedTransactionExample(String uri, String user, String password )
3539
}
3640

3741
// tag::reactor-unmanaged-transaction[]
42+
static class QueryRunnerAndCallbacks
43+
{
44+
final RxQueryRunner queryRunner;
45+
46+
final Publisher<Void> commit;
47+
final Publisher<Void> rollback;
48+
49+
QueryRunnerAndCallbacks( RxQueryRunner queryRunner, Publisher<Void> commit, Publisher<Void> rollback )
50+
{
51+
this.queryRunner = queryRunner;
52+
this.commit = commit;
53+
this.rollback = rollback;
54+
}
55+
}
56+
3857
public Flux<String> readSingleProduct()
3958
{
4059
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
4160
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
4261

43-
RxSession session = driver.rxSession();
44-
return Flux.usingWhen( session.beginTransaction(),
45-
tx -> Flux.from( tx.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
46-
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
62+
// The additional holder is required to make both usingWhen constructs close
63+
// the resources in the correct order.
64+
Mono<QueryRunnerAndCallbacks> queryRunnerSupplier = Mono.using(
65+
driver::rxSession,
66+
session -> Mono.from( session.beginTransaction() ).map(tx -> new QueryRunnerAndCallbacks( tx, tx.commit(), tx.rollback() ) ),
67+
RxSession::close
68+
);
69+
70+
return Flux.usingWhen(
71+
queryRunnerSupplier,
72+
queryRunnerAndCallbacks -> Flux.from( queryRunnerAndCallbacks.queryRunner.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
73+
queryRunnerAndCallbacks -> queryRunnerAndCallbacks.commit,
74+
(queryRunnerAndCallbacks, error) -> queryRunnerAndCallbacks.rollback,
75+
queryRunnerAndCallbacks -> queryRunnerAndCallbacks.rollback
76+
);
4777
}
4878
// end::reactor-unmanaged-transaction[]
4979

@@ -53,16 +83,20 @@ public Flowable<String> readSingleProductRxJava()
5383
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
5484
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );
5585

56-
RxSession session = driver.rxSession();
57-
return Flowable.fromPublisher( session.beginTransaction() )
58-
.flatMap( tx ->
59-
Flowable.fromPublisher( tx.run( query, parameters ).records() )
60-
.map( record -> record.get( 0 ).asString() )
61-
.concatWith( tx.commit() )
62-
.onErrorResumeNext( error -> {
63-
// We rollback and rethrow the error. For a real application, you may want to handle the error directly here
64-
return Flowable.<String>fromPublisher( tx.rollback() ).concatWith( Flowable.error( error ) );
65-
} ) );
86+
return Flowable.using(
87+
driver::rxSession,
88+
session -> Flowable.fromPublisher( session.beginTransaction() )
89+
.flatMap( tx ->
90+
Flowable.fromPublisher( tx.run( query, parameters ).records() )
91+
.map( record -> record.get( 0 ).asString() )
92+
.concatWith( tx.commit() )
93+
.onErrorResumeNext( error -> {
94+
// We rollback and rethrow the error. For a real application, you may want to handle the error directly here
95+
return Flowable.<String>fromPublisher( tx.rollback() ).concatWith( Flowable.error( error ) );
96+
} )
97+
),
98+
session -> Observable.fromPublisher(session.close()).onErrorResumeNext( Observable.empty() ).subscribe()
99+
);
66100
}
67101
// end::RxJava-unmanaged-transaction[]
68102
}

0 commit comments

Comments
 (0)