Skip to content

Commit a3961b1

Browse files
committed
Add support for multiple tx result streams in reactive backend (neo4j#1085)
Prior to this update only a single result stream could exist at a time because publishing thread (event loop) used to be blocked for iterative consumption and, hence, could not be used for additional result streams. Now, the publishing thread will not be blocked and will be available for other result streams too. Skip reasons have been clarified for tests that required investigation. Testkit configs have been updated.
1 parent cae2b9d commit a3961b1

File tree

8 files changed

+269
-294
lines changed

8 files changed

+269
-294
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java

Lines changed: 0 additions & 87 deletions
This file was deleted.
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend;
20+
21+
import org.reactivestreams.Subscription;
22+
import reactor.core.publisher.BaseSubscriber;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.FluxSink;
25+
import reactor.core.publisher.Mono;
26+
import reactor.core.publisher.MonoSink;
27+
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
32+
import java.util.function.Supplier;
33+
34+
/**
35+
* Buffered subscriber for testing purposes.
36+
* <p>
37+
* It consumes incoming signals as soon as they arrive and prevents publishing thread from getting blocked.
38+
* <p>
39+
* The consumed signals can be retrieved one-by-one using {@link #next()}. It calls upstream {@link org.reactivestreams.Subscription#request(long)} with
40+
* configured fetch size only when next signal is requested and no signals are expected to be emitted either because they have not been requested yet or the
41+
* previous demand has been satisfied.
42+
*
43+
* @param <T>
44+
*/
45+
public class RxBufferedSubscriber<T> extends BaseSubscriber<T>
46+
{
47+
private final Lock lock = new ReentrantLock();
48+
private final long fetchSize;
49+
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
50+
private final FluxSink<T> itemsSink;
51+
private final OneSignalSubscriber<T> itemsSubscriber;
52+
private long pendingItems;
53+
private boolean nextInProgress;
54+
55+
public RxBufferedSubscriber( long fetchSize )
56+
{
57+
this.fetchSize = fetchSize;
58+
AtomicReference<FluxSink<T>> sinkRef = new AtomicReference<>();
59+
itemsSubscriber = new OneSignalSubscriber<>();
60+
Flux.<T>create( fluxSink ->
61+
{
62+
sinkRef.set( fluxSink );
63+
fluxSink.onRequest( ignored -> requestFromUpstream() );
64+
} ).subscribe( itemsSubscriber );
65+
itemsSink = sinkRef.get();
66+
}
67+
68+
/**
69+
* Returns a {@link Mono} of next signal from this subscription.
70+
* <p>
71+
* If necessary, a request with configured fetch size is made for more signals to be published.
72+
* <p>
73+
* <b>Only a single in progress request is supported at a time.</b> The returned {@link Mono} must succeed or error before next call is permitted.
74+
* <p>
75+
* Both empty successful completion and error completion indicate the completion of the subscribed publisher. This method must not be called after this.
76+
*
77+
* @return the {@link Mono} of next signal.
78+
*/
79+
public Mono<T> next()
80+
{
81+
executeWithLock( lock, () ->
82+
{
83+
if ( nextInProgress )
84+
{
85+
throw new IllegalStateException( "Only one in progress next is allowed at a time" );
86+
}
87+
return nextInProgress = true;
88+
} );
89+
return Mono.fromCompletionStage( subscriptionFuture )
90+
.then( Mono.create( itemsSubscriber::requestNext ) )
91+
.doOnSuccess( ignored -> executeWithLock( lock, () -> nextInProgress = false ) )
92+
.doOnError( ignored -> executeWithLock( lock, () -> nextInProgress = false ) );
93+
}
94+
95+
@Override
96+
protected void hookOnSubscribe( Subscription subscription )
97+
{
98+
subscriptionFuture.complete( subscription );
99+
}
100+
101+
@Override
102+
protected void hookOnNext( T value )
103+
{
104+
executeWithLock( lock, () -> pendingItems-- );
105+
itemsSink.next( value );
106+
}
107+
108+
@Override
109+
protected void hookOnComplete()
110+
{
111+
itemsSink.complete();
112+
}
113+
114+
@Override
115+
protected void hookOnError( Throwable throwable )
116+
{
117+
itemsSink.error( throwable );
118+
}
119+
120+
private void requestFromUpstream()
121+
{
122+
boolean moreItemsPending = executeWithLock( lock, () ->
123+
{
124+
boolean morePending;
125+
if ( pendingItems > 0 )
126+
{
127+
morePending = true;
128+
}
129+
else
130+
{
131+
pendingItems = fetchSize;
132+
morePending = false;
133+
}
134+
return morePending;
135+
} );
136+
if ( moreItemsPending )
137+
{
138+
return;
139+
}
140+
Subscription subscription = subscriptionFuture.getNow( null );
141+
if ( subscription == null )
142+
{
143+
throw new IllegalStateException( "Upstream subscription must not be null at this stage" );
144+
}
145+
subscription.request( fetchSize );
146+
}
147+
148+
public static <T> T executeWithLock( Lock lock, Supplier<T> supplier )
149+
{
150+
lock.lock();
151+
try
152+
{
153+
return supplier.get();
154+
}
155+
finally
156+
{
157+
lock.unlock();
158+
}
159+
}
160+
161+
private static class OneSignalSubscriber<T> extends BaseSubscriber<T>
162+
{
163+
private final Lock lock = new ReentrantLock();
164+
private MonoSink<T> sink;
165+
private boolean emitted;
166+
private boolean done;
167+
private Throwable throwable;
168+
169+
public void requestNext( MonoSink<T> sink )
170+
{
171+
boolean done = executeWithLock( lock, () ->
172+
{
173+
this.sink = sink;
174+
emitted = false;
175+
return this.done;
176+
} );
177+
178+
if ( done )
179+
{
180+
if ( throwable != null )
181+
{
182+
this.sink.error( throwable );
183+
}
184+
else
185+
{
186+
this.sink.success();
187+
}
188+
}
189+
else
190+
{
191+
upstream().request( 1 );
192+
}
193+
}
194+
195+
@Override
196+
protected void hookOnSubscribe( Subscription subscription )
197+
{
198+
// left empty to prevent requesting signals immediately
199+
}
200+
201+
@Override
202+
protected void hookOnNext( T value )
203+
{
204+
MonoSink<T> sink = executeWithLock( lock, () ->
205+
{
206+
emitted = true;
207+
return this.sink;
208+
} );
209+
sink.success( value );
210+
}
211+
212+
@Override
213+
protected void hookOnComplete()
214+
{
215+
MonoSink<T> sink = executeWithLock( lock, () ->
216+
{
217+
done = true;
218+
return !emitted ? this.sink : null;
219+
} );
220+
if ( sink != null )
221+
{
222+
sink.success();
223+
}
224+
}
225+
226+
@Override
227+
protected void hookOnError( Throwable throwable )
228+
{
229+
MonoSink<T> sink = executeWithLock( lock, () ->
230+
{
231+
done = true;
232+
this.throwable = throwable;
233+
return !emitted ? this.sink : null;
234+
} );
235+
if ( sink != null )
236+
{
237+
sink.error( throwable );
238+
}
239+
}
240+
}
241+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import lombok.Getter;
2222
import lombok.Setter;
23-
import neo4j.org.testkit.backend.RxBlockingSubscriber;
23+
import neo4j.org.testkit.backend.RxBufferedSubscriber;
2424

2525
import java.util.Optional;
2626
import java.util.concurrent.atomic.AtomicLong;
@@ -31,23 +31,21 @@
3131
public class RxResultHolder extends AbstractResultHolder<RxSessionHolder,RxTransactionHolder,RxResult>
3232
{
3333
@Setter
34-
private RxBlockingSubscriber<Record> subscriber;
34+
private RxBufferedSubscriber<Record> subscriber;
3535
@Getter
3636
private final AtomicLong requestedRecordsCounter = new AtomicLong();
3737

3838
public RxResultHolder( RxSessionHolder sessionHolder, RxResult result )
3939
{
4040
super( sessionHolder, result );
41-
sessionHolder.setResultHolder( this );
4241
}
4342

4443
public RxResultHolder( RxTransactionHolder transactionHolder, RxResult result )
4544
{
4645
super( transactionHolder, result );
47-
transactionHolder.getSessionHolder().setResultHolder( this );
4846
}
4947

50-
public Optional<RxBlockingSubscriber<Record>> getSubscriber()
48+
public Optional<RxBufferedSubscriber<Record>> getSubscriber()
5149
{
5250
return Optional.ofNullable( subscriber );
5351
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,13 @@
1818
*/
1919
package neo4j.org.testkit.backend.holder;
2020

21-
import lombok.Setter;
22-
23-
import java.util.Optional;
24-
2521
import org.neo4j.driver.SessionConfig;
2622
import org.neo4j.driver.reactive.RxSession;
2723

2824
public class RxSessionHolder extends AbstractSessionHolder<RxSession>
2925
{
30-
@Setter
31-
private RxResultHolder resultHolder;
32-
3326
public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config )
3427
{
3528
super( driverHolder, session, config );
3629
}
37-
38-
public Optional<RxResultHolder> getResultHolder()
39-
{
40-
return Optional.ofNullable( resultHolder );
41-
}
4230
}

0 commit comments

Comments
 (0)