Skip to content

Commit 605194f

Browse files
committed
Update handling of failed queries
1 parent ac17aa5 commit 605194f

22 files changed

+132
-75
lines changed

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
public interface FailableCursor
2424
{
2525
/**
26-
* Discarding all unconsumed records and returning failure if there is any to run and/or pulls.
26+
* Discarding all unconsumed records and returning failure if there is any pull errors.
2727
*/
2828
CompletionStage<Throwable> discardAllFailureAsync();
2929

3030
/**
31-
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
31+
* Pulling all unconsumed records into memory and returning failure if there is any pull errors.
3232
*/
3333
CompletionStage<Throwable> pullAllFailureAsync();
3434
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionException;
2223
import java.util.concurrent.CompletionStage;
2324
import java.util.concurrent.atomic.AtomicBoolean;
@@ -82,7 +83,10 @@ public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig co
8283
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );
8384

8485
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
85-
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
86+
return newResultCursorStage.thenCompose(
87+
cursor -> cursor.runError()
88+
.map( Futures::<ResultCursor>failedFuture )
89+
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
8690
}
8791

8892
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import java.util.Arrays;
2122
import java.util.EnumSet;
23+
import java.util.concurrent.CompletableFuture;
2224
import java.util.concurrent.CompletionException;
2325
import java.util.concurrent.CompletionStage;
2426
import java.util.function.BiFunction;
@@ -210,7 +212,10 @@ public CompletionStage<ResultCursor> runAsync( Query query )
210212
CompletionStage<AsyncResultCursor> cursorStage =
211213
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
212214
resultCursors.add( cursorStage );
213-
return cursorStage.thenApply( cursor -> cursor );
215+
return cursorStage.thenCompose(
216+
cursor -> cursor.runError()
217+
.map( Futures::<ResultCursor>failedFuture )
218+
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
214219
}
215220

216221
public CompletionStage<RxResultCursor> runRx(Query query)
@@ -229,7 +234,29 @@ public boolean isOpen()
229234

230235
public void markTerminated( Throwable cause )
231236
{
232-
state = StateHolder.terminatedWith( cause );
237+
if ( state.value == State.TERMINATED )
238+
{
239+
if ( state.causeOfTermination != null )
240+
{
241+
addSuppressedWhenNotCaptured( state.causeOfTermination, cause );
242+
}
243+
}
244+
else
245+
{
246+
state = StateHolder.terminatedWith( cause );
247+
}
248+
}
249+
250+
private void addSuppressedWhenNotCaptured( Throwable currentCause, Throwable newCause )
251+
{
252+
if ( currentCause != newCause )
253+
{
254+
boolean noneMatch = Arrays.stream( currentCause.getSuppressed() ).noneMatch( suppressed -> suppressed == newCause );
255+
if ( noneMatch )
256+
{
257+
currentCause.addSuppressed( newCause );
258+
}
259+
}
233260
}
234261

235262
public Connection connection()

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21-
import org.neo4j.driver.internal.FailableCursor;
21+
import java.util.Optional;
22+
2223
import org.neo4j.driver.async.ResultCursor;
24+
import org.neo4j.driver.internal.FailableCursor;
2325

2426
public interface AsyncResultCursor extends ResultCursor, FailableCursor
2527
{
28+
Optional<Throwable> runError();
2629
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cursor;
2020

2121
import java.util.List;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Consumer;
@@ -33,11 +34,13 @@
3334

3435
public class AsyncResultCursorImpl implements AsyncResultCursor
3536
{
37+
private final Throwable runError;
3638
private final RunResponseHandler runHandler;
3739
private final PullAllResponseHandler pullAllHandler;
3840

39-
public AsyncResultCursorImpl(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
41+
public AsyncResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
4042
{
43+
this.runError = runError;
4144
this.runHandler = runHandler;
4245
this.pullAllHandler = pullAllHandler;
4346
}
@@ -113,13 +116,14 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
113116
@Override
114117
public CompletionStage<Throwable> discardAllFailureAsync()
115118
{
116-
return consumeAsync().handle( ( summary, error ) -> error );
119+
return consumeAsync().handle( ( summary, error ) -> runError != null && runError == Futures.completionExceptionCause( error ) ? null : error );
117120
}
118121

119122
@Override
120123
public CompletionStage<Throwable> pullAllFailureAsync()
121124
{
122-
return pullAllHandler.pullAllFailureAsync();
125+
return pullAllHandler.pullAllFailureAsync()
126+
.thenApply( error -> runError != null && runError == Futures.completionExceptionCause( error ) ? null : error );
123127
}
124128

125129
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
@@ -154,4 +158,10 @@ else if ( record != null )
154158
}
155159
} );
156160
}
161+
162+
@Override
163+
public Optional<Throwable> runError()
164+
{
165+
return Optional.ofNullable( runError );
166+
}
157167
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6464
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6565
pullAllHandler.prePopulateRecords();
6666

