diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index ff46e0640a..66893cb60c 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -122,4 +122,150 @@ org.reactivestreams.Publisher isOpen() + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + diff --git a/driver/src/main/java/org/neo4j/driver/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/QueryRunner.java index a27f767107..7492cc1af9 100644 --- a/driver/src/main/java/org/neo4j/driver/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/QueryRunner.java @@ -18,147 +18,11 @@ */ package org.neo4j.driver; -import java.util.Map; - /** - * Common interface for components that can execute Neo4j queries. - * - *

Important notes on semantics

- *

- * queries run in the same {@link QueryRunner} are guaranteed - * to execute in order, meaning changes made by one query will be seen - * by all subsequent queries in the same {@link QueryRunner}. - *

- * However, to allow handling very large results, and to improve performance, - * result streams are retrieved lazily from the network. - * This means that when any of {@link #run(Query)} - * methods return a result, the query has only started executing - it may not - * have completed yet. Most of the time, you will not notice this, because the - * driver automatically waits for queries to complete at specific points to - * fulfill its contracts. - *

- * Specifically, the driver will ensure all outstanding queries are completed - * whenever you: + * An {@link AutoCloseable} extension of the {@link SimpleQueryRunner}. * - *

- *

- * As noted, most of the time, you will not need to consider this - your writes will - * always be durably stored as long as you either use the results, explicitly commit - * {@link Transaction transactions} or close the session you used using {@link Session#close()}. - *

- * While these semantics introduce some complexity, it gives the driver the ability - * to handle infinite result streams (like subscribing to events), significantly lowers - * the memory overhead for your application and improves performance. - * - * @see Session - * @see Transaction * @since 1.0 */ -public interface QueryRunner extends AutoCloseable +public interface QueryRunner extends SimpleQueryRunner, AutoCloseable { - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This particular method takes a {@link Value} as its input. This is useful - * if you want to take a map-like value that you've gotten from a prior result - * and send it back as parameters. - *

- * If you are creating parameters programmatically, {@link #run(String, Map)} - * might be more helpful, it converts your map to a {@link Value} for you. - * - *

Example

- *
-     * {@code
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       Values.parameters( "myNameParam", "Bob" ) );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. - * @return a stream of result values and associated metadata - */ - Result run(String query, Value parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Map} of parameters. The values in the map - * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for - * a list of allowed types. - * - *

Example

- *
-     * {@code
-     *
-     * Map parameters = new HashMap();
-     * parameters.put("myNameParam", "Bob");
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       parameters );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Map parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Record} of parameters, which can be useful - * if you want to use the output of one query as input for another. - * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Record parameters ); - - /** - * Run a query and return a result stream. - * - * @param query text of a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(String query ); - - /** - * Run a query and return a result stream. - *

Example

- *
-     * {@code
-     *
-     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
-     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
-     * }
-     * 
- * - * @param query a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(Query query); } diff --git a/driver/src/main/java/org/neo4j/driver/Session.java b/driver/src/main/java/org/neo4j/driver/Session.java index d4823b4969..1d4b7ad248 100644 --- a/driver/src/main/java/org/neo4j/driver/Session.java +++ b/driver/src/main/java/org/neo4j/driver/Session.java @@ -19,6 +19,7 @@ package org.neo4j.driver; import java.util.Map; +import java.util.function.Consumer; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.util.Resource; @@ -78,76 +79,179 @@ public interface Session extends Resource, QueryRunner /** * Execute a unit of work in a managed {@link AccessMode#READ read} transaction. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * Managed transactions should not be explicitly committed (via {@link Transaction#commit()}). * * @param work the {@link TransactionWork} to be applied to a new read transaction. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback)}. */ + @Deprecated T readTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#READ read} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. * - * @param work the {@link TransactionWork} to be applied to a new read transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeRead( TransactionCallback callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#READ read} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback, TransactionConfig)}. */ + @Deprecated T readTransaction( TransactionWork work, TransactionConfig config ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeRead( TransactionCallback callback, TransactionConfig config ); + /** * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * Managed transactions should not be explicitly committed (via {@link Transaction#commit()}). * * @param work the {@link TransactionWork} to be applied to a new write transaction. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback)}. */ + @Deprecated T writeTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for + * one or more statements to be run. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. * - * @param work the {@link TransactionWork} to be applied to a new write transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeWrite( TransactionCallback callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeWriteWithoutResult( Consumer contextConsumer ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new write transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback, TransactionConfig)}. */ + @Deprecated T writeTransaction( TransactionWork work, TransactionConfig config ); /** - * Run a query in a managed auto-commit transaction with the specified - * {@link TransactionConfig configuration}, and return a result stream. + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeWrite( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query in a managed auto-commit transaction with the specified {@link TransactionConfig configuration}, and return a result stream. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a stream of result values and associated metadata. */ - Result run(String query, TransactionConfig config ); + Result run( String query, TransactionConfig config ); /** * Run a query with parameters in a managed auto-commit transaction with the diff --git a/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java new file mode 100644 index 0000000000..8e3101e766 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Map; + +/** + * Common interface for components that can execute Neo4j queries. + * + *

