25
25
import org .neo4j .driver .Record ;
26
26
import org .neo4j .driver .SessionConfig ;
27
27
import org .neo4j .driver .exceptions .Neo4jException ;
28
- import org .neo4j .driver .reactive .RxQueryRunner ;
29
- import org .neo4j .driver .reactive .RxSession ;
28
+ import org .neo4j .driver .reactivestreams .ReactiveQueryRunner ;
29
+ import org .neo4j .driver .reactivestreams .ReactiveResult ;
30
+ import org .neo4j .driver .reactivestreams .ReactiveSession ;
30
31
import org .neo4j .driver .summary .ResultSummary ;
31
32
import org .neo4j .http .config .ApplicationProperties ;
32
33
import org .reactivestreams .Publisher ;
@@ -60,15 +61,13 @@ class DefaultNeo4jAdapter implements Neo4jAdapter {
60
61
}
61
62
62
63
@ Override
63
- @ SuppressWarnings ({"deprecation" , "RedundantSuppression" })
64
64
public Flux <Record > stream (Neo4jPrincipal principal , String database , Query query ) {
65
65
66
66
return queryEvaluator .getExecutionRequirements (principal , query .text ())
67
- .flatMapMany (requirements -> this .execute0 (principal , database , requirements , q -> Flux . from (q .run (query ). records () )));
67
+ .flatMapMany (requirements -> this .execute0 (principal , database , requirements , q -> Mono . fromDirect (q .run (query )). flatMapMany ( ReactiveResult :: records )));
68
68
}
69
69
70
70
@ Override
71
- @ SuppressWarnings ({"deprecation" , "RedundantSuppression" })
72
71
public Mono <ResultContainer > run (Neo4jPrincipal principal , String database , AnnotatedQuery query , AnnotatedQuery ... additionalQueries ) {
73
72
74
73
Flux <AnnotatedQuery > queries = Flux .just (query );
@@ -83,9 +82,10 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) {
83
82
.flatMap (q -> Mono .fromDirect (this .execute0 (principal , database , q .getT2 (), runner -> {
84
83
var annotatedQuery = q .getT1 ();
85
84
var rxResult = runner .run (annotatedQuery .value ());
86
- return Mono .fromDirect (rxResult .keys ())
87
- .zipWith (Flux .from (rxResult .records ()).collectList ())
88
- .flatMap (v -> Mono .just (v ).zipWith (Mono .fromDirect (rxResult .consume ()), (t , s ) -> Tuples .of (t .getT1 (), t .getT2 (), s )))
85
+ return Mono .fromDirect (rxResult )
86
+ .flatMap (reactiveResult -> Mono .just (reactiveResult .keys ())
87
+ .zipWith (Flux .from (reactiveResult .records ()).collectList ())
88
+ .flatMap (v -> Mono .just (v ).zipWith (Mono .fromDirect (reactiveResult .consume ()), (t , s ) -> Tuples .of (t .getT1 (), t .getT2 (), s ))))
89
89
.map (content -> new ResultAndSummary (EagerResult .success (content , annotatedQuery .includeStats (), annotatedQuery .resultDataContents (), driver .defaultTypeSystem ()), content .getT3 ()));
90
90
}))).onErrorResume (Neo4jException .class , e -> Mono .just (new ResultAndSummary (EagerResult .error (e ), null )))
91
91
)
@@ -99,8 +99,7 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) {
99
99
});
100
100
}
101
101
102
- @ SuppressWarnings ("deprecation" )
103
- <T > Publisher <T > execute0 (Neo4jPrincipal principal , String database , QueryEvaluator .ExecutionRequirements requirements , Function <RxQueryRunner , Publisher <T >> query ) {
102
+ <T > Publisher <T > execute0 (Neo4jPrincipal principal , String database , QueryEvaluator .ExecutionRequirements requirements , Function <ReactiveQueryRunner , Publisher <T >> query ) {
104
103
105
104
var sessionSupplier = queryEvaluator .isEnterpriseEdition ().
106
105
flatMap (v -> {
@@ -110,23 +109,23 @@ <T> Publisher<T> execute0(Neo4jPrincipal principal, String database, QueryEvalua
110
109
.withDatabase (database )
111
110
.withDefaultAccessMode (requirements .target () == QueryEvaluator .Target .WRITERS ? AccessMode .WRITE : AccessMode .READ )
112
111
.build ();
113
- return Mono .fromCallable (() -> driver .rxSession ( sessionConfig ));
112
+ return Mono .fromCallable (() -> driver .reactiveSession ( ReactiveSession . class , sessionConfig ));
114
113
});
115
114
116
115
Flux <T > flow ;
117
116
if (requirements .transactionMode () == QueryEvaluator .TransactionMode .IMPLICIT ) {
118
- flow = Flux .usingWhen (sessionSupplier , query , RxSession ::close );
117
+ flow = Flux .usingWhen (sessionSupplier , query , ReactiveSession ::close );
119
118
} else {
120
119
flow = switch (requirements .target ()) {
121
120
case WRITERS -> Flux .usingWhen (
122
121
sessionSupplier ,
123
- session -> session .writeTransaction (query ::apply ),
124
- RxSession ::close
122
+ session -> session .executeWrite (query ::apply ),
123
+ ReactiveSession ::close
125
124
);
126
125
case READERS -> Flux .usingWhen (
127
126
sessionSupplier ,
128
- session -> session .readTransaction (query ::apply ),
129
- RxSession ::close
127
+ session -> session .executeRead (query ::apply ),
128
+ ReactiveSession ::close
130
129
);
131
130
};
132
131
}
0 commit comments