67-
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
67+
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
6868
}
6969

7070
public CompletionStage<RxResultCursor> rxResult()

driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java

+7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cursor;
2020

2121
import java.util.List;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Consumer;
@@ -118,4 +119,10 @@ boolean isDisposed()
118119
{
119120
return this.isDisposed;
120121
}
122+
123+
@Override
124+
public Optional<Throwable> runError()
125+
{
126+
return this.delegate.runError();
127+
}
121128
}

driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6666
// only write and flush messages when async result is wanted.
6767
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6868
pullAllHandler.prePopulateRecords();
69-
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
69+
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
7070
}
7171

7272
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,9 @@ public void onFailure( Throwable error )
6666
{
6767
tx.markTerminated( error );
6868
}
69-
else
69+
else if ( error instanceof AuthorizationExpiredException )
7070
{
71-
if ( error instanceof AuthorizationExpiredException )
72-
{
73-
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
74-
}
75-
else
76-
{
77-
connection.release();
78-
}
71+
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
7972
}
8073
runFuture.completeExceptionally( error );
8174
}

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ public void afterFailure( Throwable error )
4545
// always mark transaction as terminated because every error is "acknowledged" with a RESET message
4646
// so database forgets about the transaction after the first error
4747
// such transaction should not attempt to commit and can be considered as rolled back
48-
if ( tx.isOpen() )
49-
{
50-
tx.markTerminated( error );
51-
}
48+
tx.markTerminated( error );
5249
}
5350
}

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToCommitte
278278
void connectionUsedForSessionRunReturnedToThePoolWhenSessionClose()
279279
{
280280
Session session = driver.session();
281-
Result result = createNodes( 12, session );
281+
createNodes( 12, session );
282282

283283
Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
284284
verify( connection1, never() ).release();

driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import java.util.List;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.logging.Level;
3435

3536
import org.neo4j.driver.AccessMode;
3637
import org.neo4j.driver.AuthToken;
3738
import org.neo4j.driver.AuthTokens;
3839
import org.neo4j.driver.Config;
3940
import org.neo4j.driver.Driver;
4041
import org.neo4j.driver.GraphDatabase;
42+
import org.neo4j.driver.Logging;
4143
import org.neo4j.driver.Record;
4244
import org.neo4j.driver.Session;
4345
import org.neo4j.driver.TransactionWork;
@@ -104,7 +106,8 @@ void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionAsync() throws IOExc
104106
StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 );
105107
URI uri = URI.create( "neo4j://127.0.0.1:9001" );
106108