Important notes on semantics

+ *

+ * queries run in the same {@link QueryRunner} are guaranteed to execute in order, meaning changes made by one query will be seen by all subsequent queries in + * the same {@link QueryRunner}. + *

+ * However, to allow handling very large results, and to improve performance, result streams are retrieved lazily from the network. This means that when any of + * {@link #run(Query)} methods return a result, the query has only started executing - it may not have completed yet. Most of the time, you will not notice + * this, because the driver automatically waits for queries to complete at specific points to fulfill its contracts. + *

+ * Specifically, the driver will ensure all outstanding queries are completed whenever you: + * + *

+ *

+ * As noted, most of the time, you will not need to consider this - your writes will + * always be durably stored as long as you either use the results, explicitly commit + * {@link Transaction transactions} or close the session you utilised using {@link Session#close()}. + *

+ * While these semantics introduce some complexity, it gives the driver the ability + * to handle infinite result streams (like subscribing to events), significantly lowers + * the memory overhead for your application and improves performance. + * + * @see Session + * @see Transaction + * @since 5.0 + */ +public interface SimpleQueryRunner +{ + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful if you want to take a map-like value that you've gotten from a prior result and + * send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #run(String, Map)} might be more helpful, it converts your map to a {@link Value} for you. + * + *

Example

+ *
+     * {@code
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       Values.parameters( "myNameParam", "Bob" ) );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return a stream of result values and associated metadata + */ + Result run( String query, Value parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + *

Example

+ *
+     * {@code
+     *
+     * Map parameters = new HashMap();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       parameters );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Map parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Record} of parameters, which can be useful if you want to use the output of one query as input for another. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Record parameters ); + + /** + * Run a query and return a result stream. + * + * @param query text of a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( String query ); + + /** + * Run a query and return a result stream. + *

Example

+ *
+     * {@code
+     *
+     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
+     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
+     * }
+     * 
+ * + * @param query a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( Query query ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionCallback.java b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java new file mode 100644 index 0000000000..c80b5c436c --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +/** + * Callback that executes operations in a given {@link TransactionContext}. + * + * @param the return type of this work. + */ +public interface TransactionCallback +{ + /** + * Executes all given operations in the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( TransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionContext.java b/driver/src/main/java/org/neo4j/driver/TransactionContext.java new file mode 100644 index 0000000000..ad94347dff --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +/** + * A context for running queries within transaction. + */ +public interface TransactionContext extends SimpleQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionWork.java b/driver/src/main/java/org/neo4j/driver/TransactionWork.java index f78b0a7cc7..f868f6ad1b 100644 --- a/driver/src/main/java/org/neo4j/driver/TransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/TransactionWork.java @@ -19,12 +19,13 @@ package org.neo4j.driver; /** - * Callback that executes operations against a given {@link Transaction}. - * To be used with {@link Session#readTransaction(TransactionWork)} and - * {@link Session#writeTransaction(TransactionWork)} methods. + * Callback that executes operations against a given {@link Transaction}. To be used with {@link Session#readTransaction(TransactionWork)} and {@link + * Session#writeTransaction(TransactionWork)} methods. * * @param the return type of this work. + * @deprecated superseded by {@link TransactionCallback}. */ +@Deprecated public interface TransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java index a925612066..2b40688005 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java @@ -25,11 +25,12 @@ import java.util.function.Function; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; +import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; -import org.neo4j.driver.Bookmark; /** * Provides a context of work for database interactions. @@ -133,9 +134,32 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeReadAsync( AsyncTransactionCallback> callback ) + { + return executeReadAsync( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#READ read} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -158,9 +182,29 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction. *

@@ -181,9 +225,32 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for + * one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeWriteAsync( AsyncTransactionCallback> callback ) + { + return executeWriteAsync( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -206,9 +273,29 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + /** * Run a query asynchronously in an auto-commit transaction with the specified {@link TransactionConfig configuration} and return a * {@link CompletionStage} with a result cursor. diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java new file mode 100644 index 0000000000..17300558fe --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +/** + * Callback that executes operations against a given {@link AsyncTransactionContext}. + * + * @param the return type of this work. + */ +public interface AsyncTransactionCallback +{ + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( AsyncTransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java new file mode 100644 index 0000000000..1667cffdcd --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +/** + * A context for running queries within transaction. + */ +public interface AsyncTransactionContext extends AsyncQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java index fc9a29cb91..64c14d832c 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java @@ -19,13 +19,14 @@ package org.neo4j.driver.async; /** - * Callback that executes operations against a given {@link AsyncTransaction}. - * To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} and - * {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods. + * Callback that executes operations against a given {@link AsyncTransaction}. To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} + * and {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods. * * @param the return type of this work. * @since 4.0 + * @deprecated superseded by {@link AsyncTransactionCallback}. */ +@Deprecated public interface AsyncTransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java new file mode 100644 index 0000000000..874d6e36f1 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.Value; + +final class DelegatingTransactionContext implements TransactionContext +{ + private final Transaction delegate; + + public DelegatingTransactionContext( Transaction delegate ) + { + this.delegate = delegate; + } + + @Override + public Result run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query ) + { + return delegate.run( query ); + } + + @Override + public Result run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index e11133ddee..33008f71c4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.ResultCursor; @@ -112,6 +113,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.READ, work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -124,6 +131,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.WRITE, work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java new file mode 100644 index 0000000000..f422df85f6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionContext; +import org.neo4j.driver.async.ResultCursor; + +final class DelegatingAsyncTransactionContext implements AsyncTransactionContext +{ + private final AsyncTransaction delegate; + + public DelegatingAsyncTransactionContext( AsyncTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public CompletionStage runAsync( String query, Value parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Map parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Record parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query ) + { + return delegate.runAsync( query ); + } + + @Override + public CompletionStage runAsync( Query query ) + { + return delegate.runAsync( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index efc291933b..23c12fded6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -28,6 +28,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.internal.util.Futures; @@ -99,6 +100,12 @@ public CompletionStage readTransactionAsync( AsyncTransactionWork CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return readTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public CompletionStage writeTransactionAsync( AsyncTransactionWork> work ) { @@ -111,6 +118,12 @@ public CompletionStage writeTransactionAsync( AsyncTransactionWork CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return writeTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java new file mode 100644 index 0000000000..d78116230b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionContext; + +final class DelegatingRxTransactionContext implements RxTransactionContext +{ + private final RxTransaction delegate; + + public DelegatingRxTransactionContext( RxTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public RxResult run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query ) + { + return delegate.run( query ); + } + + @Override + public RxResult run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 1907ace63f..0403a736b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -35,6 +35,7 @@ import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import org.neo4j.driver.reactive.RxTransactionWork; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; @@ -115,6 +116,12 @@ public Publisher readTransaction( RxTransactionWork Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + @Override public Publisher writeTransaction( RxTransactionWork> work ) { @@ -127,6 +134,12 @@ public Publisher writeTransaction( RxTransactionWork Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + private Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, @@ -135,7 +148,7 @@ private Publisher runTransaction( AccessMode mode, RxTransactionWork the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeRead(RxTransactionCallback)}. * */ + @Deprecated Publisher readTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeRead( RxTransactionCallback> callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -104,10 +129,29 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeRead(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher readTransaction( RxTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ); + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction.

@@ -125,10 +169,32 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for + * one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeWrite( RxTransactionCallback> callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -148,20 +214,38 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work, TransactionConfig config ); /** - * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. - * The query is not executed when the reactive result is returned. - * Instead, the publishers in the result will actually start the execution of the query. + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work + * will result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. The query is not + * executed when the reactive result is returned. Instead, the publishers in the result will actually start the execution of the query. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run(String query, TransactionConfig config ); + RxResult run( String query, TransactionConfig config ); /** * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java new file mode 100644 index 0000000000..d763bb68b9 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +/** + * Callback that executes operations against a given {@link RxTransactionContext}. + * + * @param the return type of this work. + */ +public interface RxTransactionCallback +{ + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( RxTransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java new file mode 100644 index 0000000000..6d5e531467 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +/** + * A context for running queries within transaction. + */ +public interface RxTransactionContext extends RxQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index ef68a8118f..2fe89b7e1a 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -19,13 +19,14 @@ package org.neo4j.driver.reactive; /** - * Callback that executes operations against a given {@link RxTransaction}. - * To be used with {@link RxSession#readTransaction(RxTransactionWork)} and - * {@link RxSession#writeTransaction(RxTransactionWork)} methods. + * Callback that executes operations against a given {@link RxTransaction}. To be used with {@link RxSession#readTransaction(RxTransactionWork)} and {@link + * RxSession#writeTransaction(RxTransactionWork)} methods. * * @param the return type of this work. * @since 4.0 + * @deprecated superseded by {@link RxTransactionCallback}. */ +@Deprecated public interface RxTransactionWork { /** diff --git a/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java new file mode 100644 index 0000000000..146a2b238d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.Value; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingTransactionContextTest +{ + Transaction transaction; + DelegatingTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( Transaction.class ); + context = new DelegatingTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + Result expected = mock( Result.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + Result expected = mock( Result.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java new file mode 100644 index 0000000000..d3f1d4ecf1 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import org.neo4j.driver.Session; +import org.neo4j.driver.TransactionCallback; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.internal.async.NetworkSession; +import org.neo4j.driver.internal.retry.RetryLogic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class InternalSessionTest +{ + NetworkSession networkSession; + Session session; + + @BeforeEach + void beforeEach() + { + networkSession = mock( NetworkSession.class ); + session = new InternalSession( networkSession ); + } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) + { + // GIVEN + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( logic.retry( any() ) ).willReturn( expected ); + given( networkSession.retryLogic() ).willReturn( logic ); + TransactionCallback tc = ( ignored ) -> expected; + Consumer consumer = ( ignored ) -> + { + }; + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + String actual = null; + if ( executeVariation.readOnly ) + { + actual = executeVariation.explicitTxConfig ? session.executeRead( tc, config ) : session.executeRead( tc ); + } + else + { + if ( executeVariation.hasResult ) + { + actual = executeVariation.explicitTxConfig ? session.executeWrite( tc, config ) : session.executeWrite( tc ); + } + else + { + if ( executeVariation.explicitTxConfig ) + { + session.executeWriteWithoutResult( consumer, config ); + } + else + { + session.executeWriteWithoutResult( consumer ); + } + } + } + + // THEN + if ( executeVariation.hasResult ) + { + assertEquals( expected, actual ); + } + then( networkSession ).should().retryLogic(); + then( logic ).should().retry( any() ); + } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false, false ), + new ExecuteVariation( false, false, true ), + new ExecuteVariation( false, true, false ), + new ExecuteVariation( false, true, true ), + new ExecuteVariation( true, false, true ), + new ExecuteVariation( true, true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + private final boolean hasResult; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig, boolean hasResult ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + this.hasResult = hasResult; + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java new file mode 100644 index 0000000000..4c1609526e --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.ResultCursor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingAsyncTransactionContextTest +{ + AsyncTransaction transaction; + DelegatingAsyncTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( AsyncTransaction.class ); + context = new DelegatingAsyncTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java index 627ea6750a..36e22112af 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java @@ -24,7 +24,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -35,6 +39,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ServiceUnavailableException; @@ -59,6 +64,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -132,7 +139,7 @@ private static Stream>> allRunTxM @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldFlushOnRun( Function> runReturnOne ) throws Throwable + void shouldFlushOnRun( Function> runReturnOne ) { setupSuccessfulRunAndPull( connection ); @@ -143,7 +150,7 @@ void shouldFlushOnRun( Function> runR @ParameterizedTest @MethodSource( "allBeginTxMethods" ) - void shouldDelegateBeginTx( Function> beginTx ) throws Throwable + void shouldDelegateBeginTx( Function> beginTx ) { AsyncTransaction tx = await( beginTx.apply( asyncSession ) ); @@ -153,7 +160,7 @@ void shouldDelegateBeginTx( Function> runTx ) throws Throwable + void txRunShouldBeginAndCommitTx( Function> runTx ) { String string = await( runTx.apply( asyncSession ) ); @@ -224,21 +231,48 @@ void writeTxRetriedUntilFailureWhenTxCloseThrows() testTxIsRetriedUntilFailureWhenCommitFails( WRITE ); } - @Test - void shouldCloseSession() throws Throwable + void shouldCloseSession() { - await ( asyncSession.closeAsync() ); + await( asyncSession.closeAsync() ); assertFalse( this.session.isOpen() ); } @Test - void shouldReturnBookmark() throws Throwable + void shouldReturnBookmark() { session = newSession( connectionProvider, InternalBookmark.parse( "Bookmark1" ) ); asyncSession = new InternalAsyncSession( session ); - assertThat( asyncSession.lastBookmark(), equalTo( session.lastBookmark() )); + assertThat( asyncSession.lastBookmark(), equalTo( session.lastBookmark() ) ); + } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) throws ExecutionException, InterruptedException + { + // GIVEN + NetworkSession networkSession = mock( NetworkSession.class ); + AsyncSession session = new InternalAsyncSession( networkSession ); + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( networkSession.retryLogic() ).willReturn( logic ); + AsyncTransactionCallback> tc = ( ignored ) -> CompletableFuture.completedFuture( expected ); + given( logic.retryAsync( any() ) ).willReturn( tc.execute( null ) ); + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + CompletionStage actual = executeVariation.readOnly ? + ( + executeVariation.explicitTxConfig ? session.executeReadAsync( tc, config ) : session.executeReadAsync( tc ) + ) : ( + executeVariation.explicitTxConfig ? session.executeWriteAsync( tc, config ) : session.executeWriteAsync( tc ) + ); + + // THEN + assertEquals( expected, actual.toCompletableFuture().get() ); + then( networkSession ).should().retryLogic(); + then( logic ).should().retryAsync( any() ); } private void testTxRollbackWhenThrows( AccessMode transactionMode ) @@ -391,4 +425,26 @@ public CompletionStage execute( AsyncTransaction tx ) return completedFuture( result ); } } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false ), + new ExecuteVariation( false, true ), + new ExecuteVariation( true, false ), + new ExecuteVariation( true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java new file mode 100644 index 0000000000..267d2388e6 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingRxTransactionContextTest +{ + RxTransaction transaction; + DelegatingRxTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( RxTransaction.class ); + context = new DelegatingRxTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + RxResult expected = mock( RxResult.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index c0ef7e7483..682ec8c6de 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -26,6 +26,8 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -42,20 +44,25 @@ import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursorImpl; +import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.util.FixedRetryLogic; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -309,4 +316,54 @@ void shouldDelegateClose() verify( session ).closeAsync(); verifyNoMoreInteractions( session ); } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) + { + // GIVEN + NetworkSession networkSession = mock( NetworkSession.class ); + RxSession session = new InternalRxSession( networkSession ); + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( networkSession.retryLogic() ).willReturn( logic ); + RxTransactionCallback> tc = ( ignored ) -> Mono.justOrEmpty( expected ); + given( logic.retryRx( any() ) ).willReturn( tc.execute( null ) ); + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + Publisher actual = executeVariation.readOnly ? + ( + executeVariation.explicitTxConfig ? session.executeRead( tc, config ) : session.executeRead( tc ) + ) : ( + executeVariation.explicitTxConfig ? session.executeWrite( tc, config ) : session.executeWrite( tc ) + ); + + // THEN + assertEquals( expected, Mono.from( actual ).block() ); + then( networkSession ).should().retryLogic(); + then( logic ).should().retryRx( any() ); + } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false ), + new ExecuteVariation( false, true ), + new ExecuteVariation( true, false ), + new ExecuteVariation( true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java index e1c85057ea..f0d94b968d 100644 --- a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java +++ b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java @@ -30,6 +30,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.Value; @@ -94,6 +95,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return realSession.readTransaction( work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeRead( callback, config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -106,6 +113,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return realSession.writeTransaction( work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeWrite( callback, config ); + } + @Override public Bookmark lastBookmark() { @@ -113,9 +126,9 @@ public Bookmark lastBookmark() } @Override - public Result run(String query, Map parameters) + public Result run( String query, Map parameters ) { - return realSession.run(query, parameters); + return realSession.run( query, parameters ); } @Override