Skip to content

Commit dbce261

Browse files
author
Zhen Li
committed
Refined the tx run methods using Flux.usingWhen
Added more IT for tx run with retries.
1 parent 2ca1f68 commit dbce261

File tree

4 files changed

+211
-84
lines changed

4 files changed

+211
-84
lines changed

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.reactivestreams.Publisher;
2222
import reactor.core.publisher.Flux;
23-
import reactor.core.publisher.Mono;
2423

2524
import java.util.Map;
2625
import java.util.concurrent.CompletableFuture;
@@ -122,14 +121,8 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<Publisher<T>> work,
122121

123122
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<Publisher<T>> work, TransactionConfig config )
124123
{
125-
Publisher<RxTransaction> publisher = beginTransaction( mode, config );
126-
Flux<T> txResult = Mono.from( publisher )
127-
.flatMapMany( tx -> Flux.from( work.execute( tx ) )
128-
.onErrorResume( error -> Mono.from( tx.rollback() ).then( Mono.error( error ) ) ) // if failed then we rollback and rethrow the error
129-
.concatWith( tx.commit() ) // if succeeded then we commit.
130-
);
131-
132-
return session.retryLogic().retryRx( txResult );
124+
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, RxTransaction::commit, RxTransaction::rollback );
125+
return session.retryLogic().retryRx( repeatableWork );
133126
}
134127

135128
@Override

driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,33 @@
2020

2121
import org.junit.jupiter.api.Test;
2222
import org.junit.jupiter.api.extension.RegisterExtension;
23+
import org.reactivestreams.Publisher;
2324
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
2426
import reactor.test.StepVerifier;
2527

28+
import java.util.Arrays;
29+
import java.util.Iterator;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
32+
import org.neo4j.driver.Session;
33+
import org.neo4j.driver.StatementResult;
34+
import org.neo4j.driver.exceptions.ClientException;
35+
import org.neo4j.driver.exceptions.DatabaseException;
36+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
37+
import org.neo4j.driver.exceptions.SessionExpiredException;
38+
import org.neo4j.driver.exceptions.TransientException;
2639
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
2740
import org.neo4j.driver.reactive.RxResult;
2841
import org.neo4j.driver.reactive.RxSession;
29-
import org.neo4j.driver.exceptions.ClientException;
42+
import org.neo4j.driver.reactive.RxTransaction;
43+
import org.neo4j.driver.reactive.RxTransactionWork;
3044
import org.neo4j.driver.util.DatabaseExtension;
3145
import org.neo4j.driver.util.ParallelizableIT;
3246

47+
import static java.util.Collections.emptyIterator;
48+
import static org.hamcrest.CoreMatchers.instanceOf;
49+
import static org.hamcrest.MatcherAssert.assertThat;
3350
import static org.junit.jupiter.api.Assertions.assertEquals;
3451
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
3552

