Skip to content

Commit 0ca83f8

Browse files
committed
Introduce new Reactive Streams session
This update introduces support for reactive session with Reactive Streams types. While it is similar to the deprecated `RxSession`, it includes the improvements introduced with the `ReactiveSession` that uses Flow API types. Sample session creation: ``` var session = driver.reactiveSession(org.neo4j.driver.reactivestreams.ReactiveSession.class); ```.
1 parent 93c7b95 commit 0ca83f8

File tree

65 files changed

+1718
-39
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1718
-39
lines changed

driver/clirr-ignored-differences.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,4 +397,16 @@
397397
<method>org.neo4j.driver.types.TypeSystem getDefault()</method>
398398
</difference>
399399

400+
<difference>
401+
<className>org/neo4j/driver/Driver</className>
402+
<differenceType>7012</differenceType>
403+
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class)</method>
404+
</difference>
405+
406+
<difference>
407+
<className>org/neo4j/driver/Driver</className>
408+
<differenceType>7012</differenceType>
409+
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class, org.neo4j.driver.SessionConfig)</method>
410+
</difference>
411+
400412
</differences>

driver/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
exports org.neo4j.driver;
2222
exports org.neo4j.driver.async;
2323
exports org.neo4j.driver.reactive;
24+
exports org.neo4j.driver.reactivestreams;
2425
exports org.neo4j.driver.types;
2526
exports org.neo4j.driver.summary;
2627
exports org.neo4j.driver.net;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver;
20+
21+
/**
22+
* A base interface for reactive sessions, used by {@link Driver#reactiveSession(Class)} and {@link Driver#reactiveSession(Class, SessionConfig)}.
23+
*/
24+
public interface BaseReactiveSession {}

driver/src/main/java/org/neo4j/driver/Driver.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ public interface Driver extends AutoCloseable {
7777
*
7878
* @return a new {@link Session} object.
7979
*/
80-
Session session();
80+
default Session session() {
81+
return session(SessionConfig.defaultConfig());
82+
}
8183

8284
/**
8385
* Create a new {@link Session} with a specified {@link SessionConfig session configuration}.
@@ -133,7 +135,41 @@ default ReactiveSession reactiveSession() {
133135
* @param sessionConfig used to customize the session.
134136
* @return a new {@link ReactiveSession} object.
135137
*/
136-
ReactiveSession reactiveSession(SessionConfig sessionConfig);
138+
default ReactiveSession reactiveSession(SessionConfig sessionConfig) {
139+
return reactiveSession(ReactiveSession.class, sessionConfig);
140+
}
141+
142+
/**
143+
* Create a new reactive session of supported type with default {@link SessionConfig session configuration}.
144+
* <p>
145+
* Supported types are:
146+
* <ul>
147+
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
148+
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
149+
* </ul>
150+
*
151+
* @param sessionClass session type class
152+
* @return session instance
153+
* @param <T> session type
154+
*/
155+
default <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass) {
156+
return reactiveSession(sessionClass, SessionConfig.defaultConfig());
157+
}
158+
159+
/**
160+
* Create a new reactive session of supported type with a specified {@link SessionConfig session configuration}.
161+
* <p>
162+
* Supported types are:
163+
* <ul>
164+
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
165+
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
166+
* </ul>
167+
*
168+
* @param sessionClass session type class
169+
* @return session instance
170+
* @param <T> session type
171+
*/
172+
<T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig);
137173

138174
/**
139175
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an
@@ -143,7 +179,9 @@ default ReactiveSession reactiveSession() {
143179
*
144180
* @return a new {@link AsyncSession} object.
145181
*/
146-
AsyncSession asyncSession();
182+
default AsyncSession asyncSession() {
183+
return asyncSession(SessionConfig.defaultConfig());
184+
}
147185

