diff --git a/driver/src/main/java/org/neo4j/driver/ResultResourcesHandler.java b/driver/src/main/java/org/neo4j/driver/ResultResourcesHandler.java deleted file mode 100644 index 95e19ec5de..0000000000 --- a/driver/src/main/java/org/neo4j/driver/ResultResourcesHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver; - -public interface ResultResourcesHandler -{ - void resultFetched(); - - void resultFailed( Throwable error ); - - ResultResourcesHandler NO_OP = new ResultResourcesHandler() - { - @Override - public void resultFetched() - { - } - - @Override - public void resultFailed( Throwable error ) - { - } - }; -} diff --git a/driver/src/main/java/org/neo4j/driver/StatementKeys.java b/driver/src/main/java/org/neo4j/driver/StatementKeys.java deleted file mode 100644 index de2d237495..0000000000 --- a/driver/src/main/java/org/neo4j/driver/StatementKeys.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver; - -import java.util.Collections; -import java.util.List; - -public class StatementKeys -{ - private List keys; - - public boolean isPopulated() - { - return keys != null; - } - - public void set( List keys ) - { - this.keys = keys; - } - - public List asList() - { - return keys == null ? Collections.emptyList() : keys; - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalPair.java b/driver/src/main/java/org/neo4j/driver/internal/InternalPair.java index 036e215322..42a4913fd0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalPair.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalPair.java @@ -20,7 +20,6 @@ import java.util.Objects; -import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.Pair; public class InternalPair implements Pair @@ -57,11 +56,6 @@ public String toString() return String.format( "%s: %s", Objects.toString( key ), Objects.toString( value ) ); } - public String toString( Function printValue ) - { - return String.format( "%s: %s", key, printValue.apply( value ) ); - } - @Override public boolean equals( Object o ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java index 87edc623f6..1869b682c2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java @@ -23,7 +23,6 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.exceptions.ProtocolException; @@ -37,18 +36,16 @@ public class RoutingProcedureClusterCompositionProvider implements ClusterCompos private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to "; private final Clock clock; - private final Logger log; private final RoutingProcedureRunner routingProcedureRunner; - public RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings ) + public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingSettings settings ) { - this( clock, log, new RoutingProcedureRunner( settings.routingContext() ) ); + this( clock, new RoutingProcedureRunner( settings.routingContext() ) ); } - RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingProcedureRunner routingProcedureRunner ) + RoutingProcedureClusterCompositionProvider( Clock clock, RoutingProcedureRunner routingProcedureRunner ) { this.clock = clock; - this.log = log; this.routingProcedureRunner = routingProcedureRunner; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index d78db828a1..7e866d07ad 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -268,7 +268,7 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R { Logger log = loadBalancerLogger( logging ); ClusterCompositionProvider clusterCompositionProvider = - new RoutingProcedureClusterCompositionProvider( clock, log, settings ); + new RoutingProcedureClusterCompositionProvider( clock, settings ); return new Rediscovery( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup, new DnsResolver( log ), log ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java b/driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java deleted file mode 100644 index 16b6de7825..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.util; - -import org.neo4j.driver.v1.util.Consumer; - -public final class Consumers -{ - private Consumers() - { - throw new UnsupportedOperationException( "Do not instantiate" ); - } - - public static Consumer noOp() - { - return new Consumer() - { - @Override - public void accept( T t ) - { - //Do nothing - } - }; - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java index 7a9846f708..37a673ecee 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.neo4j.driver.v1.util.Function; @@ -108,14 +107,4 @@ public void remove() } }; } - - public static Map mapValues( Map map, Function f ) - { - HashMap transformed = new HashMap<>( map.size() ); - for ( Entry entry : map.entrySet() ) - { - transformed.put( entry.getKey(), f.apply( entry.getValue() ) ); - } - return transformed; - } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Driver.java b/driver/src/main/java/org/neo4j/driver/v1/Driver.java index 2331d7c2f2..00acfa9865 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Driver.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Driver.java @@ -139,9 +139,20 @@ public interface Driver extends AutoCloseable Session session( AccessMode mode, Iterable bookmarks ); /** - * Close all the resources assigned to this driver, including any open connections. + * Close all the resources assigned to this driver, including open connections and IO threads. + *

+ * This operation works the same way as {@link #closeAsync()} but blocks until all resources are closed. */ + @Override void close(); + /** + * Close all the resources assigned to this driver, including open connections and IO threads. + *

+ * This operation is asynchronous and returns a {@link CompletionStage}. This stage is completed with + * {@code null} when all resources are closed. It is completed exceptionally if termination fails. + * + * @return a {@link CompletionStage completion stage} that represents the asynchronous close. + */ CompletionStage closeAsync(); } diff --git a/driver/src/main/java/org/neo4j/driver/v1/ResponseListener.java b/driver/src/main/java/org/neo4j/driver/v1/ResponseListener.java deleted file mode 100644 index dbc0652220..0000000000 --- a/driver/src/main/java/org/neo4j/driver/v1/ResponseListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.v1; - -public interface ResponseListener -{ - void operationCompleted( T result, Throwable error ); -} diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index e177e3097b..e6db0b754b 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -19,6 +19,8 @@ package org.neo4j.driver.v1; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Function; import org.neo4j.driver.v1.util.Resource; @@ -42,19 +44,21 @@ * the graph seen by the previous query. For more on causal consistency, see * the Neo4j clustering manual. *

- * Typically, a session will wrap a TCP connection. Such a connection will be - * acquired from a connection pool and released back there when the session is - * destroyed. One connection can therefore be adopted by many sessions, - * although by only one at a time. Application code should never need to deal - * directly with connection management. + * Typically, a session will acquire a TCP connection to execute query or + * transaction. Such a connection will be acquired from a connection pool + * and released back there when query result is consumed or transaction is + * committed or rolled back. One connection can therefore be adopted by many + * sessions, although by only one at a time. Application code should never need + * to deal directly with connection management. *

* A session inherits its destination address and permissions from its - * underlying connection. This means that one session may only ever target one - * machine within a cluster and does not support re-authentication. To achieve - * otherwise requires creation of a separate session. + * underlying connection. This means that for a single query/transaction one + * session may only ever target one machine within a cluster and does not + * support re-authentication. To achieve otherwise requires creation of a + * separate session. *

* Similarly, multiple sessions should be used when working with concurrency; - * session implementations are generally not thread safe. + * session implementations are not thread safe. * * @since 1.0 */ @@ -65,6 +69,9 @@ public interface Session extends Resource, StatementRunner * most one transaction may exist in a session at any point in time. To * maintain multiple concurrent transactions, use multiple concurrent * sessions. + *

+ * This operation works the same way as {@link #beginTransactionAsync()} but blocks until transaction is actually + * started. * * @return a new {@link Transaction} */ @@ -84,6 +91,25 @@ public interface Session extends Resource, StatementRunner @Deprecated Transaction beginTransaction( String bookmark ); + /** + * Begin a new explicit {@linkplain Transaction transaction}. At + * most one transaction may exist in a session at any point in time. To + * maintain multiple concurrent transactions, use multiple concurrent + * sessions. + *

+ * This operation is asynchronous and returns a {@link CompletionStage}. This stage is completed with a new + * {@link Transaction} object when begin operation is successful. It is completed exceptionally if + * transaction can't be started. + *

+ * Returned stage can be completed by an IO thread which should never block. Otherwise IO operations on this and + * potentially other network connections might deadlock. Please do not chain blocking operations like + * {@link #run(String)} on the returned stage. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * + * @return a {@link CompletionStage completion stage} that represents the asynchronous begin of a transaction. + */ CompletionStage beginTransactionAsync(); /** @@ -91,6 +117,9 @@ public interface Session extends Resource, StatementRunner *

* Transaction will automatically be committed unless exception is thrown from the unit of work itself or from * {@link Transaction#close()} or transaction is explicitly marked for failure via {@link Transaction#failure()}. + *

+ * This operation works the same way as {@link #readTransactionAsync(TransactionWork)} but blocks until given + * blocking unit of work is completed. * * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param the return type of the given unit of work. @@ -98,13 +127,37 @@ public interface Session extends Resource, StatementRunner */ T readTransaction( TransactionWork work ); + /** + * Execute given unit of asynchronous work in a {@link AccessMode#READ read} asynchronous transaction. + *

+ * Transaction will automatically be committed unless given unit of work fails or + * {@link Transaction#commitAsync() async transaction commit} fails. It will also not be committed if explicitly + * rolled back via {@link Transaction#rollbackAsync()}. + *

+ * Returned stage and given {@link TransactionWork} can be completed/executed by an IO thread which should never + * block. Otherwise IO operations on this and potentially other network connections might deadlock. Please do not + * chain blocking operations like {@link #run(String)} on the returned stage and do not use them inside the + * {@link TransactionWork}. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * + * @param work the {@link TransactionWork} to be applied to a new read transaction. Operation executed by the + * given work must be asynchronous. + * @param the return type of the given unit of work. + * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given + * unit of work. Stage can be completed exceptionally if given work or commit fails. + */ CompletionStage readTransactionAsync( TransactionWork> work ); /** - * Execute given unit of work in a {@link AccessMode#WRITE write} transaction. + * Execute given unit of work in a {@link AccessMode#WRITE write} transaction. *

* Transaction will automatically be committed unless exception is thrown from the unit of work itself or from * {@link Transaction#close()} or transaction is explicitly marked for failure via {@link Transaction#failure()}. + *

+ * This operation works the same way as {@link #writeTransactionAsync(TransactionWork)} but blocks until given + * blocking unit of work is completed. * * @param work the {@link TransactionWork} to be applied to a new write transaction. * @param the return type of the given unit of work. @@ -112,6 +165,27 @@ public interface Session extends Resource, StatementRunner */ T writeTransaction( TransactionWork work ); + /** + * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction. + *

+ * Transaction will automatically be committed unless given unit of work fails or + * {@link Transaction#commitAsync() async transaction commit} fails. It will also not be committed if explicitly + * rolled back via {@link Transaction#rollbackAsync()}. + *

+ * Returned stage and given {@link TransactionWork} can be completed/executed by an IO thread which should never + * block. Otherwise IO operations on this and potentially other network connections might deadlock. Please do not + * chain blocking operations like {@link #run(String)} on the returned stage and do not use them inside the + * {@link TransactionWork}. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * + * @param work the {@link TransactionWork} to be applied to a new write transaction. Operation executed by the + * given work must be asynchronous. + * @param the return type of the given unit of work. + * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given + * unit of work. Stage can be completed exceptionally if given work or commit fails. + */ CompletionStage writeTransactionAsync( TransactionWork> work ); /** @@ -142,14 +216,23 @@ public interface Session extends Resource, StatementRunner void reset(); /** - * Signal that you are done using this session. In the default driver usage, closing - * and accessing sessions is very low cost, because sessions are pooled by {@link Driver}. - * - * When this method returns, all outstanding statements in the session are guaranteed to - * have completed, meaning any writes you performed are guaranteed to be durably stored. + * Signal that you are done using this session. In the default driver usage, closing and accessing sessions is + * very low cost. + *

+ * This operation works the same way as {@link #closeAsync()} but blocks until session is actually closed. */ @Override void close(); + /** + * Signal that you are done using this session. In the default driver usage, closing and accessing sessions is + * very low cost. + *

+ * This operation is asynchronous and returns a {@link CompletionStage}. Stage is completed when all outstanding + * statements in the session have completed, meaning any writes you performed are guaranteed to be durably stored. + * It might be completed exceptionally when there are unconsumed errors from previous statements or transactions. + * + * @return a {@link CompletionStage completion stage} that represents the asynchronous close. + */ CompletionStage closeAsync(); } diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java index 7d842413fc..0d1224d23e 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java @@ -20,11 +20,45 @@ import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import org.neo4j.driver.v1.exceptions.NoSuchRecordException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.util.Consumer; import org.neo4j.driver.v1.util.Function; +/** + * The result of asynchronous execution of a Cypher statement, conceptually an asynchronous stream of + * {@link Record records}. + *

+ * Result can be eagerly fetched in a list using {@link #listAsync()} or navigated lazily using + * {@link #forEachAsync(Consumer)} or {@link #nextAsync()}. + *

+ * Results are valid until the next statement is run or until the end of the current transaction, + * whichever comes first. To keep a result around while further statements are run, or to use a result outside the scope + * of the current transaction, see {@link #listAsync()}. + *

+ *

Important note on semantics

+ *

+ * In order to handle very large results, and to minimize memory overhead and maximize + * performance, results are retrieved lazily. Please see {@link StatementRunner} for + * important details on the effects of this. + *

+ * The short version is that, if you want a hard guarantee that the underlying statement + * has completed, you need to either call {@link Transaction#commitAsync()} on the {@link Transaction transaction} + * or {@link Session#closeAsync()} on the {@link Session session} that created this result, or you need to use + * the result. + *

+ * Note: Every returned {@link CompletionStage} can be completed by an IO thread which should never block. + * Otherwise IO operations on this and potentially other network connections might deadlock. Please do not chain + * blocking operations like {@link Session#run(String)} on the returned stages. Driver will throw + * {@link IllegalStateException} when blocking API call is executed in IO thread. Consider using asynchronous calls + * throughout the chain or offloading blocking operation to a different {@link Executor}. This can be done using + * methods with "Async" suffix like {@link CompletionStage#thenApplyAsync(java.util.function.Function)} or + * {@link CompletionStage#thenApplyAsync(java.util.function.Function, Executor)}. + * + * @since 1.5 + */ public interface StatementResultCursor { /** @@ -34,19 +68,97 @@ public interface StatementResultCursor */ List keys(); + /** + * Asynchronously retrieve the result summary. + *

+ * If the records in the result is not fully consumed, then calling this method will force to pull all remaining + * records into buffer to yield the summary. + *

+ * If you want to obtain the summary but discard the records, use {@link #consumeAsync()} instead. + * + * @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be + * completed exceptionally if query execution fails. + */ CompletionStage summaryAsync(); + /** + * Asynchronously navigate to and retrieve the next {@link Record} in this result. Returned stage can contain + * {@code null} if end of records stream has been reached. + * + * @return a {@link CompletionStage} completed with a record or {@code null}. Stage can also be + * completed exceptionally if query execution fails. + */ CompletionStage nextAsync(); + /** + * Asynchronously investigate the next upcoming {@link Record} without moving forward in the result. Returned + * stage can contain {@code null} if end of records stream has been reached. + * + * @return a {@link CompletionStage} completed with a record or {@code null}. Stage can also be + * completed exceptionally if query execution fails. + */ CompletionStage peekAsync(); + /** + * Asynchronously return the first record in the result, failing if there is not exactly + * one record left in the stream. + * + * @return a {@link CompletionStage} completed with the first and only record in the stream. Stage will be + * completed exceptionally with {@link NoSuchRecordException} if there is not exactly one record left in the + * stream. It can also be completed exceptionally if query execution fails. + */ CompletionStage singleAsync(); + /** + * Asynchronously consume the entire result, yielding a summary of it. Calling this method exhausts the result. + * + * @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be + * completed exceptionally if query execution fails. + */ CompletionStage consumeAsync(); + /** + * Asynchronously apply the given {@link Consumer action} to every record in the result, yielding a summary of it. + * + * @param action the function to be applied to every record in the result. Provided function should not block. + * @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be + * completed exceptionally if query execution or provided function fails. + */ CompletionStage forEachAsync( Consumer action ); + /** + * Asynchronously retrieve and store the entire result stream. + * This can be used if you want to iterate over the stream multiple times or to store the + * whole result for later use. + *

+ * Note that this method can only be used if you know that the statement that + * yielded this result returns a finite stream. Some statements can yield + * infinite results, in which case calling this method will lead to running + * out of memory. + *

+ * Calling this method exhausts the result. + * + * @return a {@link CompletionStage} completed with a list of all remaining immutable records. Stage can also be + * completed exceptionally if query execution fails. + */ CompletionStage> listAsync(); + /** + * Asynchronously retrieve and store a projection of the entire result. + * This can be used if you want to iterate over the stream multiple times or to store the + * whole result for later use. + *

+ * Note that this method can only be used if you know that the statement that + * yielded this result returns a finite stream. Some statements can yield + * infinite results, in which case calling this method will lead to running + * out of memory. + *

+ * Calling this method exhausts the result. + * + * @param mapFunction a function to map from Record to T. See {@link Records} for some predefined functions. + * @param the type of result list elements + * @return a {@link CompletionStage} completed with a list of all remaining immutable records. Stage can also be + * completed exceptionally if query execution or provided function fails. + */ CompletionStage> listAsync( Function mapFunction ); } diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementRunner.java b/driver/src/main/java/org/neo4j/driver/v1/StatementRunner.java index 78305a7a40..7f5a5f21f6 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementRunner.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementRunner.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Function; import org.neo4j.driver.v1.types.TypeSystem; import org.neo4j.driver.v1.util.Experimental; @@ -34,31 +36,47 @@ * by all subsequent statements in the same {@link StatementRunner}. * * However, to allow handling very large results, and to improve performance, - * result streams are retrieved lazily. This means that when any of the - * {@link #run(Statement)} methods return a result, the statement has only - * started executing - it may not have completed yet. Most of the - * time, you will not notice this, because the driver automatically - * waits for statements to complete at specific points to fulfill its contracts. + * result streams are retrieved lazily from the network. This means that when + * any of the blocking {@link #run(Statement)} or async {@link #runAsync(Statement)} + * methods return a result, the statement has only started executing - it may not + * have completed yet. Most of the time, you will not notice this, because the + * driver automatically waits for statements to complete at specific points to + * fulfill its contracts. * * Specifically, the driver will ensure all outstanding statements are completed * whenever you: * *

    - *
  • Read from or discard a result, for instance via {@link StatementResult#next()}, - * {@link StatementResult#consume()}.
  • - *
  • Explicitly commit a transaction using {@link Transaction#close()}
  • - *
  • Return a session to the pool using {@link Session#close()}
  • + *
  • Read from or discard a result, for instance via blocking + * {@link StatementResult#next()}, {@link StatementResult#consume()} or async + * {@link StatementResultCursor#nextAsync()}, {@link StatementResultCursor#consumeAsync()}
  • + *
  • Explicitly commit/rollback a transaction using blocking {@link Transaction#close()} + * or async {@link Transaction#commitAsync()}, {@link Transaction#rollbackAsync()}
  • + *
  • Close a session using blocking {@link Session#close()} or + * async {@link Session#closeAsync()}
  • *
* * As noted, most of the time, you will not need to consider this - your writes will * always be durably stored as long as you either use the results, explicitly commit - * {@link Transaction transactions} or return the session you used to the pool using - * {@link Session#close()}. + * {@link Transaction transactions} or close the session you used using {@link Session#close()}. * * While these semantics introduce some complexity, it gives the driver the ability * to handle infinite result streams (like subscribing to events), significantly lowers * the memory overhead for your application and improves performance. * + *

Asynchronous API

+ * + * All overloads of {@link #runAsync(Statement)} execute queries in async fashion and return {@link CompletionStage} of + * a new {@link StatementResultCursor}. Stage can be completed exceptionally when error happens, e.g. connection can't + * be acquired from the pool. + *

+ * Note: Returned stage can be completed by an IO thread which should never block. Otherwise IO operations on + * this and potentially other network connections might deadlock. Please do not chain blocking operations like + * {@link #run(String)} on the returned stage. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * * @see Session * @see Transaction * @since 1.0 @@ -83,6 +101,7 @@ public interface StatementRunner *

Example

*
      * {@code
+     *
      * StatementResult cursor = session.run( "MATCH (n) WHERE n.name = {myNameParam} RETURN (n)",
      *                                       Values.parameters( "myNameParam", "Bob" ) );
      * }
@@ -94,7 +113,39 @@ public interface StatementRunner
      */
     StatementResult run( String statementTemplate, Value parameters );
 
-    CompletionStage runAsync( String statementText, Value parameters );
+    /**
+     * Run a statement asynchronously and return a {@link CompletionStage} with a
+     * result cursor.
+     * 

+ * This method takes a set of parameters that will be injected into the + * statement by Neo4j. Using parameters is highly encouraged, it helps avoid + * dangerous cypher injection attacks and improves database performance as + * Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful + * if you want to take a map-like value that you've gotten from a prior result + * and send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #runAsync(String, Map)} + * might be more helpful, it converts your map to a {@link Value} for you. + *

Example

+ *
+     * {@code
+     *
+     * CompletionStage cursorStage = session.runAsync(
+     *             "MATCH (n) WHERE n.name = {myNameParam} RETURN (n)",
+     *             Values.parameters("myNameParam", "Bob"));
+     * }
+     * 
+ * It is not allowed to chain blocking operations on the returned {@link CompletionStage}. See class javadoc for + * more information. + * + * @param statementTemplate text of a Neo4j statement + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return new {@link CompletionStage} that gets completed with a result cursor when query execution is successful. + * Stage can be completed exceptionally when error happens, e.g. connection can't be acquired from the pool. + */ + CompletionStage runAsync( String statementTemplate, Value parameters ); /** * Run a statement and return a result stream. @@ -111,6 +162,7 @@ public interface StatementRunner *

Example

*
      * {@code
+     *
      * Map parameters = new HashMap();
      * parameters.put("myNameParam", "Bob");
      *
@@ -125,6 +177,38 @@ public interface StatementRunner
      */
     StatementResult run( String statementTemplate, Map statementParameters );
 
+    /**
+     * Run a statement asynchronously and return a {@link CompletionStage} with a
+     * result cursor.
+     * 

+ * This method takes a set of parameters that will be injected into the + * statement by Neo4j. Using parameters is highly encouraged, it helps avoid + * dangerous cypher injection attacks and improves database performance as + * Neo4j can re-use query plans more often. + *

+ * This version of runAsync takes a {@link Map} of parameters. The values in the map + * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for + * a list of allowed types. + *

Example

+ *
+     * {@code
+     *
+     * Map parameters = new HashMap();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * CompletionStage cursorStage = session.runAsync(
+     *             "MATCH (n) WHERE n.name = {myNameParam} RETURN (n)",
+     *             parameters);
+     * }
+     * 
+ * It is not allowed to chain blocking operations on the returned {@link CompletionStage}. See class javadoc for + * more information. + * + * @param statementTemplate text of a Neo4j statement + * @param statementParameters input data for the statement + * @return new {@link CompletionStage} that gets completed with a result cursor when query execution is successful. + * Stage can be completed exceptionally when error happens, e.g. connection can't be acquired from the pool. + */ CompletionStage runAsync( String statementTemplate, Map statementParameters ); /** @@ -144,6 +228,26 @@ public interface StatementRunner */ StatementResult run( String statementTemplate, Record statementParameters ); + /** + * Run a statement asynchronously and return a {@link CompletionStage} with a + * result cursor. + *

+ * This method takes a set of parameters that will be injected into the + * statement by Neo4j. Using parameters is highly encouraged, it helps avoid + * dangerous cypher injection attacks and improves database performance as + * Neo4j can re-use query plans more often. + *

+ * This version of runAsync takes a {@link Record} of parameters, which can be useful + * if you want to use the output of one statement as input for another. + *

+ * It is not allowed to chain blocking operations on the returned {@link CompletionStage}. See class javadoc for + * more information. + * + * @param statementTemplate text of a Neo4j statement + * @param statementParameters input data for the statement + * @return new {@link CompletionStage} that gets completed with a result cursor when query execution is successful. + * Stage can be completed exceptionally when error happens, e.g. connection can't be acquired from the pool. + */ CompletionStage runAsync( String statementTemplate, Record statementParameters ); /** @@ -154,6 +258,17 @@ public interface StatementRunner */ StatementResult run( String statementTemplate ); + /** + * Run a statement asynchronously and return a {@link CompletionStage} with a + * result cursor. + *

+ * It is not allowed to chain blocking operations on the returned {@link CompletionStage}. See class javadoc for + * more information. + * + * @param statementTemplate text of a Neo4j statement + * @return new {@link CompletionStage} that gets completed with a result cursor when query execution is successful. + * Stage can be completed exceptionally when error happens, e.g. connection can't be acquired from the pool. + */ CompletionStage runAsync( String statementTemplate ); /** @@ -161,6 +276,7 @@ public interface StatementRunner *

Example

*
      * {@code
+     *
      * Statement statement = new Statement( "MATCH (n) WHERE n.name={myNameParam} RETURN n.age" );
      * StatementResult cursor = session.run( statement.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
      * }
@@ -171,6 +287,33 @@ public interface StatementRunner
      */
     StatementResult run( Statement statement );
 
+    /**
+     * Run a statement asynchronously and return a {@link CompletionStage} with a
+     * result cursor.
+     * 

+ * This method takes a set of parameters that will be injected into the + * statement by Neo4j. Using parameters is highly encouraged, it helps avoid + * dangerous cypher injection attacks and improves database performance as + * Neo4j can re-use query plans more often. + *

+ * This version of runAsync takes a {@link Map} of parameters. The values in the map + * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for + * a list of allowed types. + *

Example

+ *
+     * {@code
+     *
+     * Statement statement = new Statement( "MATCH (n) WHERE n.name={myNameParam} RETURN n.age" );
+     * CompletionStage cursorStage = session.runAsync(statement);
+     * }
+     * 
+ * It is not allowed to chain blocking operations on the returned {@link CompletionStage}. See class javadoc for + * more information. + * + * @param statement a Neo4j statement + * @return new {@link CompletionStage} that gets completed with a result cursor when query execution is successful. + * Stage can be completed exceptionally when error happens, e.g. connection can't be acquired from the pool. + */ CompletionStage runAsync( Statement statement ); /** diff --git a/driver/src/main/java/org/neo4j/driver/v1/Transaction.java b/driver/src/main/java/org/neo4j/driver/v1/Transaction.java index 7e4359a2f2..17a2c2a653 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Transaction.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Transaction.java @@ -19,16 +19,20 @@ package org.neo4j.driver.v1; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Function; import org.neo4j.driver.v1.util.Resource; /** * Logical container for an atomic unit of work. - *

* A driver Transaction object corresponds to a server transaction. + *

+ * Blocking API: + *

* Transactions are typically wrapped in a try-with-resources block * which ensures that COMMIT or ROLLBACK - * occurs correctly on close.Note that ROLLBACK is the + * occurs correctly on close. Note that ROLLBACK is the * default action unless {@link #success()} is called before closing. *

  * {@code
@@ -39,6 +43,30 @@
  * }
  * }
  * 
+ * Blocking calls are: {@link #success()}, {@link #failure()}, {@link #close()} + * and various overloads of {@link #run(Statement)}. + *

+ * Asynchronous API: + *

+ * Transactions are typically obtained in a {@link CompletionStage} and all + * operations chain on this stage. Explicit commit with {@link #commitAsync()} + * or rollback with {@link #rollbackAsync()} is required. Without explicit + * commit/rollback corresponding transaction will remain open in the database. + *

+ * {@code
+ * session.beginTransactionAsync()
+ *        .thenCompose(tx ->
+ *               tx.runAsync("CREATE (a:Person {name: {x}})", parameters("x", "Alice"))
+ *                 .exceptionally(e -> {
+ *                    e.printStackTrace();
+ *                    return null;
+ *                 })
+ *                 .thenApply(ignore -> tx)
+ *        ).thenCompose(Transaction::commitAsync);
+ * }
+ * 
+ * Async calls are: {@link #commitAsync()}, {@link #rollbackAsync()} and various overloads of + * {@link #runAsync(Statement)}. * * @see Session#run * @see StatementRunner @@ -81,7 +109,41 @@ public interface Transaction extends Resource, StatementRunner @Override void close(); + /** + * Commit this transaction in asynchronous fashion. This operation is typically executed as part of the + * {@link CompletionStage} chain that starts with a transaction. It is logically equivalent to a combination of + * blocking {@link #success()} and {@link #close()}. However, it is asynchronous and returns new + * {@link CompletionStage}. There is no need to close transaction after calling this method. Transaction object + * should not be used after calling this method. + *

+ * Returned stage can be completed by an IO thread which should never block. Otherwise IO operations on this and + * potentially other network connections might deadlock. Please do not chain blocking operations like + * {@link #run(String)} on the returned stage. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * + * @return new {@link CompletionStage} that gets completed with {@code null} when commit is successful. Stage can + * be completed exceptionally when commit fails. + */ CompletionStage commitAsync(); + /** + * Rollback this transaction in asynchronous fashion. This operation is typically executed as part of the + * {@link CompletionStage} chain that starts with a transaction. It is logically equivalent to a combination of + * blocking {@link #failure()} and {@link #close()}. However, it is asynchronous and returns new + * {@link CompletionStage}. There is no need to close transaction after calling this method. Transaction object + * should not be used after calling this method. + *

+ * Returned stage can be completed by an IO thread which should never block. Otherwise IO operations on this and + * potentially other network connections might deadlock. Please do not chain blocking operations like + * {@link #run(String)} on the returned stage. Driver will throw {@link IllegalStateException} when blocking API + * call is executed in IO thread. Consider using asynchronous calls throughout the chain or offloading blocking + * operation to a different {@link Executor}. This can be done using methods with "Async" suffix like + * {@link CompletionStage#thenApplyAsync(Function)} or {@link CompletionStage#thenApplyAsync(Function, Executor)}. + * + * @return new {@link CompletionStage} that gets completed with {@code null} when rollback is successful. Stage can + * be completed exceptionally when rollback fails. + */ CompletionStage rollbackAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java index 3fb54b45f2..9a8ead931a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java @@ -47,7 +47,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -59,8 +58,8 @@ public void shouldProtocolErrorWhenNoRecord() { // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); RoutingProcedureResponse noRecordsResponse = newRoutingResponse(); @@ -88,8 +87,8 @@ public void shouldProtocolErrorWhenMoreThanOneRecord() { // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); @@ -118,8 +117,8 @@ public void shouldProtocolErrorWhenUnparsableRecord() { // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); @@ -149,8 +148,8 @@ public void shouldProtocolErrorWhenNoRouters() // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Clock mockedClock = mock( Clock.class ); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock, - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mockedClock, mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ @@ -185,8 +184,8 @@ public void shouldProtocolErrorWhenNoReaders() // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Clock mockedClock = mock( Clock.class ); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock, - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mockedClock, mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ @@ -221,8 +220,8 @@ public void shouldPropagateConnectionFailureExceptions() { // Given RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); when( mockedRunner.run( connectionStage ) ).thenReturn( failedFuture( @@ -247,8 +246,8 @@ public void shouldReturnSuccessResultWhenNoError() // Given Clock mockedClock = mock( Clock.class ); RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); - ClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( mockedClock, - DEV_NULL_LOGGER, mockedRunner ); + ClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mockedClock, mockedRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ @@ -282,8 +281,8 @@ public void shouldReturnFailureWhenProcedureRunnerFails() when( procedureRunner.run( any( CompletionStage.class ) ) ) .thenReturn( completedFuture( newRoutingResponse( error ) ) ); - RoutingProcedureClusterCompositionProvider provider = new RoutingProcedureClusterCompositionProvider( - mock( Clock.class ), DEV_NULL_LOGGER, procedureRunner ); + RoutingProcedureClusterCompositionProvider provider = + new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), procedureRunner ); CompletionStage connectionStage = completedFuture( mock( Connection.class ) ); ClusterCompositionResponse response = await( provider.getClusterComposition( connectionStage ) );