@@ -123,7 +123,7 @@ void shouldBePossibleToRunSingleStatementAndCommit()
123
123
Flux <Integer > ids = Flux .usingWhen ( session .beginTransaction (),
124
124
tx -> Flux .from ( tx .run ( "CREATE (n:Node {id: 42}) RETURN n" ).records () )
125
125
.map ( record -> record .get ( 0 ).asNode ().get ( "id" ).asInt () ),
126
- RxTransaction ::commit , RxTransaction :: rollback );
126
+ RxTransaction ::commit , ( tx , error ) -> tx . rollback (), null );
127
127
128
128
StepVerifier .create ( ids ).expectNext ( 42 ).verifyComplete ();
129
129
assertEquals ( 1 , countNodes ( 42 ) );
@@ -348,7 +348,7 @@ void shouldAllowRollbackAfterFailedCommit()
348
348
{
349
349
Flux <Record > records = Flux .usingWhen ( session .beginTransaction (),
350
350
tx -> Flux .from ( tx .run ( "WRONG" ).records () ),
351
- RxTransaction ::commit , RxTransaction :: rollback );
351
+ RxTransaction ::commit , ( tx , error ) -> tx . rollback (), null );
352
352
353
353
StepVerifier .create ( records ).verifyErrorSatisfies ( error ->
354
354
assertThat ( error .getMessage (), containsString ( "Invalid input" ) ) );
@@ -501,7 +501,7 @@ void shouldFailForEachWhenActionFails()
501
501
Flux <Record > records = Flux .usingWhen ( session .beginTransaction (),
502
502
tx -> Flux .from ( tx .run ( "RETURN 'Hi!'" ).records () ).doOnNext ( record -> { throw e ; } ),
503
503
RxTransaction ::commit ,
504
- RxTransaction :: rollback );
504
+ ( tx , error ) -> tx . rollback (), null );
505
505
506
506
StepVerifier .create ( records ).expectErrorSatisfies ( error -> assertEquals ( e , error ) ).verify ();
507
507
}
@@ -546,7 +546,7 @@ void shouldFailWhenListTransformationFunctionFails()
546
546
547
547
Flux <Object > records = Flux .usingWhen ( session .beginTransaction (),
548
548
tx -> Flux .from ( tx .run ( "RETURN 'Hi!'" ).records () ).map ( record -> { throw e ; } ),
549
- RxTransaction ::commit , RxTransaction :: rollback );
549
+ RxTransaction ::commit , ( tx , error ) -> tx . rollback (), null );
550
550
551
551
StepVerifier .create ( records ).expectErrorSatisfies ( error -> {
552
552
assertEquals ( e , error );
@@ -680,7 +680,7 @@ void shouldUpdateSessionBookmarkAfterCommit()
680
680
await ( Flux .usingWhen ( session .beginTransaction (),
681
681
tx -> tx .run ( "CREATE (:MyNode)" ).records (),
682
682
RxTransaction ::commit ,
683
- RxTransaction :: rollback ) );
683
+ ( tx , error ) -> tx . rollback (), null ) );
684
684
685
685
Bookmark bookmarkAfter = session .lastBookmark ();
686
686
@@ -795,7 +795,6 @@ private int countNodes( Object id )
795
795
796
796
private void testForEach ( String query , int expectedSeenRecords )
797
797
{
798
-
799
798
Flux <ResultSummary > summary = Flux .usingWhen ( session .beginTransaction (), tx -> {
800
799
RxStatementResult result = tx .run ( query );
801
800
AtomicInteger recordsSeen = new AtomicInteger ();
@@ -808,7 +807,7 @@ private void testForEach( String query, int expectedSeenRecords )
808
807
assertEquals ( emptyMap (), s .statement ().parameters ().asMap () );
809
808
assertEquals ( expectedSeenRecords , recordsSeen .get () );
810
809
} );
811
- }, RxTransaction ::commit , RxTransaction :: rollback );
810
+ }, RxTransaction ::commit , ( tx , error ) -> tx . rollback (), null );
812
811
813
812
StepVerifier .create ( summary ).expectNextCount ( 1 ).verifyComplete (); // we indeed get a summary.
814
813
}
@@ -820,7 +819,7 @@ private <T> void testList( String query, List<T> expectedList )
820
819
Flux <List <Record >> records = Flux .usingWhen ( session .beginTransaction (),
821
820
tx -> Flux .from ( tx .run ( query ).records () ).collectList (),
822
821
RxTransaction ::commit ,
823
- RxTransaction :: rollback );
822
+ ( tx , error ) -> tx . rollback (), null );
824
823
825
824
StepVerifier .create ( records .single () ).consumeNextWith ( allRecords -> {
826
825
for ( Record record : allRecords )
@@ -836,9 +835,7 @@ private void testConsume( String query )
836
835
{
837
836
Flux <ResultSummary > summary = Flux .usingWhen ( session .beginTransaction (), tx ->
838
837
tx .run ( query ).consume (),
839
- RxTransaction ::commit ,
840
- RxTransaction ::rollback
841
- );
838
+ RxTransaction ::commit , ( tx , error ) -> tx .rollback (), null );
842
839
843
840
StepVerifier .create ( summary .single () ).consumeNextWith ( Assertions ::assertNotNull ).verifyComplete ();
844
841
}
0 commit comments