Skip to content

Commit ba5e5ac

Browse files
authored
Merge pull request #639 from zhenlineo/4.0-nested-queries-with-reactive-session
Helpful error message when nesting transactions in the same session
2 parents 9b27d89 + d744902 commit ba5e5ac

File tree

90 files changed

+1100
-573
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+1100
-573
lines changed

driver/src/main/java/org/neo4j/driver/Statement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* @see Session
3737
* @see Transaction
3838
* @see StatementResult
39-
* @see StatementResult#summary()
39+
* @see StatementResult#consume()
4040
* @see ResultSummary
4141
* @since 1.0
4242
*/

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,5 +147,5 @@ public interface StatementResult extends Iterator<Record>
147147
*
148148
* @return a summary for the whole query result.
149149
*/
150-
ResultSummary summary();
150+
ResultSummary consume();
151151
}

driver/src/main/java/org/neo4j/driver/StatementRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
*
4343
* <ul>
4444
* <li>Read from or discard a result, for instance via
45-
* {@link StatementResult#next()} or {@link StatementResult#summary()} </li>
45+
* {@link StatementResult#next()} or {@link StatementResult#consume()} </li>
4646
* <li>Explicitly commit/rollback a transaction using blocking {@link Transaction#close()} </li>
4747
* <li>Close a session using blocking {@link Session#close()}</li>
4848
* </ul>

driver/src/main/java/org/neo4j/driver/async/AsyncStatementRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
*
5252
* <ul>
5353
* <li>Read from or discard a result, for instance via
54-
* {@link StatementResultCursor#nextAsync()}, {@link StatementResultCursor#summaryAsync()}</li>
54+
* {@link StatementResultCursor#nextAsync()}, {@link StatementResultCursor#consumeAsync()}</li>
5555
* <li>Explicitly commit/rollback a transaction using {@link AsyncTransaction#commitAsync()}, {@link AsyncTransaction#rollbackAsync()}</li>
5656
* <li>Close a session using {@link AsyncSession#closeAsync()}</li>
5757
* </ul>

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public interface StatementResultCursor
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.
8282
*/
83-
CompletionStage<ResultSummary> summaryAsync();
83+
CompletionStage<ResultSummary> consumeAsync();
8484

8585
/**
8686
* Asynchronously navigate to and retrieve the next {@link Record} in this result. Returned stage can contain
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
import org.neo4j.driver.StatementRunner;
22+
23+
/**
24+
* A user is trying to access resources that are no longer valid due to
25+
* the resources have already been consumed or
26+
* the {@link StatementRunner} where the resources are created has already been closed.
27+
*/
28+
public class ResultConsumedException extends ClientException
29+
{
30+
public ResultConsumedException( String message )
31+
{
32+
super( message );
33+
}
34+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
/**
22+
* This exception indicates a user is nesting new transaction with a on-going transaction (explicit and/or auto-commit).
23+
*/
24+
public class TransactionNestingException extends ClientException
25+
{
26+
public TransactionNestingException( String message )
27+
{
28+
super( message );
29+
}
30+
}

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@
2222

2323
public interface FailableCursor
2424
{
25-
CompletionStage<Throwable> consumeAsync();
26-
CompletionStage<Throwable> failureAsync();
25+
/**
26+
* Discarding all unconsumed records and returning failure if there is any to run and/or pulls.
27+
*/
28+
CompletionStage<Throwable> discardAllFailureAsync();
29+
30+
/**
31+
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
32+
*/
33+
CompletionStage<Throwable> pullAllFailureAsync();
2734
}

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ public <T> List<T> list( Function<Record, T> mapFunction )
112112
}
113113

114114
@Override
115-
public ResultSummary summary()
115+
public ResultSummary consume()
116116
{
117-
return blockingGet( cursor.summaryAsync() );
117+
return blockingGet( cursor.consumeAsync() );
118118
}
119119

120120
@Override

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.neo4j.driver.internal.cursor.RxStatementResultCursor;
3939
import org.neo4j.driver.internal.cursor.StatementResultCursorFactory;
4040
import org.neo4j.driver.internal.logging.PrefixedLogger;
41+
import org.neo4j.driver.exceptions.TransactionNestingException;
4142
import org.neo4j.driver.internal.retry.RetryLogic;
4243
import org.neo4j.driver.internal.spi.Connection;
4344
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -196,7 +197,7 @@ public CompletionStage<Void> closeAsync()
196197
if ( cursor != null )
197198
{
198199
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
199-
return cursor.consumeAsync();
200+
return cursor.discardAllFailureAsync();
200201
}
201202
// no result cursor exists so no error exists
202203
return completedWithNull();
@@ -254,7 +255,7 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
254255
return completedWithNull();
255256
}
256257
// make sure previous result is fully consumed and connection is released back to the pool
257-
return cursor.failureAsync();
258+
return cursor.pullAllFailureAsync();
258259
} ).thenCompose( error ->
259260
{
260261
if ( error == null )
@@ -323,7 +324,7 @@ private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
323324
{
324325
if ( tx != null )
325326
{
326-
throw new ClientException( errorMessage );
327+
throw new TransactionNestingException( errorMessage );
327328
}
328329
} );
329330
}

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ private static CompletionStage<Throwable> retrieveFailure( CompletionStage<? ext
7474
{
7575
return cursorStage
7676
.exceptionally( cursor -> null )
77-
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
77+
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.discardAllFailureAsync() );
7878
}
7979
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ public List<String> keys()
4949
}
5050

5151
@Override
52-
public CompletionStage<ResultSummary> summaryAsync()
52+
public CompletionStage<ResultSummary> consumeAsync()
5353
{
54-
return pullAllHandler.summaryAsync();
54+
return pullAllHandler.consumeAsync();
5555
}
5656

5757
@Override
@@ -95,7 +95,7 @@ public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
9595
{
9696
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
9797
internalForEachAsync( action, resultFuture );
98-
return resultFuture.thenCompose( ignore -> summaryAsync() );
98+
return resultFuture.thenCompose( ignore -> consumeAsync() );
9999
}
100100

101101
@Override
@@ -111,18 +111,17 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
111111
}
112112

113113
@Override
114-
public CompletionStage<Throwable> consumeAsync()
114+
public CompletionStage<Throwable> discardAllFailureAsync()
115115
{
116-
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
116+
return consumeAsync().handle( ( summary, error ) -> error );
117117
}
118118

119119
@Override
120-
public CompletionStage<Throwable> failureAsync()
120+
public CompletionStage<Throwable> pullAllFailureAsync()
121121
{
122-
return pullAllHandler.failureAsync();
122+
return pullAllHandler.pullAllFailureAsync();
123123
}
124124

125-
126125
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
127126
{
128127
CompletionStage<Record> recordFuture = nextAsync();

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ public CompletionStage<AsyncStatementResultCursor> asyncResult()
6666
if ( waitForRunResponse )
6767
{
6868
// wait for response of RUN before proceeding
69-
return runHandler.runFuture().thenApply( ignore -> new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
69+
return runHandler.runFuture().thenApply( ignore ->
70+
new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7071
}
7172
else
7273
{
73-
return completedFuture( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) );
74+
return completedFuture( new DisposableAsyncStatementResultCursor( new AsyncStatementResultCursorImpl( runHandler, pullAllHandler ) ) );
7475
}
7576
}
7677

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cursor;
20+
21+
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionStage;
24+
import java.util.function.Consumer;
25+
import java.util.function.Function;
26+
27+
import org.neo4j.driver.Record;
28+
import org.neo4j.driver.summary.ResultSummary;
29+
30+
import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
31+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
32+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
33+
34+
public class DisposableAsyncStatementResultCursor implements AsyncStatementResultCursor
35+
{
36+
private final AsyncStatementResultCursor delegate;
37+
private boolean isDisposed;
38+
39+
public DisposableAsyncStatementResultCursor( AsyncStatementResultCursor delegate )
40+
{
41+
this.delegate = delegate;
42+
}
43+
44+
@Override
45+
public List<String> keys()
46+
{
47+
return delegate.keys();
48+
}
49+
50+
@Override
51+
public CompletionStage<ResultSummary> consumeAsync()
52+
{
53+
isDisposed = true;
54+
return delegate.consumeAsync();
55+
}
56+
57+
@Override
58+
public CompletionStage<Record> nextAsync()
59+
{
60+
return assertNotDisposed().thenCompose( ignored -> delegate.nextAsync() );
61+
}
62+
63+
@Override
64+
public CompletionStage<Record> peekAsync()
65+
{
66+
return assertNotDisposed().thenCompose( ignored -> delegate.peekAsync() );
67+
}
68+
69+
@Override
70+
public CompletionStage<Record> singleAsync()
71+
{
72+
return assertNotDisposed().thenCompose( ignored -> delegate.singleAsync() );
73+
}
74+
75+
@Override
76+
public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
77+
{
78+
return assertNotDisposed().thenCompose( ignored -> delegate.forEachAsync( action ) );
79+
}
80+
81+
@Override
82+
public CompletionStage<List<Record>> listAsync()
83+
{
84+
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync() );
85+
}
86+
87+
@Override
88+
public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
89+
{
90+
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync( mapFunction ) );
91+
}
92+
93+
@Override
94+
public CompletionStage<Throwable> discardAllFailureAsync()
95+
{
96+
isDisposed = true;
97+
return delegate.discardAllFailureAsync();
98+
}
99+
100+
@Override
101+
public CompletionStage<Throwable> pullAllFailureAsync()
102+
{
103+
// This one does not dispose the result so that a user could still visit the buffered result after this method call.
104+
// This also does not assert not disposed so that this method can be called after summary.
105+
return delegate.pullAllFailureAsync();
106+
}
107+
108+
private <T> CompletableFuture<T> assertNotDisposed()
109+
{
110+
if ( isDisposed )
111+
{
112+
return failedFuture( newResultConsumedError() );
113+
}
114+
return completedWithNull();
115+
}
116+
117+
boolean isDisposed()
118+
{
119+
return this.isDisposed;
120+
}
121+
}

0 commit comments

Comments
 (0)