Skip to content

Commit c02621f

Browse files
authored
Make reactive subscription request with Long.MAX_VALUE unbounded and update testkit backend (#1007)
* Add holder objects for driver, session, transaction and result objects in backend (#1001) This update also removes hardcoded fetch size in reactive backend and adds support for sending additional PULL requests when there is more data to consume. * Enable tests with custom fetch size in reactive backend (#1002) Excluding negative fetch size values. * Make reactive subscription request with Long.MAX_VALUE unbounded (#1003) * Make reactive subscription request with Long.MAX_VALUE unbounded This update ensures that unbounded reactive request is translated to unbounded Bolt request. * Update ListBasedPullHandler test class to support unbounded request * Enable unexpected interruption routing Testkit tests for reactive backend (#1004)
1 parent 979b13e commit c02621f

40 files changed

+994
-461
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void installRecordConsumer( BiConsumer<Record,Throwable> recordConsumer )
8888
@Override
8989
public void request( long n )
9090
{
91+
if ( n == Long.MAX_VALUE )
92+
{
93+
n = -1;
94+
}
9195
pullHandler.request( n );
9296
}
9397

driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,21 @@ void shouldPull()
122122
verify( pullHandler ).request( 100 );
123123
}
124124

125+
@Test
126+
void shouldPullUnboundedOnLongMax()
127+
{
128+
// Given
129+
RunResponseHandler runHandler = newRunResponseHandler();
130+
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
131+
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
132+
133+
// When
134+
cursor.request( Long.MAX_VALUE );
135+
136+
// Then
137+
verify( pullHandler ).request( -1 );
138+
}
139+
125140
@Test
126141
void shouldCancel()
127142
{

driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@
2222
import java.util.Map;
2323

2424
import org.neo4j.driver.Query;
25+
import org.neo4j.driver.Record;
26+
import org.neo4j.driver.Value;
2527
import org.neo4j.driver.internal.handlers.PullResponseCompletionListener;
2628
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2729
import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler;
2830
import org.neo4j.driver.internal.spi.Connection;
2931
import org.neo4j.driver.internal.util.MetadataExtractor;
3032
import org.neo4j.driver.internal.util.QueryKeys;
3133
import org.neo4j.driver.internal.value.BooleanValue;
32-
import org.neo4j.driver.Record;
33-
import org.neo4j.driver.Value;
3434
import org.neo4j.driver.summary.ResultSummary;
3535

3636
import static java.util.Collections.emptyList;
@@ -81,7 +81,7 @@ private ListBasedPullHandler( List<Record> list, Throwable error )
8181
public void request( long n )
8282
{
8383
super.request( n );
84-
while ( index < list.size() && n-- > 0 )
84+
while ( index < list.size() && (n == -1 || n-- > 0) )
8585
{
8686
onRecord( list.get( index++ ).values().toArray( new Value[0] ) );
8787
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,33 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.Getter;
2221
import org.reactivestreams.Subscriber;
2322
import org.reactivestreams.Subscription;
2423

2524
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
2626

2727
public class RxBlockingSubscriber<T> implements Subscriber<T>
2828
{
29-
@Getter
3029
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
30+
private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
3131
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;
3232

3333
public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
3434
{
3535
nextSignalConsumerFuture.complete( nextSignalConsumer );
3636
}
3737

38+
public CompletionStage<Subscription> getSubscriptionStage()
39+
{
40+
return subscriptionFuture;
41+
}
42+
43+
public CompletionStage<Void> getCompletionStage()
44+
{
45+
return completionFuture;
46+
}
47+
3848
@Override
3949
public void onSubscribe( Subscription s )
4050
{
@@ -51,13 +61,13 @@ public void onNext( T t )
5161
@Override
5262
public void onError( Throwable t )
5363
{
54-
blockUntilNextSignalConsumer().completeExceptionally( t );
64+
completionFuture.completeExceptionally( t );
5565
}
5666

5767
@Override
5868
public void onComplete()
5969
{
60-
blockUntilNextSignalConsumer().complete( null );
70+
completionFuture.complete( null );
6171
}
6272

6373
private CompletableFuture<T> blockUntilNextSignalConsumer()

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

Lines changed: 143 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,56 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.AccessLevel;
2221
import lombok.Getter;
22+
import neo4j.org.testkit.backend.holder.AsyncSessionHolder;
23+
import neo4j.org.testkit.backend.holder.AsyncTransactionHolder;
24+
import neo4j.org.testkit.backend.holder.DriverHolder;
25+
import neo4j.org.testkit.backend.holder.ResultCursorHolder;
26+
import neo4j.org.testkit.backend.holder.ResultHolder;
27+
import neo4j.org.testkit.backend.holder.RxResultHolder;
28+
import neo4j.org.testkit.backend.holder.RxSessionHolder;
29+
import neo4j.org.testkit.backend.holder.RxTransactionHolder;
30+
import neo4j.org.testkit.backend.holder.SessionHolder;
31+
import neo4j.org.testkit.backend.holder.TransactionHolder;
2332
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
2433
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2534
import reactor.core.publisher.Mono;
2635

2736
import java.util.HashMap;
2837
import java.util.Map;
2938
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.CompletionStage;
3040
import java.util.concurrent.atomic.AtomicInteger;
3141
import java.util.function.Consumer;
3242

33-
import org.neo4j.driver.Driver;
34-
import org.neo4j.driver.Record;
35-
import org.neo4j.driver.Result;
36-
import org.neo4j.driver.Transaction;
37-
import org.neo4j.driver.async.AsyncTransaction;
38-
import org.neo4j.driver.async.ResultCursor;
3943
import org.neo4j.driver.exceptions.Neo4jException;
4044
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
41-
import org.neo4j.driver.reactive.RxResult;
42-
import org.neo4j.driver.reactive.RxTransaction;
4345

44-
@Getter
4546
public class TestkitState
4647
{
48+
private static final String DRIVER_NOT_FOUND_MESSAGE = "Could not find driver";
49+
private static final String SESSION_NOT_FOUND_MESSAGE = "Could not find session";
4750
private static final String TRANSACTION_NOT_FOUND_MESSAGE = "Could not find transaction";
51+
private static final String RESULT_NOT_FOUND_MESSAGE = "Could not find result";
4852

49-
private final Map<String,Driver> drivers = new HashMap<>();
53+
private final Map<String,DriverHolder> driverIdToDriverHolder = new HashMap<>();
54+
@Getter
5055
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
51-
private final Map<String,SessionState> sessionStates = new HashMap<>();
52-
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
53-
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
54-
private final Map<String,Result> results = new HashMap<>();
55-
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
56-
private final Map<String,RxResult> rxResults = new HashMap<>();
57-
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
58-
@Getter( AccessLevel.NONE )
59-
private final Map<String,Transaction> transactions = new HashMap<>();
60-
@Getter( AccessLevel.NONE )
61-
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
62-
@Getter( AccessLevel.NONE )
63-
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
56+
private final Map<String,SessionHolder> sessionIdToSessionHolder = new HashMap<>();
57+
private final Map<String,AsyncSessionHolder> sessionIdToAsyncSessionHolder = new HashMap<>();
58+
private final Map<String,RxSessionHolder> sessionIdToRxSessionHolder = new HashMap<>();
59+
private final Map<String,ResultHolder> resultIdToResultHolder = new HashMap<>();
60+
private final Map<String,ResultCursorHolder> resultIdToResultCursorHolder = new HashMap<>();
61+
private final Map<String,RxResultHolder> resultIdToRxResultHolder = new HashMap<>();
62+
private final Map<String,TransactionHolder> transactionIdToTransactionHolder = new HashMap<>();
63+
private final Map<String,AsyncTransactionHolder> transactionIdToAsyncTransactionHolder = new HashMap<>();
64+
private final Map<String,RxTransactionHolder> transactionIdToRxTransactionHolder = new HashMap<>();
65+
@Getter
6466
private final Map<String,Neo4jException> errors = new HashMap<>();
65-
@Getter( AccessLevel.NONE )
6667
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
68+
@Getter
6769
private final Consumer<TestkitResponse> responseWriter;
70+
@Getter
6871
private final Map<String,CompletableFuture<TestkitCallbackResult>> callbackIdToFuture = new HashMap<>();
6972

7073
public TestkitState( Consumer<TestkitResponse> responseWriter )
@@ -77,53 +80,140 @@ public String newId()
7780
return String.valueOf( idGenerator.getAndIncrement() );
7881
}
7982

80-
public String addTransaction( Transaction transaction )
83+
public void addDriverHolder( String id, DriverHolder driverHolder )
8184
{
82-
String id = newId();
83-
this.transactions.put( id, transaction );
84-
return id;
85+
driverIdToDriverHolder.put( id, driverHolder );
8586
}
8687

87-
public Transaction getTransaction( String id )
88+
public DriverHolder getDriverHolder( String id )
8889
{
89-
if ( !this.transactions.containsKey( id ) )
90-
{
91-
throw new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE );
92-
}
93-
return this.transactions.get( id );
90+
return get( id, driverIdToDriverHolder, DRIVER_NOT_FOUND_MESSAGE );
9491
}
9592

96-
public String addAsyncTransaction( AsyncTransaction transaction )
93+
public String addSessionHolder( SessionHolder sessionHolder )
9794
{
98-
String id = newId();
99-
this.asyncTransactions.put( id, transaction );
100-
return id;
95+
return add( sessionHolder, sessionIdToSessionHolder );
10196
}
10297

103-
public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
98+
public SessionHolder getSessionHolder( String id )
10499
{
105-
if ( !this.asyncTransactions.containsKey( id ) )
106-
{
107-
CompletableFuture<AsyncTransaction> future = new CompletableFuture<>();
108-
future.completeExceptionally( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
109-
return future;
110-
}
111-
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
100+
return get( id, sessionIdToSessionHolder, SESSION_NOT_FOUND_MESSAGE );
101+
}
102+
103+
public String addAsyncSessionHolder( AsyncSessionHolder sessionHolder )
104+
{
105+
return add( sessionHolder, sessionIdToAsyncSessionHolder );
106+
}
107+
108+
public CompletionStage<AsyncSessionHolder> getAsyncSessionHolder( String id )
109+
{
110+
return getAsync( id, sessionIdToAsyncSessionHolder, SESSION_NOT_FOUND_MESSAGE );
111+
}
112+
113+
public String addRxSessionHolder( RxSessionHolder sessionHolder )
114+
{
115+
return add( sessionHolder, sessionIdToRxSessionHolder );
116+
}
117+
118+
public Mono<RxSessionHolder> getRxSessionHolder( String id )
119+
{
120+
return getRx( id, sessionIdToRxSessionHolder, SESSION_NOT_FOUND_MESSAGE );
121+
}
122+
123+
public String addTransactionHolder( TransactionHolder transactionHolder )
124+
{
125+
return add( transactionHolder, transactionIdToTransactionHolder );
126+
}
127+
128+
public TransactionHolder getTransactionHolder( String id )
129+
{
130+
return get( id, transactionIdToTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
131+
}
132+
133+
public String addAsyncTransactionHolder( AsyncTransactionHolder transactionHolder )
134+
{
135+
return add( transactionHolder, transactionIdToAsyncTransactionHolder );
136+
}
137+
138+
public CompletionStage<AsyncTransactionHolder> getAsyncTransactionHolder( String id )
139+
{
140+
return getAsync( id, transactionIdToAsyncTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
141+
}
142+
143+
public String addRxTransactionHolder( RxTransactionHolder transactionHolder )
144+
{
145+
return add( transactionHolder, transactionIdToRxTransactionHolder );
146+
}
147+
148+
public Mono<RxTransactionHolder> getRxTransactionHolder( String id )
149+
{
150+
return getRx( id, transactionIdToRxTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
151+
}
152+
153+
public String addResultHolder( ResultHolder resultHolder )
154+
{
155+
return add( resultHolder, resultIdToResultHolder );
156+
}
157+
158+
public ResultHolder getResultHolder( String id )
159+
{
160+
return get( id, resultIdToResultHolder, RESULT_NOT_FOUND_MESSAGE );
161+
}
162+
163+
public String addAsyncResultHolder( ResultCursorHolder resultHolder )
164+
{
165+
return add( resultHolder, resultIdToResultCursorHolder );
166+
}
167+
168+
public CompletionStage<ResultCursorHolder> getAsyncResultHolder( String id )
169+
{
170+
return getAsync( id, resultIdToResultCursorHolder, RESULT_NOT_FOUND_MESSAGE );
112171
}
113172

114-
public String addRxTransaction( RxTransaction transaction )
173+
public String addRxResultHolder( RxResultHolder resultHolder )
174+
{
175+
return add( resultHolder, resultIdToRxResultHolder );
176+
}
177+
178+
public Mono<RxResultHolder> getRxResultHolder( String id )
179+
{
180+
return getRx( id, resultIdToRxResultHolder, RESULT_NOT_FOUND_MESSAGE );
181+
}
182+
183+
private <T> String add( T value, Map<String,T> idToT )
115184
{
116185
String id = newId();
117-
this.rxTransactions.put( id, transaction );
186+
idToT.put( id, value );
118187
return id;
119188
}
120189

121-
public Mono<RxTransaction> getRxTransaction( String id )
190+
private <T> T get( String id, Map<String,T> idToT, String notFoundMessage )
191+
{
192+
T value = idToT.get( id );
193+
if ( value == null )
194+
{
195+
throw new RuntimeException( notFoundMessage );
196+
}
197+
return value;
198+
}
199+
200+
private <T> CompletableFuture<T> getAsync( String id, Map<String,T> idToT, String notFoundMessage )
122201
{
123-
if ( !this.rxTransactions.containsKey( id ) )
202+
CompletableFuture<T> result = new CompletableFuture<>();
203+
T value = idToT.get( id );
204+
if ( value == null )
205+
{
206+
result.completeExceptionally( new RuntimeException( notFoundMessage ) );
207+
}
208+
else
124209
{
125-
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
210+
result.complete( value );
126211
}
127-
return Mono.just( rxTransactions.get( id ) );
212+
return result;
213+
}
214+
215+
private <T> Mono<T> getRx( String id, Map<String,T> idToT, String notFoundMessage )
216+
{
217+
return Mono.fromCompletionStage( getAsync( id, idToT, notFoundMessage ) );
128218
}
129219
}

0 commit comments

Comments
 (0)