Skip to content

Commit c262f3e

Browse files
committed
Improve error propagation
When closing sessions and committing/rolling back transactions. Previously only last async cursor failure was propagated. This means successfully completed cursors could hide not consumed failures from previous queries. This commit makes session and transaction track all results and query them for not consumed errors when session is closed or transaction either committed or rolled back.
1 parent f8545b1 commit c262f3e

File tree

6 files changed

+281
-35
lines changed

6 files changed

+281
-35
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321
import java.util.Map;
2422
import java.util.concurrent.CompletableFuture;
2523
import java.util.concurrent.CompletionException;
@@ -29,14 +27,14 @@
2927

3028
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
3129
import org.neo4j.driver.internal.async.QueryRunner;
30+
import org.neo4j.driver.internal.async.ResultCursorsHolder;
3231
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
3332
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
3433
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
3534
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3635
import org.neo4j.driver.internal.spi.Connection;
3736
import org.neo4j.driver.internal.spi.ResponseHandler;
3837
import org.neo4j.driver.internal.types.InternalTypeSystem;
39-
import org.neo4j.driver.internal.util.Futures;
4038
import org.neo4j.driver.v1.Record;
4139
import org.neo4j.driver.v1.Session;
4240
import org.neo4j.driver.v1.Statement;
@@ -93,15 +91,16 @@ private enum State
9391

9492
private final Connection connection;
9593
private final NetworkSession session;
94+
private final ResultCursorsHolder resultCursors;
9695

97-
private final List<CompletionStage<InternalStatementResultCursor>> resultCursors = new ArrayList<>();
9896
private volatile Bookmark bookmark = Bookmark.empty();
9997
private volatile State state = State.ACTIVE;
10098

10199
public ExplicitTransaction( Connection connection, NetworkSession session )
102100
{
103101
this.connection = connection;
104102
this.session = session;
103+
this.resultCursors = new ResultCursorsHolder();
105104
}
106105

107106
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
@@ -178,8 +177,8 @@ else if ( state == State.TERMINATED )
178177
}
179178
else
180179
{
181-
return receiveFailures()
182-
.thenCompose( failure -> doCommitAsync().handle( handleCommitOrRollback( failure ) ) )
180+
return resultCursors.retrieveNotConsumedError()
181+
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
183182
.whenComplete( transactionClosed( State.COMMITTED ) );
184183
}
185184
}
@@ -203,8 +202,8 @@ else if ( state == State.TERMINATED )
203202
}
204203
else
205204
{
206-
return receiveFailures()
207-
.thenCompose( failure -> doRollbackAsync().handle( handleCommitOrRollback( failure ) ) )
205+
return resultCursors.retrieveNotConsumedError()
206+
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
208207
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
209208
}
210209
}
@@ -279,17 +278,17 @@ public CompletionStage<StatementResultCursor> runAsync( Statement statement )
279278
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
280279
{
281280
ensureCanRunQueries();
282-
CompletionStage<InternalStatementResultCursor> result;
281+
CompletionStage<InternalStatementResultCursor> cursorStage;
283282
if ( asAsync )
284283
{
285-
result = QueryRunner.runAsAsync( connection, statement, this );
284+
cursorStage = QueryRunner.runAsAsync( connection, statement, this );
286285
}
287286
else
288287
{
289-
result = QueryRunner.runAsBlocking( connection, statement, this );
288+
cursorStage = QueryRunner.runAsBlocking( connection, statement, this );
290289
}
291-
resultCursors.add( result );
292-
return result;
290+
resultCursors.add( cursorStage );
291+
return cursorStage;
293292
}
294293

