Skip to content

Commit 466be5d

Browse files
author
Zhen Li
committed
Throw helpful error message when nesting transactions inside one session.
Session only support one active transaction (either auto-commit or exmplicit transaction) at a given time. For async and blocking sessions, when we chaining a new transaction while there is an on-going auto-commit transaction, we can allow users to "nesting transactions" in one session by sending a pull all for the preceding auto-commit transaction. However we cannot do the same for reactive sessions as project reactive will error if the underlying publisher does not respect back pressure. Ensure helpful error message when accessing results after the statement runner where the results belongs to is closed.
1 parent d8bba74 commit 466be5d

35 files changed

+797
-313
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
import org.neo4j.driver.StatementRunner;
22+
23+
/**
24+
* A user is trying to access resources that are no longer valid due to
25+
* the resources have already been consumed or
26+
* the {@link StatementRunner} where the resources are created has already been closed.
27+
*/
28+
public class ResultConsumedException extends ClientException
29+
{
30+
public ResultConsumedException()
31+
{
32+
super( "Cannot access records on this result any more as the result has already been consumed " +
33+
"or the statement runner where the result is created has already been closed." );
34+
}
35+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
/**
22+
* This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit).
23+
*/
24+
public class TransactionNestingException extends ClientException
25+
{
26+
public TransactionNestingException( String message )
27+
{
28+
super( message );
29+
}
30+
}

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222

2323
public interface FailableCursor
2424
{
25+
/**
26+
* Dispose this cursor by discarding all unconsumed records and returning failure if there is any to run and/or pulls.
27+
*/
2528
CompletionStage<Throwable> consumeAsync();
29+
30+
/**
31+
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
32+
*/
2633
CompletionStage<Throwable> failureAsync();
2734
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
3939
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
4040
import org.neo4j.driver.internal.logging.PrefixedLogger;
41+
import org.neo4j.driver.exceptions.TransactionNestingException;
4142
import org.neo4j.driver.internal.retry.RetryLogic;
4243
import org.neo4j.driver.internal.spi.Connection;
4344
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -323,7 +324,7 @@ private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
323324
{
324325
if ( tx != null )
325326
{
326-
throw new ClientException( errorMessage );
327+
throw new TransactionNestingException( errorMessage );
327328
}
328329
} );
329330
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
113113
@Override
114114
public CompletionStage<Throwable> consumeAsync()
115115
{
116-
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
116+
return summaryAsync().handle( ( summary, error ) -> error );
117117
}
118118

119119
@Override

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ public CompletionStage<AsyncStatementResultCursor> asyncResult()
6666
if ( waitForRunResponse )
6767
{
6868
// wait for response of RUN before proceeding
69-
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
69+
return runHandler.runFuture().thenApply( ignore ->
70+
new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7071
}
7172
else
7273
{
73-
return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
74+
return completedFuture( new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7475
}
7576
}
7677

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cursor;
20+
21+
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionStage;
24+
import java.util.function.Consumer;
25+
import java.util.function.Function;
26+
27+
import org.neo4j.driver.Record;
28+
import org.neo4j.driver.exceptions.ResultConsumedException;
29+
import org.neo4j.driver.summary.ResultSummary;
30+
31+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
32+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
33+
34+
public class DisposableAsyncStatementResultCursor implements AsyncStatementResultCursor
35+
{
36+
private final AsyncStatementResultCursor delegate;
37+
private boolean isDisposed;
38+
39+
public DisposableAsyncStatementResultCursor( AsyncStatementResultCursor delegate )
40+
{
41+
this.delegate = delegate;
42+
}
43+
44+
@Override
45+
public List<String> keys()
46+
{
47+
return delegate.keys();
48+
}
49+
50+
@Override
51+
public CompletionStage<ResultSummary> summaryAsync()
52+
{
53+
isDisposed = true;
54+
return delegate.summaryAsync();
55+
}
56+
57+
@Override
58+
public CompletionStage<Record> nextAsync()
59+
{
60+
return assertNotDisposed().thenCompose( ignored -> delegate.nextAsync() );
61+
}
62+
63+
@Override
64+
public CompletionStage<Record> peekAsync()
65+
{
66+
return assertNotDisposed().thenCompose( ignored -> delegate.peekAsync() );
67+
}
68+
69+
@Override
70+
public CompletionStage<Record> singleAsync()
71+
{
72+
return assertNotDisposed().thenCompose( ignored -> delegate.singleAsync() );
73+
}
74+
75+
@Override
76+
public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
77+
{
78+
return assertNotDisposed().thenCompose( ignored -> delegate.forEachAsync( action ) );
79+
}
80+
81+
@Override
82+
public CompletionStage<List<Record>> listAsync()
83+
{
84+
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync() );
85+
}
86+
87+
@Override
88+
public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
89+
{
90+
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync( mapFunction ) );
91+
}
92+
93+
@Override
94+
public CompletionStage<Throwable> consumeAsync()
95+
{
96+
isDisposed = true;
97+
return delegate.consumeAsync();
98+
}
99+
100+
@Override
101+
public CompletionStage<Throwable> failureAsync()
102+
{
103+
// This one does not dispose the result so that a user could still visit the buffered result after this method call.
104+
// This also does not assert not disposed so that this method can be called after summary.
105+
return delegate.failureAsync();
106+
}
107+
108+
private <T> CompletableFuture<T> assertNotDisposed()
109+
{
110+
if ( isDisposed )
111+
{
112+
return failedFuture( new ResultConsumedException() );
113+
}
114+
return completedWithNull();
115+
}
116+
117+
boolean isDisposed()
118+
{
119+
return this.isDisposed;
120+
}
121+
}

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import java.util.function.BiConsumer;
2626

