Skip to content

Helpful error message when nesting transactions in the same session #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion driver/src/main/java/org/neo4j/driver/Statement.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @see Session
* @see Transaction
* @see StatementResult
* @see StatementResult#summary()
* @see StatementResult#consume()
* @see ResultSummary
* @since 1.0
*/
Expand Down
2 changes: 1 addition & 1 deletion driver/src/main/java/org/neo4j/driver/StatementResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,5 @@ public interface StatementResult extends Iterator<Record>
*
* @return a summary for the whole query result.
*/
ResultSummary summary();
ResultSummary consume();
}
2 changes: 1 addition & 1 deletion driver/src/main/java/org/neo4j/driver/StatementRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* <ul>
* <li>Read from or discard a result, for instance via
* {@link StatementResult#next()} or {@link StatementResult#summary()} </li>
* {@link StatementResult#next()} or {@link StatementResult#consume()} </li>
* <li>Explicitly commit/rollback a transaction using blocking {@link Transaction#close()} </li>
* <li>Close a session using blocking {@link Session#close()}</li>
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
*
* <ul>
* <li>Read from or discard a result, for instance via
* {@link StatementResultCursor#nextAsync()}, {@link StatementResultCursor#summaryAsync()}</li>
* {@link StatementResultCursor#nextAsync()}, {@link StatementResultCursor#consumeAsync()}</li>
* <li>Explicitly commit/rollback a transaction using {@link AsyncTransaction#commitAsync()}, {@link AsyncTransaction#rollbackAsync()}</li>
* <li>Close a session using {@link AsyncSession#closeAsync()}</li>
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public interface StatementResultCursor
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
* completed exceptionally if query execution fails.
*/
CompletionStage<ResultSummary> summaryAsync();
CompletionStage<ResultSummary> consumeAsync();

/**
* Asynchronously navigate to and retrieve the next {@link Record} in this result. Returned stage can contain
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.exceptions;

import org.neo4j.driver.StatementRunner;

/**
* A user is trying to access resources that are no longer valid due to
* the resources have already been consumed or
* the {@link StatementRunner} where the resources are created has already been closed.
*/
public class ResultConsumedException extends ClientException
{
public ResultConsumedException( String message )
{
super( message );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.exceptions;

/**
* This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit).
*/
public class TransactionNestingException extends ClientException
{
public TransactionNestingException( String message )
{
super( message );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@

public interface FailableCursor
{
CompletionStage<Throwable> consumeAsync();
CompletionStage<Throwable> failureAsync();
/**
* Discarding all unconsumed records and returning failure if there is any to run and/or pulls.
*/
CompletionStage<Throwable> discardAllFailureAsync();

/**
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
*/
CompletionStage<Throwable> pullAllFailureAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ public <T> List<T> list( Function<Record, T> mapFunction )
}

@Override
public ResultSummary summary()
public ResultSummary consume()
{
return blockingGet( cursor.summaryAsync() );
return blockingGet( cursor.consumeAsync() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand Down Expand Up @@ -196,7 +197,7 @@ public CompletionStage<Void> closeAsync()
if ( cursor != null )
{
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
return cursor.consumeAsync();
return cursor.discardAllFailureAsync();
}
// no result cursor exists so no error exists
return completedWithNull();
Expand Down Expand Up @@ -254,7 +255,7 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
return completedWithNull();
}
// make sure previous result is fully consumed and connection is released back to the pool
return cursor.failureAsync();
return cursor.pullAllFailureAsync();
} ).thenCompose( error ->
{
if ( error == null )
Expand Down Expand Up @@ -323,7 +324,7 @@ private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
{
if ( tx != null )
{
throw new ClientException( errorMessage );
throw new TransactionNestingException( errorMessage );
}
} );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ private static CompletionStage<Throwable> retrieveFailure( CompletionStage<? ext
{
return cursorStage
.exceptionally( cursor -> null )
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.discardAllFailureAsync() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public List<String> keys()
}

@Override
public CompletionStage<ResultSummary> summaryAsync()
public CompletionStage<ResultSummary> consumeAsync()
{
return pullAllHandler.summaryAsync();
return pullAllHandler.consumeAsync();
}

@Override
Expand Down Expand Up @@ -95,7 +95,7 @@ public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
{
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
internalForEachAsync( action, resultFuture );
return resultFuture.thenCompose( ignore -> summaryAsync() );
return resultFuture.thenCompose( ignore -> consumeAsync() );
}

@Override
Expand All @@ -111,18 +111,17 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
}

@Override
public CompletionStage<Throwable> consumeAsync()
public CompletionStage<Throwable> discardAllFailureAsync()
{
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
return consumeAsync().handle( ( summary, error ) -> error );
}

@Override
public CompletionStage<Throwable> failureAsync()
public CompletionStage<Throwable> pullAllFailureAsync()
{
return pullAllHandler.failureAsync();
return pullAllHandler.pullAllFailureAsync();
}


private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
{
CompletionStage<Record> recordFuture = nextAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ public CompletionStage<AsyncStatementResultCursor> asyncResult()
if ( waitForRunResponse )
{
// wait for response of RUN before proceeding
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
return runHandler.runFuture().thenApply( ignore ->
new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
}
else
{
return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
return completedFuture( new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cursor;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

import org.neo4j.driver.Record;
import org.neo4j.driver.summary.ResultSummary;

import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

public class DisposableAsyncStatementResultCursor implements AsyncStatementResultCursor
{
private final AsyncStatementResultCursor delegate;
private boolean isDisposed;

public DisposableAsyncStatementResultCursor( AsyncStatementResultCursor delegate )
{
this.delegate = delegate;
}

@Override
public List<String> keys()
{
return delegate.keys();
}

@Override
public CompletionStage<ResultSummary> consumeAsync()
{
isDisposed = true;
return delegate.consumeAsync();
}

@Override
public CompletionStage<Record> nextAsync()
{
return assertNotDisposed().thenCompose( ignored -> delegate.nextAsync() );
}

@Override
public CompletionStage<Record> peekAsync()
{
return assertNotDisposed().thenCompose( ignored -> delegate.peekAsync() );
}

@Override
public CompletionStage<Record> singleAsync()
{
return assertNotDisposed().thenCompose( ignored -> delegate.singleAsync() );
}

@Override
public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
{
return assertNotDisposed().thenCompose( ignored -> delegate.forEachAsync( action ) );
}

@Override
public CompletionStage<List<Record>> listAsync()
{
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync() );
}

@Override
public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
{
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync( mapFunction ) );
}

@Override
public CompletionStage<Throwable> discardAllFailureAsync()
{
isDisposed = true;
return delegate.discardAllFailureAsync();
}

@Override
public CompletionStage<Throwable> pullAllFailureAsync()
{
// This one does not dispose the result so that a user could still visit the buffered result after this method call.
// This also does not assert not disposed so that this method can be called after summary.
return delegate.pullAllFailureAsync();
}

private <T> CompletableFuture<T> assertNotDisposed()
{
if ( isDisposed )
{
return failedFuture( newResultConsumedError() );
}
return completedWithNull();
}

boolean isDisposed()
{
return this.isDisposed;
}
}
Loading