diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index bc986cce90..a06e9a1617 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -32,6 +32,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.spi.Connection; @@ -158,6 +159,11 @@ private T transaction( try (Transaction tx = beginTransaction(mode, config)) { T result = work.execute(tx); + if (result instanceof Result) { + throw new ClientException(String.format( + "%s is not a valid return value, it should be consumed before producing a return value", + Result.class.getName())); + } if (tx.isOpen()) { // commit tx if a user has not explicitly committed or rolled back the transaction tx.commit(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index adad847fad..8220fd272c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -36,6 +36,7 @@ import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.util.Futures; @@ -161,6 +162,11 @@ private void executeWork( Throwable error = Futures.completionExceptionCause(completionError); if (error != null) { closeTxAfterFailedTransactionWork(tx, resultFuture, error); + } else if (result instanceof ResultCursor) { + error = new ClientException(String.format( + "%s is not a valid return value, it should be consumed before producing a return value", + ResultCursor.class.getName())); + closeTxAfterFailedTransactionWork(tx, resultFuture, error); } else { closeTxAfterSucceededTransactionWork(tx, resultFuture, result); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java index 557d1c2c10..8d3429abdf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java @@ -27,10 +27,13 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionNestingException; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactivestreams.ReactiveResult; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -90,8 +93,25 @@ private Publisher beginTransaction(AccessMode mode, TransactionConfig config) tx -> Mono.fromDirect(closeTransaction(tx, false)).subscribe()); } + @SuppressWarnings("deprecation") protected Publisher runTransaction( AccessMode mode, Function> work, TransactionConfig config) { + work = work.andThen(publisher -> Flux.from(publisher).map(value -> { + if (value instanceof ReactiveResult) { + throw new ClientException(String.format( + "%s is not a valid return value, it should be consumed before producing a return value", + ReactiveResult.class.getName())); + } else if (value instanceof org.neo4j.driver.reactive.ReactiveResult) { + throw new ClientException(String.format( + "%s is not a valid return value, it should be consumed before producing a return value", + org.neo4j.driver.reactive.ReactiveResult.class.getName())); + } else if (value instanceof RxResult) { + throw new ClientException(String.format( + "%s is not a valid return value, it should be consumed before producing a return value", + RxResult.class.getName())); + } + return value; + })); Flux repeatableWork = Flux.usingWhen( beginTransaction(mode, config), work, diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java index 6622fb7a72..496065b0ba 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java @@ -248,7 +248,7 @@ void shouldUseBookmarksForAutoCommitTransactionsAndTransactionFunctions() { Session session = driver.session(); Bookmark initialBookmark = session.lastBookmark(); - session.writeTransaction(tx -> tx.run("CREATE ()")); + session.writeTransaction(tx -> tx.run("CREATE ()").consume()); Bookmark bookmark1 = session.lastBookmark(); assertNotNull(bookmark1); assertNotEquals(initialBookmark, bookmark1); @@ -259,7 +259,7 @@ void shouldUseBookmarksForAutoCommitTransactionsAndTransactionFunctions() { assertNotEquals(initialBookmark, bookmark2); assertNotEquals(bookmark1, bookmark2); - session.writeTransaction(tx -> tx.run("CREATE ()")); + session.writeTransaction(tx -> tx.run("CREATE ()").consume()); Bookmark bookmark3 = session.lastBookmark(); assertNotNull(bookmark3); assertNotEquals(initialBookmark, bookmark3); diff --git a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java index 324d10b90b..3fd1182493 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/SessionIT.java @@ -63,9 +63,12 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.neo4j.driver.AccessMode; import org.neo4j.driver.AuthToken; import org.neo4j.driver.Config; @@ -1190,6 +1193,29 @@ void shouldErrorDatabaseNameUsingTxWithRetriesWhenDatabaseIsAbsent() throws Thro session.close(); } + @ParameterizedTest + @MethodSource("managedTransactionsReturningResult") + void shouldErrorWhenResultIsReturned(Function fn) { + // GIVEN + var session = neo4j.driver().session(); + + // WHEN & THEN + var error = assertThrows(ClientException.class, () -> fn.apply(session)); + assertEquals( + "org.neo4j.driver.Result is not a valid return value, it should be consumed before producing a return value", + error.getMessage()); + session.close(); + } + + @SuppressWarnings("deprecation") + static List> managedTransactionsReturningResult() { + return List.of( + session -> session.writeTransaction(tx -> tx.run("RETURN 1")), + session -> session.readTransaction(tx -> tx.run("RETURN 1")), + session -> session.executeWrite(tx -> tx.run("RETURN 1")), + session -> session.executeRead(tx -> tx.run("RETURN 1"))); + } + @SuppressWarnings("deprecation") private void testExecuteReadTx(AccessMode sessionMode) { Driver driver = neo4j.driver(); diff --git a/driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java index be43c83403..907178a183 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java @@ -56,10 +56,13 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.Record; @@ -752,6 +755,25 @@ void shouldAllowReturningNullFromAsyncTransactionFunction() { assertNull(await(writeResult)); } + @ParameterizedTest + @MethodSource("managedTransactionsReturningResultCursorStage") + void shouldErrorWhenResultCursorIsReturned(Function> fn) { + var error = assertThrows(ClientException.class, () -> await(fn.apply(session))); + assertEquals( + "org.neo4j.driver.async.ResultCursor is not a valid return value, it should be consumed before producing a return value", + error.getMessage()); + await(session.closeAsync()); + } + + @SuppressWarnings("deprecation") + static List>> managedTransactionsReturningResultCursorStage() { + return List.of( + session -> session.writeTransactionAsync(tx -> tx.runAsync("RETURN 1")), + session -> session.readTransactionAsync(tx -> tx.runAsync("RETURN 1")), + session -> session.executeWriteAsync(tx -> tx.runAsync("RETURN 1")), + session -> session.executeReadAsync(tx -> tx.runAsync("RETURN 1"))); + } + private Future>> runNestedQueries(ResultCursor inputCursor) { CompletableFuture>> resultFuture = new CompletableFuture<>(); runNestedQueries(inputCursor, new ArrayList<>(), resultFuture); diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java new file mode 100644 index 0000000000..1027e6ff6d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.integration.reactive; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; + +import java.util.List; +import java.util.function.Function; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; +import org.neo4j.driver.reactive.ReactiveResult; +import org.neo4j.driver.reactive.ReactiveSession; +import org.neo4j.driver.testutil.DatabaseExtension; +import org.neo4j.driver.testutil.ParallelizableIT; +import org.reactivestreams.Publisher; +import reactor.adapter.JdkFlowAdapter; +import reactor.core.publisher.Flux; + +@EnabledOnNeo4jWith(BOLT_V4) +@ParallelizableIT +class ReactiveSessionIT { + @RegisterExtension + static final DatabaseExtension neo4j = new DatabaseExtension(); + + @ParameterizedTest + @MethodSource("managedTransactionsReturningReactiveResultPublisher") + void shouldErrorWhenReactiveResultIsReturned(Function> fn) { + // GIVEN + var session = neo4j.driver().session(ReactiveSession.class); + + // WHEN & THEN + var error = assertThrows( + ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst()); + assertEquals( + "org.neo4j.driver.reactive.ReactiveResult is not a valid return value, it should be consumed before producing a return value", + error.getMessage()); + JdkFlowAdapter.flowPublisherToFlux(session.close()).blockFirst(); + } + + static List>> + managedTransactionsReturningReactiveResultPublisher() { + return List.of( + session -> JdkFlowAdapter.flowPublisherToFlux(session.executeWrite(tx -> tx.run("RETURN 1"))), + session -> JdkFlowAdapter.flowPublisherToFlux(session.executeRead(tx -> tx.run("RETURN 1")))); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java new file mode 100644 index 0000000000..f03117fc97 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveStreamsSessionIT.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.integration.reactive; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; + +import java.util.List; +import java.util.function.Function; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; +import org.neo4j.driver.testutil.DatabaseExtension; +import org.neo4j.driver.testutil.ParallelizableIT; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +@EnabledOnNeo4jWith(BOLT_V4) +@ParallelizableIT +public class ReactiveStreamsSessionIT { + @RegisterExtension + static final DatabaseExtension neo4j = new DatabaseExtension(); + + @ParameterizedTest + @MethodSource("managedTransactionsReturningReactiveResultPublisher") + void shouldErrorWhenReactiveResultIsReturned(Function> fn) { + // GIVEN + var session = neo4j.driver().session(ReactiveSession.class); + + // WHEN & THEN + var error = assertThrows( + ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst()); + assertEquals( + "org.neo4j.driver.reactivestreams.ReactiveResult is not a valid return value, it should be consumed before producing a return value", + error.getMessage()); + Flux.from(session.close()).blockFirst(); + } + + static List>> + managedTransactionsReturningReactiveResultPublisher() { + return List.of( + session -> session.executeWrite(tx -> tx.run("RETURN 1")), + session -> session.executeRead(tx -> tx.run("RETURN 1"))); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java index c7c8fa8c86..57fd8df828 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java @@ -24,14 +24,19 @@ import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.exceptions.ClientException; @@ -179,6 +184,28 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() { assertNoParallelScheduler(); } + @ParameterizedTest + @MethodSource("managedTransactionsReturningReactiveResultPublisher") + void shouldErrorWhenReactiveResultIsReturned(Function> fn) { + // GIVEN + var session = neo4j.driver().rxSession(); + + // WHEN & THEN + var error = assertThrows( + ClientException.class, () -> Flux.from(fn.apply(session)).blockFirst()); + assertEquals( + "org.neo4j.driver.reactive.RxResult is not a valid return value, it should be consumed before producing a return value", + error.getMessage()); + Flux.from(session.close()).blockFirst(); + } + + @SuppressWarnings("deprecation") + static List>> managedTransactionsReturningReactiveResultPublisher() { + return List.of( + session -> session.writeTransaction(tx -> Flux.just(tx.run("RETURN 1"))), + session -> session.readTransaction(tx -> Flux.just(tx.run("RETURN 1")))); + } + private void assertNoParallelScheduler() { Set threadSet = Thread.getAllStackTraces().keySet(); for (Thread t : threadSet) {