Skip to content

Commit 8b2d872

Browse files
authored
Introduce new reactive session with updated API (#1208)
This update introduces a new reactive session called `ReactiveSession` and deprecates the existing `RxSession`. The new session can be created via `reactiveSession` methods on `Driver`. The `ReactiveSession` brings reactive API in line with synchronous and asynchronous when it comes to getting an instance on result handle (`ReactiveResult`). Specifically, `run` methods return a `Publisher<ReactiveResult>` that will only emit `ReactiveResult` instance on successful Bolt message (`RUN`) exchange and will emit an error on failure. Furthermore, this change allowed simplification of `ReactiveResult.keys()` method that simply returns a list of strings now. This update also adds a new reactive backend implementation for Testkit and renames existing implementation to `reactive-legacy`.
1 parent 1f0176a commit 8b2d872

File tree

66 files changed

+2338
-421
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2338
-421
lines changed

driver/clirr-ignored-differences.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,28 @@
280280
<method>org.reactivestreams.Publisher isOpen()</method>
281281
</difference>
282282

283+
<difference>
284+
<className>org/neo4j/driver/Driver</className>
285+
<differenceType>7012</differenceType>
286+
<method>org.neo4j.driver.reactive.ReactiveSession reactiveSession()</method>
287+
</difference>
288+
289+
<difference>
290+
<className>org/neo4j/driver/Driver</className>
291+
<differenceType>7012</differenceType>
292+
<method>org.neo4j.driver.reactive.ReactiveSession reactiveSession(org.neo4j.driver.SessionConfig)</method>
293+
</difference>
294+
295+
<difference>
296+
<className>org/neo4j/driver/reactive/RxQueryRunner</className>
297+
<differenceType>7012</differenceType>
298+
<method>org.neo4j.driver.Value parameters(org.neo4j.driver.Record)</method>
299+
</difference>
300+
301+
<difference>
302+
<className>org/neo4j/driver/reactive/RxQueryRunner</className>
303+
<differenceType>7012</differenceType>
304+
<method>org.neo4j.driver.Value parameters(java.util.Map)</method>
305+
</difference>
306+
283307
</differences>

driver/src/main/java/org/neo4j/driver/Driver.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.neo4j.driver.async.AsyncSession;
2424
import org.neo4j.driver.exceptions.ClientException;
25+
import org.neo4j.driver.reactive.ReactiveSession;
2526
import org.neo4j.driver.reactive.RxSession;
2627
import org.neo4j.driver.types.TypeSystem;
2728
import org.neo4j.driver.util.Experimental;
@@ -90,27 +91,57 @@ public interface Driver extends AutoCloseable
9091
Session session( SessionConfig sessionConfig );
9192

9293
/**
93-
* Create a new general purpose {@link RxSession} with default {@link SessionConfig session configuration}.
94-
* The {@link RxSession} provides a reactive way to run queries and process results.
94+
* Create a new general purpose {@link RxSession} with default {@link SessionConfig session configuration}. The {@link RxSession} provides a reactive way to
95+
* run queries and process results.
9596
* <p>
9697
* Alias to {@link #rxSession(SessionConfig)}}.
9798
*
9899
* @return a new {@link RxSession} object.
100+
* @deprecated superseded by {@link #reactiveSession()}.
99101
*/
100-
RxSession rxSession();
102+
@Deprecated
103+
default RxSession rxSession()
104+
{
105+
return rxSession( SessionConfig.defaultConfig() );
106+
}
101107

102108
/**
103-
* Create a new {@link RxSession} with a specified {@link SessionConfig session configuration}.
104-
* Use {@link SessionConfig#forDatabase(String)} to obtain a general purpose session configuration for the specified database.
105-
* The {@link RxSession} provides a reactive way to run queries and process results.
109+
* Create a new {@link RxSession} with a specified {@link SessionConfig session configuration}. Use {@link SessionConfig#forDatabase(String)} to obtain a
110+
* general purpose session configuration for the specified database. The {@link RxSession} provides a reactive way to run queries and process results.
111+
*
106112
* @param sessionConfig used to customize the session.
107113
* @return a new {@link RxSession} object.
114+
* @deprecated superseded by {@link #reactiveSession(SessionConfig)}.
108115
*/
116+
@Deprecated
109117
RxSession rxSession( SessionConfig sessionConfig );
110118

111119
/**
112-
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}.
113-
* The {@link AsyncSession} provides an asynchronous way to run queries and process results.
120+
* Create a new general purpose {@link ReactiveSession} with default {@link SessionConfig session configuration}. The {@link ReactiveSession} provides a
121+
* reactive way to run queries and process results.
122+
* <p>
123+
* Alias to {@link #rxSession(SessionConfig)}}.
124+
*
125+
* @return a new {@link ReactiveSession} object.
126+
*/
127+
default ReactiveSession reactiveSession()
128+
{
129+
return reactiveSession( SessionConfig.defaultConfig() );
130+
}
131+
132+
/**
133+
* Create a new {@link ReactiveSession} with a specified {@link SessionConfig session configuration}. Use {@link SessionConfig#forDatabase(String)} to
134+
* obtain a general purpose session configuration for the specified database. The {@link ReactiveSession} provides a reactive way to run queries and process
135+
* results.
136+
*
137+
* @param sessionConfig used to customize the session.
138+
* @return a new {@link ReactiveSession} object.
139+
*/
140+
ReactiveSession reactiveSession( SessionConfig sessionConfig );
141+
142+
/**
143+
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an
144+
* asynchronous way to run queries and process results.
114145
* <p>
115146
* Alias to {@link #asyncSession(SessionConfig)}}.
116147
*

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,19 @@
2525
import org.neo4j.driver.Logger;
2626
import org.neo4j.driver.Logging;
2727
import org.neo4j.driver.Metrics;
28-
import org.neo4j.driver.internal.metrics.MetricsProvider;
2928
import org.neo4j.driver.Session;
3029
import org.neo4j.driver.SessionConfig;
3130
import org.neo4j.driver.async.AsyncSession;
3231
import org.neo4j.driver.internal.async.InternalAsyncSession;
3332
import org.neo4j.driver.internal.async.NetworkSession;
3433
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
34+
import org.neo4j.driver.internal.metrics.MetricsProvider;
35+
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
3536
import org.neo4j.driver.internal.reactive.InternalRxSession;
3637
import org.neo4j.driver.internal.security.SecurityPlan;
3738
import org.neo4j.driver.internal.types.InternalTypeSystem;
3839
import org.neo4j.driver.internal.util.Futures;
40+
import org.neo4j.driver.reactive.ReactiveSession;
3941
import org.neo4j.driver.reactive.RxSession;
4042
import org.neo4j.driver.types.TypeSystem;
4143

@@ -71,15 +73,15 @@ public Session session( SessionConfig sessionConfig )
7173
}
7274

7375
@Override
74-
public RxSession rxSession()
76+
public RxSession rxSession( SessionConfig sessionConfig )
7577
{
76-
return new InternalRxSession( newSession( SessionConfig.defaultConfig() ) );
78+
return new InternalRxSession( newSession( sessionConfig ) );
7779
}
7880

7981
@Override
80-
public RxSession rxSession( SessionConfig sessionConfig )
82+
public ReactiveSession reactiveSession( SessionConfig sessionConfig )
8183
{
82-
return new InternalRxSession( newSession( sessionConfig ) );
84+
return new InternalReactiveSession( newSession( sessionConfig ) );
8385
}
8486

8587
@Override

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface RxResultCursor extends Subscription, FailableCursor
3737
CompletionStage<ResultSummary> summaryAsync();
3838

3939
boolean isDone();
40+
41+
Throwable getRunError();
4042
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class RxResultCursorImpl implements RxResultCursor
4242
private final RunResponseHandler runHandler;
4343
private final PullResponseHandler pullHandler;
4444
private final Throwable runResponseError;
45+
private boolean runErrorSurfaced;
4546
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
4647
private boolean summaryFutureExposed;
4748
private boolean resultConsumed;
@@ -108,7 +109,7 @@ public CompletionStage<Throwable> discardAllFailureAsync()
108109
{
109110
// calling this method will enforce discarding record stream and finish running cypher query
110111
return summaryStage().thenApply( summary -> (Throwable) null )
111-
.exceptionally( throwable -> summaryFutureExposed ? null : throwable );
112+
.exceptionally( throwable -> runErrorSurfaced || summaryFutureExposed ? null : throwable );
112113
}
113114

114115
@Override
@@ -136,6 +137,13 @@ public boolean isDone()
136137
return summaryFuture.isDone();
137138
}
138139

140+
@Override
141+
public Throwable getRunError()
142+
{
143+
runErrorSurfaced = true;
144+
return runResponseError;
145+
}
146+
139147
public CompletionStage<ResultSummary> summaryStage()
140148
{
141149
if ( !isDone() && !resultConsumed ) // the summary is called before record streaming
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.reactive;
20+
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Flux;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.function.Function;
26+
27+
import org.neo4j.driver.AccessMode;
28+
import org.neo4j.driver.Bookmark;
29+
import org.neo4j.driver.TransactionConfig;
30+
import org.neo4j.driver.exceptions.TransactionNestingException;
31+
import org.neo4j.driver.internal.async.NetworkSession;
32+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
33+
import org.neo4j.driver.internal.util.Futures;
34+
35+
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
36+
import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher;
37+
38+
abstract class AbstractReactiveSession<S>
39+
{
40+
protected final NetworkSession session;
41+
42+
public AbstractReactiveSession( NetworkSession session )
43+
{
44+
// RxSession accept a network session as input.
45+
// The network session different from async session that it provides ways to both run for Rx and Async
46+
// Note: Blocking result could just build on top of async result. However, Rx result cannot just build on top of async result.
47+
this.session = session;
48+
}
49+
50+
abstract S createTransaction( UnmanagedTransaction unmanagedTransaction );
51+
52+
abstract Publisher<Void> closeTransaction( S transaction, boolean commit );
53+
54+
public Publisher<S> beginTransaction( TransactionConfig config )
55+
{
56+
return createSingleItemPublisher(
57+
() ->
58+
{
59+
CompletableFuture<S> txFuture = new CompletableFuture<>();
60+
session.beginTransactionAsync( config ).whenComplete(
61+
( tx, completionError ) ->
62+
{
63+
if ( tx != null )
64+
{
65+
txFuture.complete( createTransaction( tx ) );
66+
}
67+
else
68+
{
69+
releaseConnectionBeforeReturning( txFuture, completionError );
70+
}
71+
} );
72+
return txFuture;
73+
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
74+
}
75+
76+
Publisher<S> beginTransaction( AccessMode mode, TransactionConfig config )
77+
{
78+
return createSingleItemPublisher(
79+
() ->
80+
{
81+
CompletableFuture<S> txFuture = new CompletableFuture<>();
82+
session.beginTransactionAsync( mode, config ).whenComplete(
83+
( tx, completionError ) ->
84+
{
85+
if ( tx != null )
86+
{
87+
txFuture.complete( createTransaction( tx ) );
88+
}
89+
else
90+
{
91+
releaseConnectionBeforeReturning( txFuture, completionError );
92+
}
93+
} );
94+
return txFuture;
95+
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
96+
}
97+
98+
<T> Publisher<T> runTransaction( AccessMode mode, Function<S,? extends Publisher<T>> work, TransactionConfig config )
99+
{
100+
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ),
101+
work,
102+
tx -> closeTransaction( tx, true ),
103+
( tx, error ) -> closeTransaction( tx, false ),
104+
( tx ) -> closeTransaction( tx, false ) );
105+
return session.retryLogic().retryRx( repeatableWork );
106+
}
107+
108+
private <T> void releaseConnectionBeforeReturning( CompletableFuture<T> returnFuture, Throwable completionError )
109+
{
110+
// We failed to create a result cursor, so we cannot rely on result cursor to clean-up resources.
111+
// Therefore, we will first release the connection that might have been created in the session and then notify the error.
112+
// The logic here shall be the same as `SessionPullResponseHandler#afterFailure`.
113+
// The reason we need to release connection in session is that we made `rxSession.close()` optional;
114+
// Otherwise, session.close shall handle everything for us.
115+
Throwable error = Futures.completionExceptionCause( completionError );
116+
if ( error instanceof TransactionNestingException )
117+
{
118+
returnFuture.completeExceptionally( error );
119+
}
120+
else
121+
{
122+
session.releaseConnectionAsync().whenComplete( ( ignored, closeError ) ->
123+
returnFuture.completeExceptionally( Futures.combineErrors( error, closeError ) ) );
124+
}
125+
}
126+
127+
public Bookmark lastBookmark()
128+
{
129+
return session.lastBookmark();
130+
}
131+
132+
public <T> Publisher<T> close()
133+
{
134+
return createEmptyPublisher( session::closeAsync );
135+
}
136+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.reactive;
20+
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Mono;
23+
24+
import org.neo4j.driver.internal.async.UnmanagedTransaction;
25+
26+
import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
27+
28+
abstract class AbstractReactiveTransaction
29+
{
30+
protected final UnmanagedTransaction tx;
31+
32+
protected AbstractReactiveTransaction( UnmanagedTransaction tx )
33+
{
34+
this.tx = tx;
35+
}
36+
37+
public <T> Publisher<T> commit()
38+
{
39+
return createEmptyPublisher( tx::commitAsync );
40+
}
41+
42+
public <T> Publisher<T> rollback()
43+
{
44+
return createEmptyPublisher( tx::rollbackAsync );
45+
}
46+
47+
public Publisher<Void> close()
48+
{
49+
return close( false );
50+
}
51+
52+
public Publisher<Boolean> isOpen()
53+
{
54+
return Mono.just( tx.isOpen() );
55+
}
56+
57+
Publisher<Void> close( boolean commit )
58+
{
59+
return createEmptyPublisher( () -> tx.closeAsync( commit ) );
60+
}
61+
}

0 commit comments

Comments
 (0)