diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 3c306d6802..ffb6852148 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -76,7 +76,8 @@ public NetworkSession( this.mode = mode; this.retryLogic = retryLogic; this.logging = logging; - this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass())); + this.log = new PrefixedLogger( + "[" + Thread.currentThread().getName() + "][" + hashCode() + "]", logging.getLog(getClass())); this.bookmarkHolder = bookmarkHolder; CompletableFuture databaseNameFuture = databaseName .databaseName() @@ -267,7 +268,11 @@ private CompletionStage acquireConnection(AccessMode mode) { // there somehow is an existing open connection, this should not happen, just a precondition throw new IllegalStateException("Existing open connection detected"); } - return connectionProvider.acquireConnection(connectionContext.contextWithMode(mode)); + log.trace("connectionProvider.acquireConnection"); + return connectionProvider + .acquireConnection(connectionContext.contextWithMode(mode)) + .whenComplete( + (connection, error) -> log.trace("connectionProvider.acquireConnection finished")); }); connectionStage = newConnectionStage.exceptionally(error -> null); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 24bafffe01..a9c74c3585 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -23,7 +23,6 @@ import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.Futures.futureCompletingConsumer; -import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; import java.util.Arrays; import java.util.EnumSet; @@ -34,7 +33,9 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.Query; import org.neo4j.driver.Session; @@ -93,6 +94,7 @@ private enum State { private CompletableFuture rollbackFuture; private Throwable causeOfTermination; private final Logging logging; + private final Logger log; public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize, Logging logging) { this(connection, bookmarkHolder, fetchSize, new ResultCursorsHolder(), logging); @@ -110,11 +112,14 @@ protected UnmanagedTransaction( this.resultCursors = resultCursors; this.fetchSize = fetchSize; this.logging = logging; + this.log = logging.getLog(getClass()); } public CompletionStage beginAsync(Bookmark initialBookmark, TransactionConfig config) { + logTrace("beginAsync"); return protocol.beginTransaction(connection, initialBookmark, config, logging) .handle((ignore, beginError) -> { + logTrace("beginAsync protocol.beginTransaction finished"); if (beginError != null) { if (beginError instanceof AuthorizationExpiredException) { connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION); @@ -166,10 +171,14 @@ public CompletionStage runRx(Query query) { } public boolean isOpen() { - return OPEN_STATES.contains(executeWithLock(lock, () -> state)); + logTrace("before isOpen lock"); + boolean result = OPEN_STATES.contains(executeWithLock(lock, () -> state)); + logTrace("after isOpen unlock"); + return result; } public void markTerminated(Throwable cause) { + logTrace("before markTerminated lock"); executeWithLock(lock, () -> { if (state == State.TERMINATED) { if (causeOfTermination != null) { @@ -180,6 +189,7 @@ public void markTerminated(Throwable cause) { causeOfTermination = cause; } }); + logTrace("after markTerminated unlock"); } private void addSuppressedWhenNotCaptured(Throwable currentCause, Throwable newCause) { @@ -197,6 +207,7 @@ public Connection connection() { } private void ensureCanRunQueries() { + logTrace("before ensureCanRunQueries lock"); executeWithLock(lock, () -> { if (state == State.COMMITTED) { throw new ClientException("Cannot run more queries in this transaction, it has been committed"); @@ -209,9 +220,11 @@ private void ensureCanRunQueries() { causeOfTermination); } }); + logTrace("after ensureCanRunQueries unlock"); } private CompletionStage doCommitAsync(Throwable cursorFailure) { + logTrace("before doCommitAsync lock"); ClientException exception = executeWithLock( lock, () -> state == State.TERMINATED @@ -220,15 +233,19 @@ private CompletionStage doCommitAsync(Throwable cursorFailure) { + "It has been rolled back either because of an error or explicit termination", cursorFailure != causeOfTermination ? causeOfTermination : null) : null); + logTrace("after doCommitAsync unlock"); return exception != null ? failedFuture(exception) : protocol.commitTransaction(connection).thenAccept(bookmarkHolder::setBookmark); } private CompletionStage doRollbackAsync() { - return executeWithLock(lock, () -> state) == State.TERMINATED + logTrace("before doRollbackAsync lock"); + CompletionStage result = executeWithLock(lock, () -> state) == State.TERMINATED ? completedWithNull() : protocol.rollbackTransaction(connection); + logTrace("after doRollbackAsync unlock"); + return result; } private static BiFunction handleCommitOrRollback(Throwable cursorFailure) { @@ -242,13 +259,18 @@ private static BiFunction handleCommitOrRollback(Throwabl } private void handleTransactionCompletion(boolean commitAttempt, Throwable throwable) { + logTrace(String.format( + "handleTransactionCompletion(commitAttempt=%b, throwable is null=%b)", + commitAttempt, throwable == null)); executeWithLock(lock, () -> { + logTrace("handleTransactionCompletion lock acquired"); if (commitAttempt && throwable == null) { state = State.COMMITTED; } else { state = State.ROLLED_BACK; } }); + logTrace("handleTransactionCompletion lock released"); if (throwable instanceof AuthorizationExpiredException) { connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION); } else if (throwable instanceof ConnectionReadTimeoutException) { @@ -256,42 +278,58 @@ private void handleTransactionCompletion(boolean commitAttempt, Throwable throwa } else { connection.release(); // release in background } + logTrace("handleTransactionCompletion finished"); } private CompletionStage closeAsync(boolean commit, boolean completeWithNullIfNotOpen) { + logTrace(String.format( + "closeAsync(commit=%b, completeWithNullIfNotOpen=%b) before lock", commit, completeWithNullIfNotOpen)); CompletionStage stage = executeWithLock(lock, () -> { + logTrace("closeAsync lock acquired"); CompletionStage resultStage = null; if (completeWithNullIfNotOpen && !isOpen()) { + logTrace("closeAsync will complete with null"); resultStage = completedWithNull(); } else if (state == State.COMMITTED) { + logTrace(String.format("closeAsync state=%s", state)); resultStage = failedFuture( new ClientException(commit ? CANT_COMMIT_COMMITTED_MSG : CANT_ROLLBACK_COMMITTED_MSG)); } else if (state == State.ROLLED_BACK) { + logTrace(String.format("closeAsync state=%s", state)); resultStage = failedFuture( new ClientException(commit ? CANT_COMMIT_ROLLED_BACK_MSG : CANT_ROLLBACK_ROLLED_BACK_MSG)); } else { + logTrace(String.format("closeAsync state=%s", state)); if (commit) { if (rollbackFuture != null) { + logTrace("closeAsync rollbackFuture not null"); resultStage = failedFuture(new ClientException(CANT_COMMIT_ROLLING_BACK_MSG)); } else if (commitFuture != null) { + logTrace("closeAsync commitFuture not null"); resultStage = commitFuture; } else { + logTrace("closeAsync initializing commitFuture"); commitFuture = new CompletableFuture<>(); } } else { if (commitFuture != null) { + logTrace("closeAsync commitFuture not null"); resultStage = failedFuture(new ClientException(CANT_ROLLBACK_COMMITTING_MSG)); } else if (rollbackFuture != null) { + logTrace("closeAsync rollbackFuture not null"); resultStage = rollbackFuture; } else { + logTrace("closeAsync initializing rollbackFuture"); rollbackFuture = new CompletableFuture<>(); } } } return resultStage; }); + logTrace("closeAsync lock released"); if (stage == null) { + logTrace("closeAsync stage is null"); CompletableFuture targetFuture; Function> targetAction; if (commit) { @@ -309,6 +347,118 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul stage = targetFuture; } + logTrace("closeAsync finished"); return stage; } + + private void executeWithLock(Lock lock, Runnable runnable) { + logTrace(String.format("[%d] Before lock acquisition", lock.hashCode())); + try { + lock.lock(); + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] lock aquisition failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + logTrace(String.format("[%d] After lock acquisition", lock.hashCode())); + + try { + logTrace(String.format("[%d] Before logic run", lock.hashCode())); + runnable.run(); + logTrace(String.format("[%d] After logic run", lock.hashCode())); + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] logic run failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } finally { + logTrace(String.format("[%d] Before lock release", lock.hashCode())); + try { + lock.unlock(); + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] lock release failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + logTrace(String.format("[%d] After lock release", lock.hashCode())); + } + } + + private T executeWithLock(Lock lock, Supplier supplier) { + logTrace(String.format("[%d] Before lock acquisition", lock.hashCode())); + try { + lock.lock(); + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] lock aquisition failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + logTrace(String.format("[%d] After lock acquisition", lock.hashCode())); + + try { + logTrace(String.format("[%d] Before logic run", lock.hashCode())); + T result = supplier.get(); + logTrace(String.format("[%d] After logic run", lock.hashCode())); + return result; + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] logic run failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } finally { + logTrace(String.format("[%d] Before lock release", lock.hashCode())); + try { + lock.unlock(); + } catch (Throwable t) { + log.error( + String.format( + "[%s][%d][%d] lock release failed", + Thread.currentThread().getName(), hashCode(), lock.hashCode()), + t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + logTrace(String.format("[%d] After lock release", lock.hashCode())); + } + } + + private void logTrace(String message) { + log.trace("[%s][%d] %s", Thread.currentThread().getName(), hashCode(), message); + } }