Skip to content

Commit bdb7d34

Browse files
committed
Throw exception upon returning result in managed transaction API
Returning result directly in the managed transaction API is an invalid use of the API and it should be avoided. https://neo4j.com/docs/java-manual/current/session-api/#java-driver-simple-result-consume > Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.
1 parent 7ae8b7a commit bdb7d34

File tree

8 files changed

+240
-0
lines changed

8 files changed

+240
-0
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.neo4j.driver.TransactionConfig;
3333
import org.neo4j.driver.TransactionWork;
3434
import org.neo4j.driver.async.ResultCursor;
35+
import org.neo4j.driver.exceptions.ClientException;
3536
import org.neo4j.driver.internal.async.NetworkSession;
3637
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3738
import org.neo4j.driver.internal.spi.Connection;
@@ -158,6 +159,11 @@ private <T> T transaction(
158159
try (Transaction tx = beginTransaction(mode, config)) {
159160

160161
T result = work.execute(tx);
162+
if (result instanceof Result) {
163+
throw new ClientException(String.format(
164+
"%s is not a valid return value, it should be consumed before producing a return value",
165+
Result.class.getName()));
166+
}
161167
if (tx.isOpen()) {
162168
// commit tx if a user has not explicitly committed or rolled back the transaction
163169
tx.commit();

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.async.AsyncTransactionCallback;
3737
import org.neo4j.driver.async.AsyncTransactionWork;
3838
import org.neo4j.driver.async.ResultCursor;
39+
import org.neo4j.driver.exceptions.ClientException;
3940
import org.neo4j.driver.internal.InternalBookmark;
4041
import org.neo4j.driver.internal.util.Futures;
4142

@@ -161,6 +162,11 @@ private <T> void executeWork(
161162
Throwable error = Futures.completionExceptionCause(completionError);
162163
if (error != null) {
163164
closeTxAfterFailedTransactionWork(tx, resultFuture, error);
165+
} else if (result instanceof ResultCursor) {
166+
error = new ClientException(String.format(
167+
"%s is not a valid return value, it should be consumed before producing a return value",
168+
ResultCursor.class.getName()));
169+
closeTxAfterFailedTransactionWork(tx, resultFuture, error);
164170
} else {
165171
closeTxAfterSucceededTransactionWork(tx, resultFuture, result);
166172
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import org.neo4j.driver.AccessMode;
2828
import org.neo4j.driver.Bookmark;
2929
import org.neo4j.driver.TransactionConfig;
30+
import org.neo4j.driver.exceptions.ClientException;
3031
import org.neo4j.driver.exceptions.TransactionNestingException;
3132
import org.neo4j.driver.internal.async.NetworkSession;
3233
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3334
import org.neo4j.driver.internal.util.Futures;
35+
import org.neo4j.driver.reactive.RxResult;
36+
import org.neo4j.driver.reactivestreams.ReactiveResult;
3437
import org.reactivestreams.Publisher;
3538
import reactor.core.publisher.Flux;
3639
import reactor.core.publisher.Mono;
@@ -90,8 +93,25 @@ private Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config)
9093
tx -> Mono.fromDirect(closeTransaction(tx, false)).subscribe());
9194
}
9295

96+
@SuppressWarnings("deprecation")
9397
protected <T> Publisher<T> runTransaction(
9498
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
99+
work = work.andThen(publisher -> Flux.from(publisher).map(value -> {
100+
if (value instanceof ReactiveResult) {
101+
throw new ClientException(String.format(
102+
"%s is not a valid return value, it should be consumed before producing a return value",
103+
ReactiveResult.class.getName()));
104+
} else if (value instanceof org.neo4j.driver.reactive.ReactiveResult) {
105+
throw new ClientException(String.format(
106+
"%s is not a valid return value, it should be consumed before producing a return value",
107+
org.neo4j.driver.reactive.ReactiveResult.class.getName()));
108+
} else if (value instanceof RxResult) {
109+
throw new ClientException(String.format(
110+
"%s is not a valid return value, it should be consumed before producing a return value",
111+
RxResult.class.getName()));
112+
}
113+
return value;
114+
}));
95115
Flux<T> repeatableWork = Flux.usingWhen(
96116
beginTransaction(mode, config),
97117
work,

driver/src/test/java/org/neo4j/driver/integration/SessionIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,12 @@
6363
import java.util.concurrent.Future;
6464
import java.util.concurrent.TimeUnit;
6565
import java.util.concurrent.atomic.AtomicInteger;
66+
import java.util.function.Function;
6667
import org.junit.jupiter.api.AfterEach;
6768
import org.junit.jupiter.api.Test;
6869
import org.junit.jupiter.api.extension.RegisterExtension;
70+
import org.junit.jupiter.params.ParameterizedTest;
71+
import org.junit.jupiter.params.provider.MethodSource;
6972
import org.neo4j.driver.AccessMode;
7073
import org.neo4j.driver.AuthToken;
7174
import org.neo4j.driver.Config;
@@ -1190,6 +1193,29 @@ void shouldErrorDatabaseNameUsingTxWithRetriesWhenDatabaseIsAbsent() throws Thro
11901193
session.close();
11911194
}
11921195

1196+
@ParameterizedTest
1197+
@MethodSource("managedTransactionsReturningResult")
1198+
void shouldErrorWhenResultIsReturned(Function<Session, Result> fn) {
1199+
// GIVEN
1200+
var session = neo4j.driver().session();
1201+
1202+
// WHEN & THEN
1203+
var error = assertThrows(ClientException.class, () -> fn.apply(session));
1204+
assertEquals(
1205+
"org.neo4j.driver.Result is not a valid return value, it should be consumed before producing a return value",
1206+
error.getMessage());
1207+
session.close();
1208+
}
1209+
1210+
@SuppressWarnings("deprecation")
1211+
static List<Function<Session, Result>> managedTransactionsReturningResult() {
1212+
return List.of(
1213+
session -> session.writeTransaction(tx -> tx.run("RETURN 1")),
1214+
session -> session.readTransaction(tx -> tx.run("RETURN 1")),
1215+
session -> session.executeWrite(tx -> tx.run("RETURN 1")),
1216+
session -> session.executeRead(tx -> tx.run("RETURN 1")));
1217+
}
1218+
11931219
@SuppressWarnings("deprecation")
11941220
private void testExecuteReadTx(AccessMode sessionMode) {
11951221
Driver driver = neo4j.driver();

driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@
5656
import java.util.concurrent.CompletionStage;
5757
import java.util.concurrent.Future;
5858
import java.util.concurrent.atomic.AtomicInteger;
59+
import java.util.function.Function;
5960
import org.junit.jupiter.api.AfterEach;
6061
import org.junit.jupiter.api.BeforeEach;
6162
import org.junit.jupiter.api.Test;
6263
import org.junit.jupiter.api.extension.RegisterExtension;
64+
import org.junit.jupiter.params.ParameterizedTest;
65+
import org.junit.jupiter.params.provider.MethodSource;
6366
import org.neo4j.driver.Bookmark;
6467
import org.neo4j.driver.Query;
6568
import org.neo4j.driver.Record;
@@ -752,6 +755,25 @@ void shouldAllowReturningNullFromAsyncTransactionFunction() {
752755
assertNull(await(writeResult));
753756
}
754757

758+
@ParameterizedTest
759+
@MethodSource("managedTransactionsReturningResultCursorStage")
760+
void shouldErrorWhenResultCursorIsReturned(Function<AsyncSession, CompletionStage<ResultCursor>> fn) {
761+
var error = assertThrows(ClientException.class, () -> await(fn.apply(session)));
762+
assertEquals(
763+
"org.neo4j.driver.async.ResultCursor is not a valid return value, it should be consumed before producing a return value",
764+
error.getMessage());
765+
await(session.closeAsync());
766+
}
767+
768+
@SuppressWarnings("deprecation")
769+
static List<Function<AsyncSession, CompletionStage<ResultCursor>>> managedTransactionsReturningResultCursorStage() {
770+
return List.of(
771+
session -> session.writeTransactionAsync(tx -> tx.runAsync("RETURN 1")),
772+
session -> session.readTransactionAsync(tx -> tx.runAsync("RETURN 1")),
773+
session -> session.executeWriteAsync(tx -> tx.runAsync("RETURN 1")),
774+
session -> session.executeReadAsync(tx -> tx.runAsync("RETURN 1")));
775+
}
776+
755777
private Future<List<CompletionStage<Record>>> runNestedQueries(ResultCursor inputCursor) {
756778
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();
757779
runNestedQueries(inputCursor, new ArrayList<>(), resultFuture);
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration.reactive;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
24+
25+
import java.util.List;
26+
import java.util.function.Function;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.MethodSource;
30+
import org.neo4j.driver.exceptions.ClientException;
31+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
32+
import org.neo4j.driver.reactive.ReactiveResult;
33+
import org.neo4j.driver.reactive.ReactiveSession;
34+
import org.neo4j.driver.testutil.DatabaseExtension;
35+
import org.neo4j.driver.testutil.ParallelizableIT;
36+
import org.reactivestreams.Publisher;
37+
import reactor.adapter.JdkFlowAdapter;
38+
import reactor.core.publisher.Flux;
39+
40+
@EnabledOnNeo4jWith(BOLT_V4)
41+
@ParallelizableIT
42+
class ReactiveSessionIT {
43+
@RegisterExtension
44+
static final DatabaseExtension neo4j = new DatabaseExtension();
45+
46+
@ParameterizedTest
47+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
48+
void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> fn) {
49+
// GIVEN
50+
var session = neo4j.driver().session(ReactiveSession.class);
51+
52+
// WHEN & THEN
53+
var error = assertThrows(
54+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
55+
assertEquals(
56+
"org.neo4j.driver.reactive.ReactiveResult is not a valid return value, it should be consumed before producing a return value",
57+
error.getMessage());
58+
JdkFlowAdapter.flowPublisherToFlux(session.close()).blockFirst();
59+
}
60+
61+
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
62+
managedTransactionsReturningReactiveResultPublisher() {
63+
return List.of(
64+
session -> JdkFlowAdapter.flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))),
65+
session -> JdkFlowAdapter.flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1"))));
66+
}
67+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration.reactive;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
24+
25+
import java.util.List;
26+
import java.util.function.Function;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.MethodSource;
30+
import org.neo4j.driver.exceptions.ClientException;
31+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
32+
import org.neo4j.driver.reactivestreams.ReactiveResult;
33+
import org.neo4j.driver.reactivestreams.ReactiveSession;
34+
import org.neo4j.driver.testutil.DatabaseExtension;
35+
import org.neo4j.driver.testutil.ParallelizableIT;
36+
import org.reactivestreams.Publisher;
37+
import reactor.core.publisher.Flux;
38+
39+
@EnabledOnNeo4jWith(BOLT_V4)
40+
@ParallelizableIT
41+
public class ReactiveStreamsSessionIT {
42+
@RegisterExtension
43+
static final DatabaseExtension neo4j = new DatabaseExtension();
44+
45+
@ParameterizedTest
46+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
47+
void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> fn) {
48+
// GIVEN
49+
var session = neo4j.driver().session(ReactiveSession.class);
50+
51+
// WHEN & THEN
52+
var error = assertThrows(
53+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
54+
assertEquals(
55+
"org.neo4j.driver.reactivestreams.ReactiveResult is not a valid return value, it should be consumed before producing a return value",
56+
error.getMessage());
57+
Flux.from(session.close()).blockFirst();
58+
}
59+
60+
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
61+
managedTransactionsReturningReactiveResultPublisher() {
62+
return List.of(
63+
session -> session.executeWrite(tx -> tx.run("RETURN 1")),
64+
session -> session.executeRead(tx -> tx.run("RETURN 1")));
65+
}
66+
}

driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
import static org.hamcrest.CoreMatchers.startsWith;
2525
import static org.hamcrest.MatcherAssert.assertThat;
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
2728
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
2829

2930
import java.util.Arrays;
3031
import java.util.Iterator;
32+
import java.util.List;
3133
import java.util.Set;
3234
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.function.Function;
3336
import org.junit.jupiter.api.Test;
3437
import org.junit.jupiter.api.extension.RegisterExtension;
38+
import org.junit.jupiter.params.ParameterizedTest;
39+
import org.junit.jupiter.params.provider.MethodSource;
3540
import org.neo4j.driver.Result;
3641
import org.neo4j.driver.Session;
3742
import org.neo4j.driver.exceptions.ClientException;
@@ -179,6 +184,28 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() {
179184
assertNoParallelScheduler();
180185
}
181186

187+
@ParameterizedTest
188+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
189+
void shouldErrorWhenReactiveResultIsReturned(Function<RxSession, Publisher<RxResult>> fn) {
190+
// GIVEN
191+
var session = neo4j.driver().rxSession();
192+
193+
// WHEN & THEN
194+
var error = assertThrows(
195+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
196+
assertEquals(
197+
"org.neo4j.driver.reactive.RxResult is not a valid return value, it should be consumed before producing a return value",
198+
error.getMessage());
199+
Flux.from(session.close()).blockFirst();
200+
}
201+
202+
@SuppressWarnings("deprecation")
203+
static List<Function<RxSession, Publisher<RxResult>>> managedTransactionsReturningReactiveResultPublisher() {
204+
return List.of(
205+
session -> session.writeTransaction(tx -> Flux.just(tx.run("RETURN 1"))),
206+
session -> session.readTransaction(tx -> Flux.just(tx.run("RETURN 1"))));
207+
}
208+
182209
private void assertNoParallelScheduler() {
183210
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
184211
for (Thread t : threadSet) {

0 commit comments

Comments
 (0)