148186
/**
149187
* Create a new {@link AsyncSession} with a specified {@link SessionConfig session configuration}.

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import static java.util.Objects.requireNonNull;
2122
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
2223

2324
import java.util.concurrent.CompletionStage;
2425
import java.util.concurrent.atomic.AtomicBoolean;
26+
import org.neo4j.driver.BaseReactiveSession;
2527
import org.neo4j.driver.Driver;
2628
import org.neo4j.driver.Logger;
2729
import org.neo4j.driver.Logging;
@@ -33,12 +35,10 @@
3335
import org.neo4j.driver.internal.async.NetworkSession;
3436
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
3537
import org.neo4j.driver.internal.metrics.MetricsProvider;
36-
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
3738
import org.neo4j.driver.internal.reactive.InternalRxSession;
3839
import org.neo4j.driver.internal.security.SecurityPlan;
3940
import org.neo4j.driver.internal.types.InternalTypeSystem;
4041
import org.neo4j.driver.internal.util.Futures;
41-
import org.neo4j.driver.reactive.ReactiveSession;
4242
import org.neo4j.driver.reactive.RxSession;
4343
import org.neo4j.driver.types.TypeSystem;
4444

@@ -61,11 +61,6 @@ public class InternalDriver implements Driver {
6161
this.log = logging.getLog(getClass());
6262
}
6363

64-
@Override
65-
public Session session() {
66-
return new InternalSession(newSession(SessionConfig.defaultConfig()));
67-
}
68-
6964
@Override
7065
public Session session(SessionConfig sessionConfig) {
7166
return new InternalSession(newSession(sessionConfig));
@@ -77,14 +72,19 @@ public RxSession rxSession(SessionConfig sessionConfig) {
7772
return new InternalRxSession(newSession(sessionConfig));
7873
}
7974

75+
@SuppressWarnings({"deprecation", "unchecked"})
8076
@Override
81-
public ReactiveSession reactiveSession(SessionConfig sessionConfig) {
82-
return new InternalReactiveSession(newSession(sessionConfig));
83-
}
84-
85-
@Override
86-
public AsyncSession asyncSession() {
87-
return new InternalAsyncSession(newSession(SessionConfig.defaultConfig()));
77+
public <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig) {
78+
requireNonNull(sessionClass, "sessionClass must not be null");
79+
requireNonNull(sessionClass, "sessionConfig must not be null");
80+
if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) {
81+
return (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(newSession(sessionConfig));
82+
} else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) {
83+
return (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(newSession(sessionConfig));
84+
} else {
85+
throw new IllegalArgumentException(
86+
String.format("Unsupported session type '%s'", sessionClass.getCanonicalName()));
87+
}
8888
}
8989

9090
@Override

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.reactivestreams.Publisher;
3535
import reactor.core.publisher.Flux;
3636

37-
abstract class AbstractReactiveSession<S> {
37+
public abstract class AbstractReactiveSession<S> {
3838
protected final NetworkSession session;
3939

4040
public AbstractReactiveSession(NetworkSession session) {
@@ -45,15 +45,15 @@ public AbstractReactiveSession(NetworkSession session) {
4545
this.session = session;
4646
}
4747

48-
abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
48+
protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
4949

50-
abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
50+
protected abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
5151

5252
Publisher<S> doBeginTransaction(TransactionConfig config) {
5353
return doBeginTransaction(config, null);
5454
}
5555

56-
Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
56+
protected Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
5757
return createSingleItemPublisher(
5858
() -> {
5959
CompletableFuture<S> txFuture = new CompletableFuture<>();
@@ -87,7 +87,7 @@ Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
8787
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
8888
}
8989

90-
<T> Publisher<T> runTransaction(
90+
protected <T> Publisher<T> runTransaction(
9191
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
9292
Flux<T> repeatableWork = Flux.usingWhen(
9393
beginTransaction(mode, config),
@@ -119,7 +119,7 @@ public Set<Bookmark> lastBookmarks() {
119119
return session.lastBookmarks();
120120
}
121121

122-
<T> Publisher<T> doClose() {
122+
protected <T> Publisher<T> doClose() {
123123
return createEmptyPublisher(session::closeAsync);
124124
}
125125
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,30 @@
2424
import org.reactivestreams.Publisher;
2525
import reactor.core.publisher.Mono;
2626

27-
abstract class AbstractReactiveTransaction {
27+
public abstract class AbstractReactiveTransaction {
2828
protected final UnmanagedTransaction tx;
2929

3030
protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
3131
this.tx = tx;
3232
}
3333

34-
<T> Publisher<T> doCommit() {
34+
protected <T> Publisher<T> doCommit() {
3535
return createEmptyPublisher(tx::commitAsync);
3636
}
3737

38-
<T> Publisher<T> doRollback() {
38+
protected <T> Publisher<T> doRollback() {
3939
return createEmptyPublisher(tx::rollbackAsync);
4040
}
4141

42-
Publisher<Void> doClose() {
42+
protected Publisher<Void> doClose() {
4343
return close(false);
4444
}
4545

46-
Publisher<Boolean> doIsOpen() {
46+
protected Publisher<Boolean> doIsOpen() {
4747
return Mono.just(tx.isOpen());
4848
}
4949

50-
Publisher<Void> close(boolean commit) {
50+
public Publisher<Void> close(boolean commit) {
5151
return createEmptyPublisher(() -> tx.closeAsync(commit));
5252
}
5353
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ public InternalReactiveSession(NetworkSession session) {
4646
}
4747

4848
@Override
49-
ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
49+
protected ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
5050
return new InternalReactiveTransaction(unmanagedTransaction);
5151
}
5252

5353
@Override
54-
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
54+
protected org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
5555
return ((InternalReactiveTransaction) transaction).close(commit);
5656
}
5757

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public InternalRxSession(NetworkSession session) {
4343
}
4444

4545
@Override
46-
RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
46+
protected RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
4747
return new InternalRxTransaction(unmanagedTransaction);
4848
}
4949

5050
@Override
51-
Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
51+
protected Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
5252
return ((InternalRxTransaction) transaction).close(commit);
5353
}
5454

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.reactivestreams;
20+
21+
import java.util.Map;
22+
import org.neo4j.driver.Query;
23+
import org.neo4j.driver.Record;
24+
import org.neo4j.driver.Value;
25+
import org.neo4j.driver.Values;
26+
import org.neo4j.driver.internal.util.Extract;
27+
import org.neo4j.driver.internal.value.MapValue;
28+
import org.neo4j.driver.reactivestreams.ReactiveQueryRunner;
29+
import org.neo4j.driver.reactivestreams.ReactiveResult;
30+
import org.reactivestreams.Publisher;
31+
import reactor.core.publisher.Mono;
32+
33+
interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
34+
@Override
35+
default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
36+
try {
37+
Query query = new Query(queryStr, parameters);
38+
return run(query);
39+
} catch (Throwable t) {
40+
return Mono.error(t);
41+
}
42+
}
43+
44+
@Override
45+
default Publisher<ReactiveResult> run(String query, Map<String, Object> parameters) {
46+
return run(query, parameters(parameters));
47+
}
48+
49+
@Override
50+
default Publisher<ReactiveResult> run(String query, Record parameters) {
51+
return run(query, parameters(parameters));
52+
}
53+
54+
@Override
55+
default Publisher<ReactiveResult> run(String queryStr) {
56+
try {
57+
Query query = new Query(queryStr);
58+
return run(query);
59+
} catch (Throwable t) {
60+
return Mono.error(t);
61+
}
62+
}
63+
64+
static Value parameters(Record record) {
65+
return record == null ? Values.EmptyMap : parameters(record.asMap());
66+
}
67+
68+
static Value parameters(Map<String, Object> map) {
69+
if (map == null || map.isEmpty()) {
70+
return Values.EmptyMap;
71+
}
72+
return new MapValue(Extract.mapOfValues(map));
73+
}
74+
}

0 commit comments

Comments
 (0)