295294
private void ensureCanRunQueries()
@@ -391,11 +390,4 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
391390
session.setBookmark( bookmark );
392391
};
393392
}
394-
395-
private CompletionStage<Throwable> receiveFailures()
396-
{
397-
return resultCursors.stream()
398-
.map( stage -> stage.thenCompose( InternalStatementResultCursor::failureAsync ) )
399-
.reduce( completedFuture( null ), Futures::firstNotNull );
400-
}
401393
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2828
import org.neo4j.driver.internal.async.QueryRunner;
29+
import org.neo4j.driver.internal.async.ResultCursorsHolder;
2930
import org.neo4j.driver.internal.logging.DelegatingLogger;
3031
import org.neo4j.driver.internal.retry.RetryLogic;
3132
import org.neo4j.driver.internal.spi.Connection;
@@ -59,12 +60,12 @@ public class NetworkSession implements Session
5960
private final ConnectionProvider connectionProvider;
6061
private final AccessMode mode;
6162
private final RetryLogic retryLogic;
63+
private final ResultCursorsHolder resultCursors;
6264
protected final Logger logger;
6365

6466
private volatile Bookmark bookmark = Bookmark.empty();
6567
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
6668
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
67-
private volatile CompletionStage<InternalStatementResultCursor> lastResultStage = completedFuture( null );
6869

6970
private final AtomicBoolean open = new AtomicBoolean( true );
7071

@@ -74,6 +75,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7475
this.connectionProvider = connectionProvider;
7576
this.mode = mode;
7677
this.retryLogic = retryLogic;
78+
this.resultCursors = new ResultCursorsHolder();
7779
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
7880
}
7981

