Skip to content

Commit 1fc6a41

Browse files
authored
Introduce new Reactive Streams session (#1325)
This update introduces support for reactive session with Reactive Streams types. While it is similar to the deprecated `RxSession`, it includes the improvements introduced with the `ReactiveSession` that uses Flow API types. Sample session creation: ``` var session = driver.reactiveSession(org.neo4j.driver.reactivestreams.ReactiveSession.class); ```
1 parent 93c7b95 commit 1fc6a41

File tree

65 files changed

+1719
-39
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1719
-39
lines changed

driver/clirr-ignored-differences.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,4 +397,16 @@
397397
<method>org.neo4j.driver.types.TypeSystem getDefault()</method>
398398
</difference>
399399

400+
<difference>
401+
<className>org/neo4j/driver/Driver</className>
402+
<differenceType>7012</differenceType>
403+
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class)</method>
404+
</difference>
405+
406+
<difference>
407+
<className>org/neo4j/driver/Driver</className>
408+
<differenceType>7012</differenceType>
409+
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class, org.neo4j.driver.SessionConfig)</method>
410+
</difference>
411+
400412
</differences>

driver/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
exports org.neo4j.driver;
2222
exports org.neo4j.driver.async;
2323
exports org.neo4j.driver.reactive;
24+
exports org.neo4j.driver.reactivestreams;
2425
exports org.neo4j.driver.types;
2526
exports org.neo4j.driver.summary;
2627
exports org.neo4j.driver.net;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver;
20+
21+
/**
22+
* A base interface for reactive sessions, used by {@link Driver#reactiveSession(Class)} and {@link Driver#reactiveSession(Class, SessionConfig)}.
23+
*/
24+
public interface BaseReactiveSession {}

driver/src/main/java/org/neo4j/driver/Driver.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ public interface Driver extends AutoCloseable {
7777
*
7878
* @return a new {@link Session} object.
7979
*/
80-
Session session();
80+
default Session session() {
81+
return session(SessionConfig.defaultConfig());
82+
}
8183

8284
/**
8385
* Create a new {@link Session} with a specified {@link SessionConfig session configuration}.
@@ -133,7 +135,41 @@ default ReactiveSession reactiveSession() {
133135
* @param sessionConfig used to customize the session.
134136
* @return a new {@link ReactiveSession} object.
135137
*/
136-
ReactiveSession reactiveSession(SessionConfig sessionConfig);
138+
default ReactiveSession reactiveSession(SessionConfig sessionConfig) {
139+
return reactiveSession(ReactiveSession.class, sessionConfig);
140+
}
141+
142+
/**
143+
* Create a new reactive session of supported type with default {@link SessionConfig session configuration}.
144+
* <p>
145+
* Supported types are:
146+
* <ul>
147+
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
148+
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
149+
* </ul>
150+
*
151+
* @param sessionClass session type class
152+
* @return session instance
153+
* @param <T> session type
154+
*/
155+
default <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass) {
156+
return reactiveSession(sessionClass, SessionConfig.defaultConfig());
157+
}
158+
159+
/**
160+
* Create a new reactive session of supported type with a specified {@link SessionConfig session configuration}.
161+
* <p>
162+
* Supported types are:
163+
* <ul>
164+
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
165+
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
166+
* </ul>
167+
*
168+
* @param sessionClass session type class
169+
* @return session instance
170+
* @param <T> session type
171+
*/
172+
<T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig);
137173

138174
/**
139175
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an
@@ -143,7 +179,9 @@ default ReactiveSession reactiveSession() {
143179
*
144180
* @return a new {@link AsyncSession} object.
145181
*/
146-
AsyncSession asyncSession();
182+
default AsyncSession asyncSession() {
183+
return asyncSession(SessionConfig.defaultConfig());
184+
}
147185

