Skip to content

Commit 9720006

Browse files
committed
Update UnmanagedTransaction handling of pending query pulls
1 parent b0293bd commit 9720006

10 files changed

+67
-12
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.CompletionException;
2223
import java.util.concurrent.CompletionStage;
2324
import java.util.concurrent.atomic.AtomicBoolean;
@@ -82,7 +83,10 @@ public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig co
8283
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );
8384

8485
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
85-
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
86+
return newResultCursorStage.thenCompose(
87+
cursor -> cursor.runError()
88+
.map( Futures::<ResultCursor>failedFuture )
89+
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
8690
}
8791

8892
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Arrays;
2222
import java.util.EnumSet;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionException;
2425
import java.util.concurrent.CompletionStage;
2526
import java.util.function.BiFunction;
@@ -211,7 +212,10 @@ public CompletionStage<ResultCursor> runAsync( Query query )
211212
CompletionStage<AsyncResultCursor> cursorStage =
212213
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
213214
resultCursors.add( cursorStage );
214-
return cursorStage.thenApply( cursor -> cursor );
215+
return cursorStage.thenCompose(
216+
cursor -> cursor.runError()
217+
.map( Futures::<ResultCursor>failedFuture )
218+
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
215219
}
216220

217221
public CompletionStage<RxResultCursor> runRx(Query query)

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
*/
1919
package org.neo4j.driver.internal.cursor;
2020

21-
import org.neo4j.driver.internal.FailableCursor;
21+
import java.util.Optional;
22+
2223
import org.neo4j.driver.async.ResultCursor;
24+
import org.neo4j.driver.internal.FailableCursor;
2325

2426
public interface AsyncResultCursor extends ResultCursor, FailableCursor
2527
{
28+
Optional<Throwable> runError();
2629
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorImpl.java

+37-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cursor;
2020

2121
import java.util.List;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Consumer;
@@ -33,11 +34,13 @@
3334

3435
public class AsyncResultCursorImpl implements AsyncResultCursor
3536
{
37+
private final Throwable runError;
3638
private final RunResponseHandler runHandler;
3739
private final PullAllResponseHandler pullAllHandler;
3840

39-
public AsyncResultCursorImpl(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
41+
public AsyncResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
4042
{
43+
this.runError = runError;
4144
this.runHandler = runHandler;
4245
this.pullAllHandler = pullAllHandler;
4346
}
@@ -113,13 +116,38 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
113116
@Override
114117
public CompletionStage<Throwable> discardAllFailureAsync()
115118
{
116-
return consumeAsync().handle( ( summary, error ) -> error );
119+
return consumeAsync().handle(
120+
( summary, consumeError ) ->
121+
{
122+
if ( runError != null )
123+
{
124+
if ( consumeError != null && runError != consumeError )
125+
{
126+
runError.addSuppressed( consumeError );
127+
}
128+
return runError;
129+
}
130+
return consumeError;
131+
} );
117132
}
118133

119134
@Override
120135
public CompletionStage<Throwable> pullAllFailureAsync()
121136
{
122-
return pullAllHandler.pullAllFailureAsync();
137+
return pullAllHandler.pullAllFailureAsync().handle(
138+
( ignored, pullAllError ) ->
139+
{
140+
if ( runError != null )
141+
{
142+
if ( pullAllError != null && runError != pullAllError )
143+
{
144+
runError.addSuppressed( pullAllError );
145+
}
146+
return runError;
147+
}
148+
return pullAllError;
149+
}
150+
);
123151
}
124152

125153
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
@@ -154,4 +182,10 @@ else if ( record != null )
154182
}
155183
} );
156184
}
185+
186+
@Override
187+
public Optional<Throwable> runError()
188+
{
189+
return Optional.ofNullable( runError );
190+
}
157191
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncResultCursorOnlyFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6464
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6565
pullAllHandler.prePopulateRecords();
6666

67-
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
67+
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
6868
}
6969

7070
public CompletionStage<RxResultCursor> rxResult()

driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableAsyncResultCursor.java

+7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cursor;
2020

2121
import java.util.List;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.Consumer;
@@ -118,4 +119,10 @@ boolean isDisposed()
118119
{
119120
return this.isDisposed;
120121
}
122+
123+
@Override
124+
public Optional<Throwable> runError()
125+
{
126+
return this.delegate.runError();
127+
}
121128
}

driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorFactoryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
6666
// only write and flush messages when async result is wanted.
6767
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6868
pullAllHandler.prePopulateRecords();
69-
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
69+
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
7070
}
7171

7272
@Override

driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import java.util.List;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.logging.Level;
3435

3536
import org.neo4j.driver.AccessMode;
3637
import org.neo4j.driver.AuthToken;
3738
import org.neo4j.driver.AuthTokens;
3839
import org.neo4j.driver.Config;
3940
import org.neo4j.driver.Driver;
4041
import org.neo4j.driver.GraphDatabase;
42+
import org.neo4j.driver.Logging;
4143
import org.neo4j.driver.Record;
4244
import org.neo4j.driver.Session;
4345
import org.neo4j.driver.TransactionWork;
@@ -104,7 +106,8 @@ void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionAsync() throws IOExc
104106
StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 );
105107
URI uri = URI.create( "neo4j://127.0.0.1:9001" );
106108

107-
Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
109+
Driver driver = GraphDatabase
110+
.driver( uri, Config.builder().withLogging( Logging.console( Level.FINE ) ).withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
108111
AsyncSession session = driver.asyncSession( builder().withDatabase( "mydatabase" ).build() );
109112
List<String> names = Futures.blockingGet( session.writeTransactionAsync(
110113
tx -> tx.runAsync( "RETURN 1" )

driver/src/test/java/org/neo4j/driver/internal/InternalResultTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ private Result createResult(int numberOfRecords )
373373
}
374374
pullAllHandler.onSuccess( emptyMap() );
375375

376-
AsyncResultCursor cursor = new AsyncResultCursorImpl( runHandler, pullAllHandler );
376+
AsyncResultCursor cursor = new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
377377
return new InternalResult( connection, new DisposableAsyncResultCursor( cursor ) );
378378
}
379379

driver/src/test/java/org/neo4j/driver/internal/async/AsyncResultCursorImplTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,12 @@ void shouldPropagateFailureInConsumeAsync()
404404

405405
private static AsyncResultCursorImpl newCursor(PullAllResponseHandler pullAllHandler )
406406
{
407-
return new AsyncResultCursorImpl( newRunResponseHandler(), pullAllHandler );
407+
return new AsyncResultCursorImpl( null, newRunResponseHandler(), pullAllHandler );
408408
}
409409

410410
private static AsyncResultCursorImpl newCursor(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
411411
{
412-
return new AsyncResultCursorImpl( runHandler, pullAllHandler );
412+
return new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
413413
}
414414

415415
private static RunResponseHandler newRunResponseHandler()

0 commit comments

Comments
 (0)