19
19
package org .neo4j .docs .driver ;
20
20
21
21
import io .reactivex .Flowable ;
22
- import org .reactivestreams .Publisher ;
23
22
import reactor .core .publisher .Flux ;
24
- import reactor .core .publisher .Mono ;
25
23
26
24
import java .util .Collections ;
27
25
import java .util .Map ;
@@ -36,21 +34,23 @@ public RxExplicitTransactionExample( String uri, String user, String password )
36
34
super ( uri , user , password );
37
35
}
38
36
39
- public Publisher <String > readSingleProductReactor ()
37
+ public Flux <String > readSingleProductReactor ()
40
38
{
41
39
// tag::reactor-explicit-transaction[]
42
40
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title" ;
43
41
Map <String ,Object > parameters = Collections .singletonMap ( "id" , 0 );
44
42
45
43
RxSession session = driver .rxSession ();
46
- return Mono .usingWhen ( session .beginTransaction (),
47
- tx -> Flux .from ( tx .run ( query , parameters ).records () ).single ().map ( record -> record .get ( 0 ).asString () ),
44
+ // It is recommended to use Flux.usingWhen for explicit transactions and Flux.using for autocommit transactions (session).
45
+ // This is because an explicit transaction needs to be supplied via a another resource publisher session.beginTransaction.
46
+ return Flux .usingWhen ( session .beginTransaction (),
47
+ tx -> Flux .from ( tx .run ( query , parameters ).records () ).map ( record -> record .get ( 0 ).asString () ),
48
48
RxTransaction ::commit ,
49
49
RxTransaction ::rollback );
50
50
// end::reactor-explicit-transaction[]
51
51
}
52
52
53
- public Publisher <String > readSingleProductRxJava ()
53
+ public Flowable <String > readSingleProductRxJava ()
54
54
{
55
55
// tag::RxJava-explicit-transaction[]
56
56
String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title" ;
@@ -59,7 +59,8 @@ public Publisher<String> readSingleProductRxJava()
59
59
RxSession session = driver .rxSession ();
60
60
return Flowable .fromPublisher ( session .beginTransaction () )
61
61
.flatMap ( tx -> Flowable .fromPublisher ( tx .run ( query , parameters ).records () ).map ( record -> record .get ( 0 ).asString () )
62
- .doOnComplete ( tx ::commit ).doOnError ( error -> tx .rollback () ) );
62
+ .doOnComplete ( tx ::commit )
63
+ .doOnError ( error -> tx .rollback () ) );
63
64
// end::RxJava-explicit-transaction[]
64
65
}
65
66
}
0 commit comments