148186
/**
149187
* Create a new {@link AsyncSession} with a specified {@link SessionConfig session configuration}.

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import static java.util.Objects.requireNonNull;
2122
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
2223

2324
import java.util.concurrent.CompletionStage;
2425
import java.util.concurrent.atomic.AtomicBoolean;
26+
import org.neo4j.driver.BaseReactiveSession;
2527
import org.neo4j.driver.Driver;
2628
import org.neo4j.driver.Logger;
2729
import org.neo4j.driver.Logging;
@@ -33,12 +35,10 @@
3335
import org.neo4j.driver.internal.async.NetworkSession;
3436
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
3537
import org.neo4j.driver.internal.metrics.MetricsProvider;
36-
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
3738
import org.neo4j.driver.internal.reactive.InternalRxSession;
3839
import org.neo4j.driver.internal.security.SecurityPlan;
3940
import org.neo4j.driver.internal.types.InternalTypeSystem;
4041
import org.neo4j.driver.internal.util.Futures;
41-
import org.neo4j.driver.reactive.ReactiveSession;
4242
import org.neo4j.driver.reactive.RxSession;
4343
import org.neo4j.driver.types.TypeSystem;
4444

@@ -61,11 +61,6 @@ public class InternalDriver implements Driver {
6161
this.log = logging.getLog(getClass());
6262
}
6363

64-
@Override
65-
public Session session() {
66-
return new InternalSession(newSession(SessionConfig.defaultConfig()));
67-
}
68-
6964
@Override
7065
public Session session(SessionConfig sessionConfig) {
7166
return new InternalSession(newSession(sessionConfig));
@@ -77,14 +72,19 @@ public RxSession rxSession(SessionConfig sessionConfig) {
7772
return new InternalRxSession(newSession(sessionConfig));
7873
}
7974

75+
@SuppressWarnings({"deprecation", "unchecked"})
8076
@Override
81-
public ReactiveSession reactiveSession(SessionConfig sessionConfig) {
82-
return new InternalReactiveSession(newSession(sessionConfig));
83-
}
84-
85-
@Override
86-
public AsyncSession asyncSession() {
87-
return new InternalAsyncSession(newSession(SessionConfig.defaultConfig()));
77+
public <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig) {
78+
requireNonNull(sessionClass, "sessionClass must not be null");
79+
requireNonNull(sessionClass, "sessionConfig must not be null");
80+
if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) {
81+
return (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(newSession(sessionConfig));
82+
} else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) {
83+
return (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(newSession(sessionConfig));
84+
} else {
85+
throw new IllegalArgumentException(
86+
String.format("Unsupported session type '%s'", sessionClass.getCanonicalName()));
87+
}
8888
}
8989

9090
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.reactivestreams.Publisher;
3535
import reactor.core.publisher.Flux;
3636

37-
abstract class AbstractReactiveSession<S> {
37+
public abstract class AbstractReactiveSession<S> {
3838
protected final NetworkSession session;
3939

4040
public AbstractReactiveSession(NetworkSession session) {
@@ -45,15 +45,15 @@ public AbstractReactiveSession(NetworkSession session) {
4545
this.session = session;
4646
}
4747

48-
abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
48+
protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
4949

50-
abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
50+
protected abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
5151

5252
Publisher<S> doBeginTransaction(TransactionConfig config) {
5353
return doBeginTransaction(config, null);
5454
}
5555

56-
Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
56+
protected Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
5757
return createSingleItemPublisher(
5858
() -> {
5959
CompletableFuture<S> txFuture = new CompletableFuture<>();
@@ -87,7 +87,7 @@ Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
8787
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
8888
}
8989

90-
<T> Publisher<T> runTransaction(
90+
protected <T> Publisher<T> runTransaction(
9191
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
9292
Flux<T> repeatableWork = Flux.usingWhen(
9393
beginTransaction(mode, config),
@@ -119,7 +119,7 @@ public Set<Bookmark> lastBookmarks() {
119119
return session.lastBookmarks();
120120
}
121121

122-
<T> Publisher<T> doClose() {
122+
protected <T> Publisher<T> doClose() {
123123
return createEmptyPublisher(session::closeAsync);
124124
}
125125
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,30 @@
2424
import org.reactivestreams.Publisher;
2525
import reactor.core.publisher.Mono;
2626

27-
abstract class AbstractReactiveTransaction {
27+
public abstract class AbstractReactiveTransaction {
2828
protected final UnmanagedTransaction tx;
2929

3030
protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
3131
this.tx = tx;
3232
}
3333

34-
<T> Publisher<T> doCommit() {
34+
protected <T> Publisher<T> doCommit() {
3535
return createEmptyPublisher(tx::commitAsync);
3636
}
3737

38-
<T> Publisher<T> doRollback() {
38+
protected <T> Publisher<T> doRollback() {
3939
return createEmptyPublisher(tx::rollbackAsync);
4040
}
4141

42-
Publisher<Void> doClose() {
42+
protected Publisher<Void> doClose() {
4343
return close(false);
4444
}
4545

46-
Publisher<Boolean> doIsOpen() {
46+
protected Publisher<Boolean> doIsOpen() {
4747
return Mono.just(tx.isOpen());
4848
}
4949

50-
Publisher<Void> close(boolean commit) {
50+
public Publisher<Void> close(boolean commit) {
5151
return createEmptyPublisher(() -> tx.closeAsync(commit));
5252
}
5353
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ public InternalReactiveSession(NetworkSession session) {
4646
}
4747

4848
@Override
49-
ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
49+
protected ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
5050
return new InternalReactiveTransaction(unmanagedTransaction);
5151
}
5252

5353
@Override
54-
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
54+
protected org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
5555
return ((InternalReactiveTransaction) transaction).close(commit);
5656
}
5757

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public InternalRxSession(NetworkSession session) {
4343
}
4444

4545
@Override
46-
RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
46+
protected RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
4747
return new InternalRxTransaction(unmanagedTransaction);
4848
}
4949

5050
@Override
51-
Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
51+
protected Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
5252
return ((InternalRxTransaction) transaction).close(commit);
5353
}
5454

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.reactivestreams;
20+
21+
import java.util.Map;
22+
import org.neo4j.driver.Query;
23+
import org.neo4j.driver.Record;
24+
import org.neo4j.driver.Value;
25+
import org.neo4j.driver.Values;
26+
import org.neo4j.driver.internal.util.Extract;
27+
import org.neo4j.driver.internal.value.MapValue;
28+
import org.neo4j.driver.reactivestreams.ReactiveQueryRunner;
29+
import org.neo4j.driver.reactivestreams.ReactiveResult;
30+
import org.reactivestreams.Publisher;
31+
import reactor.core.publisher.Mono;
32+
33+
interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
34+
@Override
35+
default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
36+
try {
37+
Query query = new Query(queryStr, parameters);
38+
return run(query);
39+
} catch (Throwable t) {
40+
return Mono.error(t);
41+
}
42+
}
43+
44+
@Override
45+
default Publisher<ReactiveResult> run(String query, Map<String, Object> parameters) {
46+
return run(query, parameters(parameters));
47+
}
48+
49+
@Override
50+
default Publisher<ReactiveResult> run(String query, Record parameters) {
51+
return run(query, parameters(parameters));
52+
}
53+
54+
@Override
55+
default Publisher<ReactiveResult> run(String queryStr) {
56+
try {
57+
Query query = new Query(queryStr);
58+
return run(query);
59+
} catch (Throwable t) {
60+
return Mono.error(t);
61+
}
62+
}
63+
64+
static Value parameters(Record record) {
65+
return record == null ? Values.EmptyMap : parameters(record.asMap());
66+
}
67+
68+
static Value parameters(Map<String, Object> map) {
69+
if (map == null || map.isEmpty()) {
70+
return Values.EmptyMap;
71+
}
72+
return new MapValue(Extract.mapOfValues(map));
73+
}
74+
}

0 commit comments

Comments
 (0)