2727
import org.neo4j.driver.Record;
28+
import org.neo4j.driver.exceptions.ResultConsumedException;
2829
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2930
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
31+
import org.neo4j.driver.exceptions.TransactionNestingException;
3032
import org.neo4j.driver.summary.ResultSummary;
3133

3234
public class RxStatementResultCursorImpl implements RxStatementResultCursor
@@ -37,6 +39,7 @@ public class RxStatementResultCursorImpl implements RxStatementResultCursor
3739
private final Throwable runResponseError;
3840
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
3941
private BiConsumer<Record,Throwable> recordConsumer;
42+
private boolean resultConsumed;
4043

4144
public RxStatementResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
4245
{
@@ -64,6 +67,10 @@ public List<String> keys()
6467
@Override
6568
public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
6669
{
70+
if ( resultConsumed )
71+
{
72+
throw new ResultConsumedException();
73+
}
6774
if ( isRecordConsumerInstalled() )
6875
{
6976
return;
@@ -100,19 +107,24 @@ public CompletionStage<Throwable> consumeAsync()
100107
@Override
101108
public CompletionStage<Throwable> failureAsync()
102109
{
110+
if ( isRecordConsumerInstalled() && !isDone() )
111+
{
112+
throw new TransactionNestingException(
113+
"You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result." );
114+
}
103115
// It is safe to discard records as either the streaming has not started at all, or the streaming is fully finished.
104116
return consumeAsync();
105117
}
106118

107119
@Override
108120
public CompletionStage<ResultSummary> summaryAsync()
109121
{
110-
if ( !isDone() ) // the summary is called before record streaming
122+
if ( !isDone() && !resultConsumed ) // the summary is called before record streaming
111123
{
112124
installRecordConsumer( DISCARD_RECORD_CONSUMER );
113125
cancel();
126+
resultConsumed = true;
114127
}
115-
116128
return this.summaryFuture;
117129
}
118130

driver/src/main/java/org/neo4j/driver/internal/cursor/StatementResultCursorFactoryImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,12 @@ public CompletionStage<AsyncStatementResultCursor> asyncResult()
6969
if ( waitForRunResponse )
7070
{
7171
// wait for response of RUN before proceeding
72-
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
72+
return runHandler.runFuture().thenApply(
73+
ignore -> new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7374
}
7475
else
7576
{
76-
return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
77+
return completedFuture( new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7778
}
7879
}
7980

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,14 @@ public synchronized CompletionStage<Record> nextAsync()
139139

140140
public synchronized CompletionStage<ResultSummary> summaryAsync()
141141
{
142+
records.clear();
142143
if ( isDone() )
143144
{
144-
records.clear();
145145
return completedWithValueIfNoFailure( summary );
146146
}
147147
else
148148
{
149149
cancel();
150-
records.clear();
151150
if ( summaryFuture == null )
152151
{
153152
summaryFuture = new CompletableFuture<>();

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.Statement;
2929
import org.neo4j.driver.TransactionConfig;
3030
import org.neo4j.driver.Bookmark;
31+
import org.neo4j.driver.exceptions.TransactionNestingException;
3132
import org.neo4j.driver.internal.async.NetworkSession;
3233
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
3334
import org.neo4j.driver.internal.util.Futures;
@@ -122,7 +123,8 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<Publisher<T>> work,
122123

123124
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<Publisher<T>> work, TransactionConfig config )
124125
{
125-
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, RxTransaction::commit, RxTransaction::rollback );
126+
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
127+
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
126128
return session.retryLogic().retryRx( repeatableWork );
127129
}
128130

@@ -168,11 +170,18 @@ private <T> void releaseConnectionBeforeReturning( CompletableFuture<T> returnFu
168170
// We failed to create a result cursor so we cannot rely on result cursor to cleanup resources.
169171
// Therefore we will first release the connection that might have been created in the session and then notify the error.
170172
// The logic here shall be the same as `SessionPullResponseHandler#afterFailure`.
171-
// The reason we need to release connection in session is that we do not have a `rxSession.close()`;
173+
// The reason we need to release connection in session is that we made `rxSession.close()` optional;
172174
// Otherwise, session.close shall handle everything for us.
173175
Throwable error = Futures.completionExceptionCause( completionError );
174-
session.releaseConnectionAsync().whenComplete( ( ignored, closeError ) ->
175-
returnFuture.completeExceptionally( Futures.combineErrors( error, closeError ) ) );
176+
if ( error instanceof TransactionNestingException )
177+
{
178+
returnFuture.completeExceptionally( error );
179+
}
180+
else
181+
{
182+
session.releaseConnectionAsync().whenComplete( ( ignored, closeError ) ->
183+
returnFuture.completeExceptionally( Futures.combineErrors( error, closeError ) ) );
184+
}
176185
}
177186

178187
@Override

0 commit comments

Comments
 (0)