Skip to content

Commit e257197

Browse files
authored
Merge pull request #628 from zhenlineo/4.0-reactive-discard
Ensure that in-flight reactive result streams are DISCARDed
2 parents a9e2049 + 913fcc2 commit e257197

File tree

5 files changed

+72
-3
lines changed

5 files changed

+72
-3
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class RxStatementResultCursor implements Subscription, FailableCursor
4040
private final BasicPullResponseHandler pullHandler;
4141
private final Throwable runResponseError;
4242
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
43-
boolean isRecordHandlerInstalled = false;
43+
private boolean isRecordHandlerInstalled = false;
4444

4545
public RxStatementResultCursor( RunResponseHandler runHandler, BasicPullResponseHandler pullHandler )
4646
{

driver/src/main/java/org/neo4j/driver/reactive/RxStatementResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public interface RxStatementResult
4343
/**
4444
* Returns a cold publisher of keys.
4545
* <p>
46-
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and get executed.
46+
* When this publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query statement is sent to the server and executed.
4747
* This method does not start the record streaming nor publish query execution error.
4848
* To retrieve the execution result, either {@link #records()} or {@link #summary()} can be used.
4949
* {@link #records()} starts record streaming and reports query execution error.

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,41 @@ void shouldBePossibleToRunMultipleStatementsWithoutWaiting( boolean commit )
177177
verifyCommittedOrRolledBack( commit );
178178
}
179179

180+
@ParameterizedTest
181+
@MethodSource( "commit" )
182+
void shouldRunStatementsOnResultPublish( boolean commit )
183+
{
184+
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
185+
186+
RxStatementResult cursor1 = tx.run( "CREATE (n:Person {name: 'Alice'}) RETURN n.name" );
187+
RxStatementResult cursor2 = tx.run( "CREATE (n:Person {name: 'Bob'}) RETURN n.name" );
188+
189+
// The execution order is the same as the record publishing order.
190+
List<Record> records = await( Flux.from( cursor2.records() ).concatWith( cursor1.records() ) );
191+
assertThat( records.size(), equalTo( 2 ) );
192+
assertThat( records.get( 0 ).get( "n.name" ).asString(), equalTo( "Bob" ) );
193+
assertThat( records.get( 1 ).get( "n.name" ).asString(), equalTo( "Alice" ) );
194+
195+
assertCanCommitOrRollback( commit, tx );
196+
}
197+
198+
@ParameterizedTest
199+
@MethodSource( "commit" )
200+
void shouldDiscardOnCommitOrRollback( boolean commit )
201+
{
202+
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
203+
RxStatementResult cursor = tx.run( "UNWIND [1,2,3,4] AS a RETURN a" );
204+
205+
// We only perform run without any pull
206+
await( Flux.from( cursor.keys() ) );
207+
// We shall perform a discard here and then commit/rollback
208+
assertCanCommitOrRollback( commit, tx );
209+
210+
// As a result the records size shall be 0.
211+
List<Record> records = await( Flux.from( cursor.records() ) );
212+
assertThat( records.size(), equalTo( 0 ) );
213+
}
214+
180215
@ParameterizedTest
181216
@MethodSource( "commit" )
182217
void shouldBePossibleToRunMultipleStatementsWithoutStreaming( boolean commit )
@@ -207,7 +242,6 @@ void shouldAllowRollbackAfterSingleWrongStatement()
207242
RxTransaction tx = await( Mono.from( session.beginTransaction() ) );
208243
assertFailToRunWrongStatement( tx );
209244
assertCanRollback( tx );
210-
211245
}
212246

213247
@Test

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.netty.channel.Channel;
2222
import org.junit.jupiter.api.Test;
2323
import org.mockito.ArgumentCaptor;
24+
import reactor.core.publisher.Flux;
25+
import reactor.test.StepVerifier;
2426

2527
import java.net.URI;
2628
import java.util.List;
@@ -43,6 +45,7 @@
4345
import org.neo4j.driver.internal.retry.RetrySettings;
4446
import org.neo4j.driver.internal.util.Clock;
4547
import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory;
48+
import org.neo4j.driver.reactive.RxSession;
4649
import org.neo4j.driver.util.StubServer;
4750

4851
import static java.util.Arrays.asList;
@@ -334,6 +337,25 @@ void shouldAllowDatabaseNameInBeginTransaction() throws Throwable
334337
}
335338
}
336339

340+
@Test
341+
void shouldDiscardIfPullNotFinished() throws Throwable
342+
{
343+
StubServer server = StubServer.start( "read_tx_v4_discard.script", 9001 );
344+
345+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG ) )
346+
{
347+
Flux<String> keys = Flux.using(
348+
driver::rxSession,
349+
session -> session.readTransaction( tx -> tx.run( "UNWIND [1,2,3,4] AS a RETURN a" ).keys() ),
350+
RxSession::close );
351+
StepVerifier.create( keys ).expectNext( "a" ).verifyComplete();
352+
}
353+
finally
354+
{
355+
assertEquals( 0, server.exitStatus() );
356+
}
357+
}
358+
337359
private static void testTxCloseErrorPropagation( String script, Consumer<Transaction> txAction, String expectedErrorMessage )
338360
throws Exception
339361
{
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
!: BOLT 4
2+
!: AUTO RESET
3+
!: AUTO HELLO
4+
!: AUTO GOODBYE
5+
6+
C: BEGIN { "mode": "r" }
7+
RUN "UNWIND [1,2,3,4] AS a RETURN a" {} {}
8+
S: SUCCESS {}
9+
SUCCESS {"t_first": 110, "fields": ["a"], "qid": 0}
10+
C: DISCARD {"qid": 0, "n": -1}
11+
S: SUCCESS {"type": "r", "t_last": 3, "db": "neo4j"}
12+
C: COMMIT
13+
S: SUCCESS {"bookmark": "e57085e2-727f-43f3-b632-7ec57978806e:117"}

0 commit comments

Comments
 (0)