Skip to content

Commit eb1d341

Browse files
committed
Throw exception upon returning Result in managed transaction API
Returning `Result` (or `ResultCursor`, `ReactiveResult`, etc.) directly in the managed transaction API is an invalid use of the API and it should be avoided. https://neo4j.com/docs/java-manual/current/session-api/#java-driver-simple-result-consume > Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.
1 parent 7ae8b7a commit eb1d341

File tree

9 files changed

+241
-1
lines changed

9 files changed

+241
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.neo4j.driver.TransactionConfig;
3333
import org.neo4j.driver.TransactionWork;
3434
import org.neo4j.driver.async.ResultCursor;
35+
import org.neo4j.driver.exceptions.ClientException;
3536
import org.neo4j.driver.internal.async.NetworkSession;
3637
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3738
import org.neo4j.driver.internal.spi.Connection;
@@ -158,6 +159,11 @@ private <T> T transaction(
158159
try (Transaction tx = beginTransaction(mode, config)) {
159160

160161
T result = work.execute(tx);
162+
if (result instanceof Result) {
163+
throw new ClientException(String.format(
164+
"%s is not a valid return value, it should be consumed before producing a return value",
165+
Result.class.getName()));
166+
}
161167
if (tx.isOpen()) {
162168
// commit tx if a user has not explicitly committed or rolled back the transaction
163169
tx.commit();

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.async.AsyncTransactionCallback;
3737
import org.neo4j.driver.async.AsyncTransactionWork;
3838
import org.neo4j.driver.async.ResultCursor;
39+
import org.neo4j.driver.exceptions.ClientException;
3940
import org.neo4j.driver.internal.InternalBookmark;
4041
import org.neo4j.driver.internal.util.Futures;
4142

@@ -161,6 +162,11 @@ private <T> void executeWork(
161162
Throwable error = Futures.completionExceptionCause(completionError);
162163
if (error != null) {
163164
closeTxAfterFailedTransactionWork(tx, resultFuture, error);
165+
} else if (result instanceof ResultCursor) {
166+
error = new ClientException(String.format(
167+
"%s is not a valid return value, it should be consumed before producing a return value",
168+
ResultCursor.class.getName()));
169+
closeTxAfterFailedTransactionWork(tx, resultFuture, error);
164170
} else {
165171
closeTxAfterSucceededTransactionWork(tx, resultFuture, result);
166172
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import org.neo4j.driver.AccessMode;
2828
import org.neo4j.driver.Bookmark;
2929
import org.neo4j.driver.TransactionConfig;
30+
import org.neo4j.driver.exceptions.ClientException;
3031
import org.neo4j.driver.exceptions.TransactionNestingException;
3132
import org.neo4j.driver.internal.async.NetworkSession;
3233
import org.neo4j.driver.internal.async.UnmanagedTransaction;
3334
import org.neo4j.driver.internal.util.Futures;
35+
import org.neo4j.driver.reactive.RxResult;
36+
import org.neo4j.driver.reactivestreams.ReactiveResult;
3437
import org.reactivestreams.Publisher;
3538
import reactor.core.publisher.Flux;
3639
import reactor.core.publisher.Mono;
@@ -90,8 +93,25 @@ private Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config)
9093
tx -> Mono.fromDirect(closeTransaction(tx, false)).subscribe());
9194
}
9295

96+
@SuppressWarnings("deprecation")
9397
protected <T> Publisher<T> runTransaction(
9498
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
99+
work = work.andThen(publisher -> Flux.from(publisher).map(value -> {
100+
if (value instanceof ReactiveResult) {
101+
throw new ClientException(String.format(
102+
"%s is not a valid return value, it should be consumed before producing a return value",
103+
ReactiveResult.class.getName()));
104+
} else if (value instanceof org.neo4j.driver.reactive.ReactiveResult) {
105+
throw new ClientException(String.format(
106+
"%s is not a valid return value, it should be consumed before producing a return value",
107+
org.neo4j.driver.reactive.ReactiveResult.class.getName()));
108+
} else if (value instanceof RxResult) {
109+
throw new ClientException(String.format(
110+
"%s is not a valid return value, it should be consumed before producing a return value",
111+
RxResult.class.getName()));
112+
}
113+
return value;
114+
}));
95115
Flux<T> repeatableWork = Flux.usingWhen(
96116
beginTransaction(mode, config),
97117
work,

driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ void shouldUseBookmarksForAutoCommitTransactionsAndTransactionFunctions() {
248248
Session session = driver.session();
249249
Bookmark initialBookmark = session.lastBookmark();
250250

251-
session.writeTransaction(tx -> tx.run("CREATE ()"));
251+
session.writeTransaction(tx -> tx.run("CREATE ()").consume());
252252
Bookmark bookmark1 = session.lastBookmark();
253253
assertNotNull(bookmark1);
254254
assertNotEquals(initialBookmark, bookmark1);

driver/src/test/java/org/neo4j/driver/integration/SessionIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,12 @@
6363
import java.util.concurrent.Future;
6464
import java.util.concurrent.TimeUnit;
6565
import java.util.concurrent.atomic.AtomicInteger;
66+
import java.util.function.Function;
6667
import org.junit.jupiter.api.AfterEach;
6768
import org.junit.jupiter.api.Test;
6869
import org.junit.jupiter.api.extension.RegisterExtension;
70+
import org.junit.jupiter.params.ParameterizedTest;
71+
import org.junit.jupiter.params.provider.MethodSource;
6972
import org.neo4j.driver.AccessMode;
7073
import org.neo4j.driver.AuthToken;
7174
import org.neo4j.driver.Config;
@@ -1190,6 +1193,29 @@ void shouldErrorDatabaseNameUsingTxWithRetriesWhenDatabaseIsAbsent() throws Thro
11901193
session.close();
11911194
}
11921195

1196+
@ParameterizedTest
1197+
@MethodSource("managedTransactionsReturningResult")
1198+
void shouldErrorWhenResultIsReturned(Function<Session, Result> fn) {
1199+
// GIVEN
1200+
var session = neo4j.driver().session();
1201+
1202+
// WHEN & THEN
1203+
var error = assertThrows(ClientException.class, () -> fn.apply(session));
1204+
assertEquals(
1205+
"org.neo4j.driver.Result is not a valid return value, it should be consumed before producing a return value",
1206+
error.getMessage());
1207+
session.close();
1208+
}
1209+
1210+
@SuppressWarnings("deprecation")
1211+
static List<Function<Session, Result>> managedTransactionsReturningResult() {
1212+
return List.of(
1213+
session -> session.writeTransaction(tx -> tx.run("RETURN 1")),
1214+
session -> session.readTransaction(tx -> tx.run("RETURN 1")),
1215+
session -> session.executeWrite(tx -> tx.run("RETURN 1")),
1216+
session -> session.executeRead(tx -> tx.run("RETURN 1")));
1217+
}
1218+
11931219
@SuppressWarnings("deprecation")
11941220
private void testExecuteReadTx(AccessMode sessionMode) {
11951221
Driver driver = neo4j.driver();

driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@
5656
import java.util.concurrent.CompletionStage;
5757
import java.util.concurrent.Future;
5858
import java.util.concurrent.atomic.AtomicInteger;
59+
import java.util.function.Function;
5960
import org.junit.jupiter.api.AfterEach;
6061
import org.junit.jupiter.api.BeforeEach;
6162
import org.junit.jupiter.api.Test;
6263
import org.junit.jupiter.api.extension.RegisterExtension;
64+
import org.junit.jupiter.params.ParameterizedTest;
65+
import org.junit.jupiter.params.provider.MethodSource;
6366
import org.neo4j.driver.Bookmark;
6467
import org.neo4j.driver.Query;
6568
import org.neo4j.driver.Record;
@@ -752,6 +755,25 @@ void shouldAllowReturningNullFromAsyncTransactionFunction() {
752755
assertNull(await(writeResult));
753756
}
754757

758+
@ParameterizedTest
759+
@MethodSource("managedTransactionsReturningResultCursorStage")
760+
void shouldErrorWhenResultCursorIsReturned(Function<AsyncSession, CompletionStage<ResultCursor>> fn) {
761+
var error = assertThrows(ClientException.class, () -> await(fn.apply(session)));
762+
assertEquals(
763+
"org.neo4j.driver.async.ResultCursor is not a valid return value, it should be consumed before producing a return value",
764+
error.getMessage());
765+
await(session.closeAsync());
766+
}
767+
768+
@SuppressWarnings("deprecation")
769+
static List<Function<AsyncSession, CompletionStage<ResultCursor>>> managedTransactionsReturningResultCursorStage() {
770+
return List.of(
771+
session -> session.writeTransactionAsync(tx -> tx.runAsync("RETURN 1")),
772+
session -> session.readTransactionAsync(tx -> tx.runAsync("RETURN 1")),
773+
session -> session.executeWriteAsync(tx -> tx.runAsync("RETURN 1")),
774+
session -> session.executeReadAsync(tx -> tx.runAsync("RETURN 1")));
775+
}
776+
755777
private Future<List<CompletionStage<Record>>> runNestedQueries(ResultCursor inputCursor) {
756778
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();
757779
runNestedQueries(inputCursor, new ArrayList<>(), resultFuture);
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration.reactive;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
24+
25+
import java.util.List;
26+
import java.util.function.Function;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.MethodSource;
30+
import org.neo4j.driver.exceptions.ClientException;
31+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
32+
import org.neo4j.driver.reactive.ReactiveResult;
33+
import org.neo4j.driver.reactive.ReactiveSession;
34+
import org.neo4j.driver.testutil.DatabaseExtension;
35+
import org.neo4j.driver.testutil.ParallelizableIT;
36+
import org.reactivestreams.Publisher;
37+
import reactor.adapter.JdkFlowAdapter;
38+
import reactor.core.publisher.Flux;
39+
40+
@EnabledOnNeo4jWith(BOLT_V4)
41+
@ParallelizableIT
42+
class ReactiveSessionIT {
43+
@RegisterExtension
44+
static final DatabaseExtension neo4j = new DatabaseExtension();
45+
46+
@ParameterizedTest
47+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
48+
void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> fn) {
49+
// GIVEN
50+
var session = neo4j.driver().session(ReactiveSession.class);
51+
52+
// WHEN & THEN
53+
var error = assertThrows(
54+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
55+
assertEquals(
56+
"org.neo4j.driver.reactive.ReactiveResult is not a valid return value, it should be consumed before producing a return value",
57+
error.getMessage());
58+
JdkFlowAdapter.flowPublisherToFlux(session.close()).blockFirst();
59+
}
60+
61+
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
62+
managedTransactionsReturningReactiveResultPublisher() {
63+
return List.of(
64+
session -> JdkFlowAdapter.flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))),
65+
session -> JdkFlowAdapter.flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1"))));
66+
}
67+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration.reactive;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
23+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
24+
25+
import java.util.List;
26+
import java.util.function.Function;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.MethodSource;
30+
import org.neo4j.driver.exceptions.ClientException;
31+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
32+
import org.neo4j.driver.reactivestreams.ReactiveResult;
33+
import org.neo4j.driver.reactivestreams.ReactiveSession;
34+
import org.neo4j.driver.testutil.DatabaseExtension;
35+
import org.neo4j.driver.testutil.ParallelizableIT;
36+
import org.reactivestreams.Publisher;
37+
import reactor.core.publisher.Flux;
38+
39+
@EnabledOnNeo4jWith(BOLT_V4)
40+
@ParallelizableIT
41+
public class ReactiveStreamsSessionIT {
42+
@RegisterExtension
43+
static final DatabaseExtension neo4j = new DatabaseExtension();
44+
45+
@ParameterizedTest
46+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
47+
void shouldErrorWhenReactiveResultIsReturned(Function<ReactiveSession, Publisher<ReactiveResult>> fn) {
48+
// GIVEN
49+
var session = neo4j.driver().session(ReactiveSession.class);
50+
51+
// WHEN & THEN
52+
var error = assertThrows(
53+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
54+
assertEquals(
55+
"org.neo4j.driver.reactivestreams.ReactiveResult is not a valid return value, it should be consumed before producing a return value",
56+
error.getMessage());
57+
Flux.from(session.close()).blockFirst();
58+
}
59+
60+
static List<Function<ReactiveSession, Publisher<ReactiveResult>>>
61+
managedTransactionsReturningReactiveResultPublisher() {
62+
return List.of(
63+
session -> session.executeWrite(tx -> tx.run("RETURN 1")),
64+
session -> session.executeRead(tx -> tx.run("RETURN 1")));
65+
}
66+
}

driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
import static org.hamcrest.CoreMatchers.startsWith;
2525
import static org.hamcrest.MatcherAssert.assertThat;
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
2728
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
2829

2930
import java.util.Arrays;
3031
import java.util.Iterator;
32+
import java.util.List;
3133
import java.util.Set;
3234
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.function.Function;
3336
import org.junit.jupiter.api.Test;
3437
import org.junit.jupiter.api.extension.RegisterExtension;
38+
import org.junit.jupiter.params.ParameterizedTest;
39+
import org.junit.jupiter.params.provider.MethodSource;
3540
import org.neo4j.driver.Result;
3641
import org.neo4j.driver.Session;
3742
import org.neo4j.driver.exceptions.ClientException;
@@ -179,6 +184,28 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() {
179184
assertNoParallelScheduler();
180185
}
181186

187+
@ParameterizedTest
188+
@MethodSource("managedTransactionsReturningReactiveResultPublisher")
189+
void shouldErrorWhenReactiveResultIsReturned(Function<RxSession, Publisher<RxResult>> fn) {
190+
// GIVEN
191+
var session = neo4j.driver().rxSession();
192+
193+
// WHEN & THEN
194+
var error = assertThrows(
195+
ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst());
196+
assertEquals(
197+
"org.neo4j.driver.reactive.RxResult is not a valid return value, it should be consumed before producing a return value",
198+
error.getMessage());
199+
Flux.from(session.close()).blockFirst();
200+
}
201+
202+
@SuppressWarnings("deprecation")
203+
static List<Function<RxSession, Publisher<RxResult>>> managedTransactionsReturningReactiveResultPublisher() {
204+
return List.of(
205+
session -> session.writeTransaction(tx -> Flux.just(tx.run("RETURN 1"))),
206+
session -> session.readTransaction(tx -> Flux.just(tx.run("RETURN 1"))));
207+
}
208+
182209
private void assertNoParallelScheduler() {
183210
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
184211
for (Thread t : threadSet) {

0 commit comments

Comments
 (0)