@@ -161,9 +163,7 @@ public CompletionStage<Void> closeAsync()
161163
{
162164
if ( open.compareAndSet( true, false ) )
163165
{
164-
return lastResultStage
165-
.exceptionally( error -> null ) // ignore connection acquisition failures
166-
.thenCompose( this::receiveFailure )
166+
return resultCursors.retrieveNotConsumedError()
167167
.thenCompose( error -> releaseResources().thenApply( ignore ->
168168
{
169169
Throwable queryError = Futures.completionErrorCause( error );
@@ -183,15 +183,6 @@ public CompletionStage<Void> closeAsync()
183183
return completedFuture( null );
184184
}
185185

186-
private CompletionStage<Throwable> receiveFailure( InternalStatementResultCursor cursor )
187-
{
188-
if ( cursor == null )
189-
{
190-
return completedFuture( null );
191-
}
192-
return cursor.failureAsync();
193-
}
194-
195186
@Override
196187
public Transaction beginTransaction()
197188
{
@@ -419,7 +410,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
419410
{
420411
ensureSessionIsOpen();
421412

422-
lastResultStage = ensureNoOpenTxBeforeRunningQuery()
413+
CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
423414
.thenCompose( ignore -> acquireConnection( mode ) )
424415
.thenCompose( connection ->
425416
{
@@ -433,7 +424,8 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
433424
}
434425
} );
435426

436-
return lastResultStage;
427+
resultCursors.add( cursorStage );
428+
return cursorStage;
437429
}
438430

439431
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Objects;
24+
import java.util.concurrent.CompletionStage;
25+
26+
import static java.util.concurrent.CompletableFuture.completedFuture;
27+
28+
public class ResultCursorsHolder
29+
{
30+
private final List<CompletionStage<InternalStatementResultCursor>> cursorStages = new ArrayList<>();
31+
32+
public void add( CompletionStage<InternalStatementResultCursor> cursorStage )
33+
{
34+
Objects.requireNonNull( cursorStage );
35+
cursorStages.add( cursorStage );
36+
}
37+
38+
public CompletionStage<Throwable> retrieveNotConsumedError()
39+
{
40+
return cursorStages.stream()
41+
.map( this::retrieveFailure )
42+
.reduce( completedFuture( null ), this::nonNullFailureFromEither );
43+
}
44+
45+
private CompletionStage<Throwable> retrieveFailure( CompletionStage<InternalStatementResultCursor> cursorStage )
46+
{
47+
return cursorStage
48+
.exceptionally( cursor -> null )
49+
.thenCompose( cursor -> cursor == null ? completedFuture( null ) : cursor.failureAsync() );
50+
}
51+
52+
private CompletionStage<Throwable> nonNullFailureFromEither( CompletionStage<Throwable> stage1,
53+
CompletionStage<Throwable> stage2 )
54+
{
55+
return stage1.thenCompose( value ->
56+
{
57+
if ( value != null )
58+
{
59+
return completedFuture( value );
60+
}
61+
return stage2;
62+
} );
63+
}
64+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import org.junit.Test;
22+
23+
import java.io.IOException;
24+
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.TimeoutException;
26+
27+
import org.neo4j.driver.internal.util.Futures;
28+
29+
import static java.util.concurrent.CompletableFuture.completedFuture;
30+
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertNull;
32+
import static org.junit.Assert.fail;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
35+
import static org.neo4j.driver.internal.util.Futures.getBlocking;
36+
37+
public class ResultCursorsHolderTest
38+
{
39+
@Test
40+
public void shouldReturnNoErrorWhenNoCursorStages()
41+
{
42+
ResultCursorsHolder holder = new ResultCursorsHolder();
43+
44+
Throwable error = getBlocking( holder.retrieveNotConsumedError() );
45+
assertNull( error );
46+
}
47+
48+
@Test
49+
public void shouldFailToAddNullCursorStage()
50+
{
51+
ResultCursorsHolder holder = new ResultCursorsHolder();
52+
53+
try
54+
{
55+
holder.add( null );
56+
fail( "Exception expected" );
57+
}
58+
catch ( NullPointerException e )
59+
{
60+
// expected
61+
}
62+
}
63+
64+
@Test
65+
public void shouldReturnNoErrorWhenCursorStagesHaveNoErrors()
66+
{
67+
ResultCursorsHolder holder = new ResultCursorsHolder();
68+
69+
holder.add( cursorWithoutError() );
70+
holder.add( cursorWithoutError() );
71+
holder.add( cursorWithoutError() );
72+
holder.add( cursorWithoutError() );
73+
74+
Throwable error = getBlocking( holder.retrieveNotConsumedError() );
75+
assertNull( error );
76+
}
77+
78+
@Test
79+
public void shouldNotReturnStageErrors()
80+
{
81+
ResultCursorsHolder holder = new ResultCursorsHolder();
82+
83+
holder.add( Futures.failedFuture( new RuntimeException( "Failed to acquire a connection" ) ) );
84+
holder.add( cursorWithoutError() );
85+
holder.add( cursorWithoutError() );
86+
holder.add( Futures.failedFuture( new IOException( "Failed to do IO" ) ) );
87+
88+
Throwable error = getBlocking( holder.retrieveNotConsumedError() );
89+
assertNull( error );
90+
}
91+
92+
@Test
93+
public void shouldReturnErrorWhenOneCursorFailed()
94+
{
95+
IOException error = new IOException( "IO failed" );
96+
ResultCursorsHolder holder = new ResultCursorsHolder();
97+
98+
holder.add( cursorWithoutError() );
99+
holder.add( cursorWithoutError() );
100+
holder.add( cursorWithError( error ) );
101+
holder.add( cursorWithoutError() );
102+
103+
Throwable retrievedError = getBlocking( holder.retrieveNotConsumedError() );
104+
assertEquals( error, retrievedError );
105+
}
106+
107+
@Test
108+
public void shouldReturnFirstError()
109+
{
110+
RuntimeException error1 = new RuntimeException( "Error 1" );
111+
IOException error2 = new IOException( "Error 2" );
112+
TimeoutException error3 = new TimeoutException( "Error 3" );
113+
ResultCursorsHolder holder = new ResultCursorsHolder();
114+
115+
holder.add( cursorWithoutError() );
116+
holder.add( cursorWithError( error1 ) );
117+
holder.add( cursorWithError( error2 ) );
118+
holder.add( cursorWithError( error3 ) );
119+
120+
assertEquals( error1, getBlocking( holder.retrieveNotConsumedError() ) );
121+
}
122+
123+
private CompletionStage<InternalStatementResultCursor> cursorWithoutError()
124+
{
125+
return cursorWithError( null );
126+
}
127+
128+
private CompletionStage<InternalStatementResultCursor> cursorWithError( Throwable error )
129+
{
130+
InternalStatementResultCursor cursor = mock( InternalStatementResultCursor.class );
131+
when( cursor.failureAsync() ).thenReturn( completedFuture( error ) );
132+
return completedFuture( cursor );
133+
}
134+
}

0 commit comments

Comments
 (0)