Skip to content

Commit f5d21ff

Browse files
authored
Merge pull request #648 from zhenlineo/4.0-records-after-close
More tests to ensure the expected use pattern for reactive API.
2 parents c1a1856 + 53d417a commit f5d21ff

File tree

4 files changed

+163
-10
lines changed

4 files changed

+163
-10
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxStatementResult.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.reactive.RxStatementResult;
3535
import org.neo4j.driver.summary.ResultSummary;
3636

37+
import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
3738
import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE;
3839

3940
public class InternalRxStatementResult implements RxStatementResult
@@ -61,7 +62,7 @@ public Publisher<Record> records()
6162
{
6263
if( cursor.isDone() )
6364
{
64-
sink.complete();
65+
sink.error( newResultConsumedError() );
6566
}
6667
else
6768
{

driver/src/test/java/org/neo4j/driver/integration/StatementRunnerCloseIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void tearDown()
6262
}
6363

6464
@Test
65-
void shouldErrorToAccessRecordsAfterSummary()
65+
void shouldErrorToAccessRecordsAfterConsume()
6666
{
6767
// Given
6868
StatementResult result = neo4j.driver().session().run("UNWIND [1,2] AS a RETURN a");
@@ -103,7 +103,7 @@ void shouldErrorToAccessRecordsAfterClose()
103103
}
104104

105105
@Test
106-
void shouldAllowSummaryAndKeysAfterSummary()
106+
void shouldAllowConsumeAndKeysAfterConsume()
107107
{
108108
// Given
109109
StatementResult result = neo4j.driver().session().run("UNWIND [1,2] AS a RETURN a");
@@ -141,7 +141,7 @@ void shouldAllowSummaryAndKeysAfterClose()
141141
}
142142

143143
@Test
144-
void shouldErrorToAccessRecordsAfterSummaryAsync()
144+
void shouldErrorToAccessRecordsAfterConsumeAsync()
145145
{
146146
// Given
147147
AsyncSession session = neo4j.driver().asyncSession();
@@ -178,7 +178,7 @@ void shouldErrorToAccessRecordsAfterCloseAsync()
178178
assertThrows( ResultConsumedException.class, () -> await( result.listAsync( record -> record ) ) ); }
179179

180180
@Test
181-
void shouldAllowSummaryAndKeysAfterSummaryAsync()
181+
void shouldAllowConsumeAndKeysAfterConsumeAsync()
182182
{
183183
// Given
184184
AsyncSession session = neo4j.driver().asyncSession();
@@ -198,7 +198,7 @@ void shouldAllowSummaryAndKeysAfterSummaryAsync()
198198
}
199199

200200
@Test
201-
void shouldAllowSummaryAndKeysAfterCloseAsync()
201+
void shouldAllowConsumeAndKeysAfterCloseAsync()
202202
{
203203
// Given
204204
AsyncSession session = neo4j.driver().asyncSession();

driver/src/test/java/org/neo4j/driver/integration/reactive/RxStatementResultIT.java

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.util.DatabaseExtension;
3737
import org.neo4j.driver.util.ParallelizableIT;
3838

39+
import static java.util.Collections.EMPTY_LIST;
3940
import static java.util.Collections.emptyList;
4041
import static java.util.Collections.singletonList;
4142
import static org.hamcrest.CoreMatchers.containsString;
@@ -302,6 +303,154 @@ void shouldStreamCorrectRecordsBackBeforeError()
302303
.verify();
303304
}
304305

306+
@Test
307+
void shouldErrorToAccessRecordAfterSessionClose()
308+
{
309+
// Given
310+
RxSession session = neo4j.driver().rxSession();
311+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
312+
313+
// When
314+
StepVerifier.create( Flux.from( session.close() ).thenMany( result.records() ) ).expectErrorSatisfies( error -> {
315+
assertThat( error.getMessage(), containsString( "session is already closed" ) );
316+
} ).verify();
317+
}
318+
319+
@Test
320+
void shouldErrorToAccessKeysAfterSessionClose()
321+
{
322+
// Given
323+
RxSession session = neo4j.driver().rxSession();
324+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
325+
326+
// When
327+
StepVerifier.create( Flux.from( session.close() ).thenMany( result.keys() ) ).expectErrorSatisfies( error -> {
328+
assertThat( error.getMessage(), containsString( "session is already closed" ) );
329+
} ).verify();
330+
}
331+
332+
@Test
333+
void shouldErrorToAccessSummaryAfterSessionClose()
334+
{
335+
// Given
336+
RxSession session = neo4j.driver().rxSession();
337+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
338+
339+
// When
340+
StepVerifier.create( Flux.from( session.close() ).thenMany( result.consume() ) ).expectErrorSatisfies( error -> {
341+
assertThat( error.getMessage(), containsString( "session is already closed" ) );
342+
} ).verify();
343+
}
344+
345+
@Test
346+
void shouldErrorToAccessRecordAfterTxClose()
347+
{
348+
// Given
349+
RxSession session = neo4j.driver().rxSession();
350+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
351+
352+
// When
353+
StepVerifier.create(
354+
Flux.from( session.beginTransaction() ).single()
355+
.flatMap( tx -> Flux.from( tx.rollback() ).singleOrEmpty().thenReturn( tx ) )
356+
.flatMapMany( tx -> tx.run( "UNWIND [1,2] AS a RETURN a" ).records() ) )
357+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Cannot run more statements" ) ) )
358+
.verify();
359+
}
360+
361+
@Test
362+
void shouldErrorToAccessKeysAfterTxClose()
363+
{
364+
// Given
365+
RxSession session = neo4j.driver().rxSession();
366+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
367+
368+
// When
369+
StepVerifier.create(
370+
Flux.from( session.beginTransaction() ).single()
371+
.flatMap( tx -> Flux.from( tx.rollback() ).singleOrEmpty().thenReturn( tx ) )
372+
.flatMapMany( tx -> tx.run( "UNWIND [1,2] AS a RETURN a" ).keys() ) )
373+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Cannot run more statements" ) ) )
374+
.verify();
375+
}
376+
377+
@Test
378+
void shouldErrorToAccessSummaryAfterTxClose()
379+
{
380+
// Given
381+
RxSession session = neo4j.driver().rxSession();
382+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
383+
384+
// When
385+
StepVerifier.create(
386+
Flux.from( session.beginTransaction() ).single()
387+
.flatMap( tx -> Flux.from( tx.rollback() ).singleOrEmpty().thenReturn( tx ) )
388+
.flatMapMany( tx -> tx.run( "UNWIND [1,2] AS a RETURN a" ).consume() ) )
389+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Cannot run more statements" ) ) )
390+
.verify();
391+
}
392+
393+
@Test
394+
void throwErrorAfterKeys()
395+
{
396+
// Given
397+
RxSession session = neo4j.driver().rxSession();
398+
RxStatementResult result = session.run( "UNWIND [1,2] AS a RETURN a" );
399+
400+
// When
401+
StepVerifier.create(
402+
Flux.from( session.beginTransaction() ).single()
403+
.flatMap( tx -> Flux.from( tx.rollback() ).singleOrEmpty().thenReturn( tx ) )
404+
.flatMapMany( tx -> tx.run( "UNWIND [1,2] AS a RETURN a" ).consume() ) )
405+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Cannot run more statements" ) ) )
406+
.verify();
407+
}
408+
409+
@Test
410+
void throwTheSameErrorWhenCallingConsumeMultipleTimes()
411+
{
412+
// Given
413+
RxSession session = neo4j.driver().rxSession();
414+
RxStatementResult result = session.run( "Invalid" );
415+
416+
// When
417+
StepVerifier.create( Flux.from( result.consume() ) )
418+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Invalid" ) ) )
419+
.verify();
420+
421+
StepVerifier.create( Flux.from( result.consume() ) )
422+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Invalid" ) ) )
423+
.verify();
424+
}
425+
426+
@Test
427+
void keysShouldNotReportRunError()
428+
{
429+
// Given
430+
RxSession session = neo4j.driver().rxSession();
431+
RxStatementResult result = session.run( "Invalid" );
432+
433+
// When
434+
StepVerifier.create( Flux.from( result.keys() ) ).expectNext( EMPTY_LIST ).verifyComplete();
435+
StepVerifier.create( Flux.from( result.keys() ) ).expectNext( EMPTY_LIST ).verifyComplete();
436+
}
437+
438+
@Test
439+
void throwResultConsumedErrorWhenCallingRecordsMultipleTimes()
440+
{
441+
// Given
442+
RxSession session = neo4j.driver().rxSession();
443+
RxStatementResult result = session.run( "Invalid" );
444+
445+
// When
446+
StepVerifier.create( Flux.from( result.records() ) )
447+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "Invalid" ) ) )
448+
.verify();
449+
450+
verifyRecordsAlreadyDiscarded( result );
451+
verifyRecordsAlreadyDiscarded( result );
452+
}
453+
305454
private void verifyCanAccessSummary( RxStatementResult res )
306455
{
307456
StepVerifier.create( res.consume() ).assertNext( summary -> {
@@ -313,8 +462,8 @@ private void verifyCanAccessSummary( RxStatementResult res )
313462

314463
private void verifyRecordsAlreadyDiscarded( RxStatementResult res )
315464
{
316-
StepVerifier.create( Flux.from( res.records() ).map( r -> r.get( "a" ).asInt() ) )
317-
.expectComplete()
465+
StepVerifier.create( Flux.from( res.records() ) )
466+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), containsString( "has already been consumed" ) ) )
318467
.verify();
319468
}
320469

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.integration.reactive;
2020

21+
import org.hamcrest.CoreMatchers;
2122
import org.junit.jupiter.api.Assertions;
2223
import org.junit.jupiter.api.BeforeEach;
2324
import org.junit.jupiter.api.Test;
@@ -207,8 +208,10 @@ void shouldDiscardOnCommitOrRollback( boolean commit )
207208
assertCanCommitOrRollback( commit, tx );
208209

209210
// As a result the records size shall be 0.
210-
List<Record> records = await( Flux.from( cursor.records() ) );
211-
assertThat( records.size(), equalTo( 0 ) );
211+
212+
StepVerifier.create( Flux.from( cursor.records() ) )
213+
.expectErrorSatisfies( error -> assertThat( error.getMessage(), CoreMatchers.containsString( "has already been consumed" ) ) )
214+
.verify();
212215
}
213216

214217
@ParameterizedTest

0 commit comments

Comments
 (0)