107-
Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
109+
Driver driver = GraphDatabase
110+
.driver( uri, Config.builder().withLogging( Logging.console( Level.FINE ) ).withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
108111
AsyncSession session = driver.asyncSession( builder().withDatabase( "mydatabase" ).build() );
109112
List<String> names = Futures.blockingGet( session.writeTransactionAsync(
110113
tx -> tx.runAsync( "RETURN 1" )

driver/src/test/java/org/neo4j/driver/internal/InternalResultTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ private Result createResult(int numberOfRecords )
373373
}
374374
pullAllHandler.onSuccess( emptyMap() );
375375

376-
AsyncResultCursor cursor = new AsyncResultCursorImpl( runHandler, pullAllHandler );
376+
AsyncResultCursor cursor = new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
377377
return new InternalResult( connection, new DisposableAsyncResultCursor( cursor ) );
378378
}
379379

driver/src/test/java/org/neo4j/driver/internal/async/AsyncResultCursorImplTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,12 @@ void shouldPropagateFailureInConsumeAsync()
404404

405405
private static AsyncResultCursorImpl newCursor(PullAllResponseHandler pullAllHandler )
406406
{
407-
return new AsyncResultCursorImpl( newRunResponseHandler(), pullAllHandler );
407+
return new AsyncResultCursorImpl( null, newRunResponseHandler(), pullAllHandler );
408408
}
409409

410410
private static AsyncResultCursorImpl newCursor(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
411411
{
412-
return new AsyncResultCursorImpl( runHandler, pullAllHandler );
412+
return new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
413413
}
414414

415415
private static RunResponseHandler newRunResponseHandler()

driver/src/test/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactoryTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@
3131
import org.neo4j.driver.internal.spi.Connection;
3232

3333
import static org.hamcrest.CoreMatchers.containsString;
34-
import static org.hamcrest.CoreMatchers.equalTo;
3534
import static org.hamcrest.CoreMatchers.instanceOf;
3635
import static org.junit.Assert.assertThat;
36+
import static org.junit.jupiter.api.Assertions.assertSame;
3737
import static org.junit.jupiter.api.Assertions.assertThrows;
38+
import static org.junit.jupiter.api.Assertions.assertTrue;
3839
import static org.mockito.ArgumentMatchers.any;
3940
import static org.mockito.Mockito.mock;
4041
import static org.mockito.Mockito.verify;
@@ -58,7 +59,7 @@ void shouldReturnAsyncResultWhenRunSucceeded()
5859
}
5960

6061
@Test
61-
void shouldFailAsyncResultWhenRunFailed()
62+
void shouldReturnAsyncResultWithRunErrorWhenRunFailed()
6263
{
6364
// Given
6465
Throwable error = new RuntimeException( "Hi there" );
@@ -68,8 +69,9 @@ void shouldFailAsyncResultWhenRunFailed()
6869
CompletionStage<AsyncResultCursor> cursorFuture = cursorFactory.asyncResult();
6970

7071
// Then
71-
CompletionException actual = assertThrows( CompletionException.class, () -> getNow( cursorFuture ) );
72-
assertThat( actual.getCause(), equalTo( error ) );
72+
AsyncResultCursor cursor = getNow( cursorFuture );
73+
assertTrue( cursor.runError().isPresent() );
74+
assertSame( error, cursor.runError().get() );
7375
}
7476

7577
@Test

driver/src/test/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImplTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.junit.jupiter.api.Test;
2222

2323
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.CompletionException;
2524
import java.util.concurrent.CompletionStage;
2625

2726
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
@@ -30,10 +29,10 @@
3029
import org.neo4j.driver.internal.messaging.Message;
3130
import org.neo4j.driver.internal.spi.Connection;
3231

33-
import static org.hamcrest.CoreMatchers.equalTo;
3432
import static org.hamcrest.CoreMatchers.instanceOf;
3533
import static org.junit.Assert.assertThat;
36-
import static org.junit.jupiter.api.Assertions.assertThrows;
34+
import static org.junit.jupiter.api.Assertions.assertSame;
35+
import static org.junit.jupiter.api.Assertions.assertTrue;
3736
import static org.mockito.ArgumentMatchers.any;
3837
import static org.mockito.Mockito.mock;
3938
import static org.mockito.Mockito.verify;
@@ -58,7 +57,7 @@ void shouldReturnAsyncResultWhenRunSucceeded()
5857
}
5958

6059
@Test
61-
void shouldFailAsyncResultWhenRunFailed()
60+
void shouldReturnAsyncResultWithRunErrorWhenRunFailed()
6261
{
6362
// Given
6463
Throwable error = new RuntimeException( "Hi there" );
@@ -68,8 +67,9 @@ void shouldFailAsyncResultWhenRunFailed()
6867
CompletionStage<AsyncResultCursor> cursorFuture = cursorFactory.asyncResult();
6968

7069
// Then
71-
CompletionException actual = assertThrows( CompletionException.class, () -> getNow( cursorFuture ) );
72-
assertThat( actual.getCause(), equalTo( error ) );
70+
AsyncResultCursor cursor = getNow( cursorFuture );
71+
assertTrue( cursor.runError().isPresent() );
72+
assertSame( error, cursor.runError().get() );
7373
}
7474

7575
@Test

driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void shouldMarkTxAndKeepConnectionAndFailOnFailure()
154154
}
155155

156156
@Test
157-
void shouldReleaseConnectionAndFailOnFailure()
157+
void shouldNotReleaseConnectionAndFailOnFailure()
158158
{
159159
CompletableFuture<Void> runFuture = new CompletableFuture<>();
160160
Connection connection = mock( Connection.class );
@@ -167,7 +167,7 @@ void shouldReleaseConnectionAndFailOnFailure()
167167
assertTrue( runFuture.isCompletedExceptionally() );
168168
Throwable actualException = assertThrows( Throwable.class, () -> await( runFuture ) );
169169
assertSame( throwable, actualException );
170-
verify( connection ).release();
170+
verify( connection, never() ).release();
171171
verify( connection, never() ).terminateAndRelease( any( String.class ) );
172172
}
173173

0 commit comments

Comments
 (0)