@@ -74,4 +91,146 @@ void shouldBeAbleToReuseSessionAfterFailure()
7491
assertEquals( record.get("1").asLong(), 1L );
7592
} ).expectComplete().verify();
7693
}
94+
95+
@Test
96+
void shouldRunAsyncTransactionWithoutRetries()
97+
{
98+
RxSession session = neo4j.driver().rxSession();
99+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Apa) RETURN 42" );
100+
Publisher<Integer> publisher = session.writeTransaction( work );
101+
102+
StepVerifier.create( publisher ).expectNext( 42 ).verifyComplete();
103+
104+
assertEquals( 1, work.invocationCount() );
105+
assertEquals( 1, countNodesByLabel( "Apa" ) );
106+
}
107+
108+
@Test
109+
void shouldRunAsyncTransactionWithRetriesOnAsyncFailures()
110+
{
111+
RxSession session = neo4j.driver().rxSession();
112+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24" ).withAsyncFailures(
113+
new ServiceUnavailableException( "Oh!" ),
114+
new SessionExpiredException( "Ah!" ),
115+
new TransientException( "Code", "Message" ) );
116+
117+
Publisher<Integer> publisher = session.writeTransaction( work );
118+
StepVerifier.create( publisher ).expectNext( 24 ).verifyComplete();
119+
120+
assertEquals( 4, work.invocationCount() );
121+
assertEquals( 1, countNodesByLabel( "Node" ) );
122+
}
123+
124+
@Test
125+
void shouldRunAsyncTransactionWithRetriesOnSyncFailures()
126+
{
127+
RxSession session = neo4j.driver().rxSession();
128+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Test) RETURN 12" ).withSyncFailures(
129+
new TransientException( "Oh!", "Deadlock!" ),
130+
new ServiceUnavailableException( "Oh! Network Failure" ) );
131+
132+
Publisher<Integer> publisher = session.writeTransaction( work );
133+
StepVerifier.create( publisher ).expectNext( 12 ).verifyComplete();
134+
135+
assertEquals( 3, work.invocationCount() );
136+
assertEquals( 1, countNodesByLabel( "Test" ) );
137+
}
138+
139+
@Test
140+
void shouldRunAsyncTransactionThatCanNotBeRetried()
141+
{
142+
RxSession session = neo4j.driver().rxSession();
143+
InvocationTrackingWork work = new InvocationTrackingWork( "UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x" );
144+
Publisher<Integer> publisher = session.writeTransaction( work );
145+
146+
StepVerifier.create( publisher )
147+
.expectNext( 1 ).expectNext( 2 )
148+
.expectErrorSatisfies( error -> assertThat( error, instanceOf( ClientException.class ) ) )
149+
.verify();
150+
151+
assertEquals( 1, work.invocationCount() );
152+
assertEquals( 0, countNodesByLabel( "Hi" ) );
153+
}
154+
155+
@Test
156+
void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure()
157+
{
158+
RxSession session = neo4j.driver().rxSession();
159+
// first throw TransientException directly from work, retry can happen afterwards
160+
// then return a future failed with DatabaseException, retry can't happen afterwards
161+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Person) RETURN 1" )
162+
.withSyncFailures( new TransientException( "Oh!", "Deadlock!" ) )
163+
.withAsyncFailures( new DatabaseException( "Oh!", "OutOfMemory!" ) );
164+
Publisher<Integer> publisher = session.writeTransaction( work );
165+
166+
StepVerifier.create( publisher )
167+
.expectErrorSatisfies( e -> {
168+
assertThat( e, instanceOf( DatabaseException.class ) );
169+
assertEquals( 1, e.getSuppressed().length );
170+
assertThat( e.getSuppressed()[0], instanceOf( TransientException.class ) );
171+
} )
172+
.verify();
173+
174+
assertEquals( 2, work.invocationCount() );
175+
assertEquals( 0, countNodesByLabel( "Person" ) );
176+
}
177+
178+
private long countNodesByLabel( String label )
179+
{
180+
try ( Session session = neo4j.driver().session() )
181+
{
182+
StatementResult result = session.run( "MATCH (n:" + label + ") RETURN count(n)" );
183+
return result.single().get( 0 ).asLong();
184+
}
185+
}
186+
187+
private static class InvocationTrackingWork implements RxTransactionWork<Publisher<Integer>>
188+
{
189+
final String query;
190+
final AtomicInteger invocationCount;
191+
192+
Iterator<RuntimeException> asyncFailures = emptyIterator();
193+
Iterator<RuntimeException> syncFailures = emptyIterator();
194+
195+
InvocationTrackingWork( String query )
196+
{
197+
this.query = query;
198+
this.invocationCount = new AtomicInteger();
199+
}
200+
201+
InvocationTrackingWork withAsyncFailures( RuntimeException... failures )
202+
{
203+
asyncFailures = Arrays.asList( failures ).iterator();
204+
return this;
205+
}
206+
207+
InvocationTrackingWork withSyncFailures( RuntimeException... failures )
208+
{
209+
syncFailures = Arrays.asList( failures ).iterator();
210+
return this;
211+
}
212+
213+
int invocationCount()
214+
{
215+
return invocationCount.get();
216+
}
217+
218+
@Override
219+
public Publisher<Integer> execute( RxTransaction tx )
220+
{
221+
invocationCount.incrementAndGet();
222+
223+
if ( syncFailures.hasNext() )
224+
{
225+
throw syncFailures.next();
226+
}
227+
228+
if ( asyncFailures.hasNext() )
229+
{
230+
return Mono.error( asyncFailures.next() );
231+
}
232+
233+
return Flux.from( tx.run( query ).records() ).map( r -> r.get( 0 ).asInt() );
234+
}
235+
}
77236
}

0 commit comments

Comments
 (0)