Skip to content

Commit ac3a78a

Browse files
committed
Add holder objects for driver, session, transaction and result objects in backend (neo4j#1001)
This update also removes hardcoded fetch size in reactive backend and adds support for sending additional PULL requests when there is more data to consume.
1 parent 979b13e commit ac3a78a

35 files changed

+947
-426
lines changed

testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,33 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.Getter;
2221
import org.reactivestreams.Subscriber;
2322
import org.reactivestreams.Subscription;
2423

2524
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
2626

2727
public class RxBlockingSubscriber<T> implements Subscriber<T>
2828
{
29-
@Getter
3029
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
30+
private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
3131
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;
3232

3333
public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
3434
{
3535
nextSignalConsumerFuture.complete( nextSignalConsumer );
3636
}
3737

38+
public CompletionStage<Subscription> getSubscriptionStage()
39+
{
40+
return subscriptionFuture;
41+
}
42+
43+
public CompletionStage<Void> getCompletionStage()
44+
{
45+
return completionFuture;
46+
}
47+
3848
@Override
3949
public void onSubscribe( Subscription s )
4050
{
@@ -51,13 +61,13 @@ public void onNext( T t )
5161
@Override
5262
public void onError( Throwable t )
5363
{
54-
blockUntilNextSignalConsumer().completeExceptionally( t );
64+
completionFuture.completeExceptionally( t );
5565
}
5666

5767
@Override
5868
public void onComplete()
5969
{
60-
blockUntilNextSignalConsumer().complete( null );
70+
completionFuture.complete( null );
6171
}
6272

6373
private CompletableFuture<T> blockUntilNextSignalConsumer()

testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java

Lines changed: 143 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,56 @@
1818
*/
1919
package neo4j.org.testkit.backend;
2020

21-
import lombok.AccessLevel;
2221
import lombok.Getter;
22+
import neo4j.org.testkit.backend.holder.AsyncSessionHolder;
23+
import neo4j.org.testkit.backend.holder.AsyncTransactionHolder;
24+
import neo4j.org.testkit.backend.holder.DriverHolder;
25+
import neo4j.org.testkit.backend.holder.ResultCursorHolder;
26+
import neo4j.org.testkit.backend.holder.ResultHolder;
27+
import neo4j.org.testkit.backend.holder.RxResultHolder;
28+
import neo4j.org.testkit.backend.holder.RxSessionHolder;
29+
import neo4j.org.testkit.backend.holder.RxTransactionHolder;
30+
import neo4j.org.testkit.backend.holder.SessionHolder;
31+
import neo4j.org.testkit.backend.holder.TransactionHolder;
2332
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
2433
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2534
import reactor.core.publisher.Mono;
2635

2736
import java.util.HashMap;
2837
import java.util.Map;
2938
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.CompletionStage;
3040
import java.util.concurrent.atomic.AtomicInteger;
3141
import java.util.function.Consumer;
3242

33-
import org.neo4j.driver.Driver;
34-
import org.neo4j.driver.Record;
35-
import org.neo4j.driver.Result;
36-
import org.neo4j.driver.Transaction;
37-
import org.neo4j.driver.async.AsyncTransaction;
38-
import org.neo4j.driver.async.ResultCursor;
3943
import org.neo4j.driver.exceptions.Neo4jException;
4044
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
41-
import org.neo4j.driver.reactive.RxResult;
42-
import org.neo4j.driver.reactive.RxTransaction;
4345

44-
@Getter
4546
public class TestkitState
4647
{
48+
private static final String DRIVER_NOT_FOUND_MESSAGE = "Could not find driver";
49+
private static final String SESSION_NOT_FOUND_MESSAGE = "Could not find session";
4750
private static final String TRANSACTION_NOT_FOUND_MESSAGE = "Could not find transaction";
51+
private static final String RESULT_NOT_FOUND_MESSAGE = "Could not find result";
4852

49-
private final Map<String,Driver> drivers = new HashMap<>();
53+
private final Map<String,DriverHolder> driverIdToDriverHolder = new HashMap<>();
54+
@Getter
5055
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
51-
private final Map<String,SessionState> sessionStates = new HashMap<>();
52-
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
53-
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
54-
private final Map<String,Result> results = new HashMap<>();
55-
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
56-
private final Map<String,RxResult> rxResults = new HashMap<>();
57-
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
58-
@Getter( AccessLevel.NONE )
59-
private final Map<String,Transaction> transactions = new HashMap<>();
60-
@Getter( AccessLevel.NONE )
61-
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
62-
@Getter( AccessLevel.NONE )
63-
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
56+
private final Map<String,SessionHolder> sessionIdToSessionHolder = new HashMap<>();
57+
private final Map<String,AsyncSessionHolder> sessionIdToAsyncSessionHolder = new HashMap<>();
58+
private final Map<String,RxSessionHolder> sessionIdToRxSessionHolder = new HashMap<>();
59+
private final Map<String,ResultHolder> resultIdToResultHolder = new HashMap<>();
60+
private final Map<String,ResultCursorHolder> resultIdToResultCursorHolder = new HashMap<>();
61+
private final Map<String,RxResultHolder> resultIdToRxResultHolder = new HashMap<>();
62+
private final Map<String,TransactionHolder> transactionIdToTransactionHolder = new HashMap<>();
63+
private final Map<String,AsyncTransactionHolder> transactionIdToAsyncTransactionHolder = new HashMap<>();
64+
private final Map<String,RxTransactionHolder> transactionIdToRxTransactionHolder = new HashMap<>();
65+
@Getter
6466
private final Map<String,Neo4jException> errors = new HashMap<>();
65-
@Getter( AccessLevel.NONE )
6667
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
68+
@Getter
6769
private final Consumer<TestkitResponse> responseWriter;
70+
@Getter
6871
private final Map<String,CompletableFuture<TestkitCallbackResult>> callbackIdToFuture = new HashMap<>();
6972

7073
public TestkitState( Consumer<TestkitResponse> responseWriter )
@@ -77,53 +80,140 @@ public String newId()
7780
return String.valueOf( idGenerator.getAndIncrement() );
7881
}
7982

80-
public String addTransaction( Transaction transaction )
83+
public void addDriverHolder( String id, DriverHolder driverHolder )
8184
{
82-
String id = newId();
83-
this.transactions.put( id, transaction );
84-
return id;
85+
driverIdToDriverHolder.put( id, driverHolder );
8586
}
8687

87-
public Transaction getTransaction( String id )
88+
public DriverHolder getDriverHolder( String id )
8889
{
89-
if ( !this.transactions.containsKey( id ) )
90-
{
91-
throw new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE );
92-
}
93-
return this.transactions.get( id );
90+
return get( id, driverIdToDriverHolder, DRIVER_NOT_FOUND_MESSAGE );
9491
}
9592

96-
public String addAsyncTransaction( AsyncTransaction transaction )
93+
public String addSessionHolder( SessionHolder sessionHolder )
9794
{
98-
String id = newId();
99-
this.asyncTransactions.put( id, transaction );
100-
return id;
95+
return add( sessionHolder, sessionIdToSessionHolder );
10196
}
10297

103-
public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
98+
public SessionHolder getSessionHolder( String id )
10499
{
105-
if ( !this.asyncTransactions.containsKey( id ) )
106-
{
107-
CompletableFuture<AsyncTransaction> future = new CompletableFuture<>();
108-
future.completeExceptionally( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
109-
return future;
110-
}
111-
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
100+
return get( id, sessionIdToSessionHolder, SESSION_NOT_FOUND_MESSAGE );
101+
}
102+
103+
public String addAsyncSessionHolder( AsyncSessionHolder sessionHolder )
104+
{
105+
return add( sessionHolder, sessionIdToAsyncSessionHolder );
106+
}
107+
108+
public CompletionStage<AsyncSessionHolder> getAsyncSessionHolder( String id )
109+
{
110+
return getAsync( id, sessionIdToAsyncSessionHolder, SESSION_NOT_FOUND_MESSAGE );
111+
}
112+
113+
public String addRxSessionHolder( RxSessionHolder sessionHolder )
114+
{
115+
return add( sessionHolder, sessionIdToRxSessionHolder );
116+
}
117+
118+
public Mono<RxSessionHolder> getRxSessionHolder( String id )
119+
{
120+
return getRx( id, sessionIdToRxSessionHolder, SESSION_NOT_FOUND_MESSAGE );
121+
}
122+
123+
public String addTransactionHolder( TransactionHolder transactionHolder )
124+
{
125+
return add( transactionHolder, transactionIdToTransactionHolder );
126+
}
127+
128+
public TransactionHolder getTransactionHolder( String id )
129+
{
130+
return get( id, transactionIdToTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
131+
}
132+
133+
public String addAsyncTransactionHolder( AsyncTransactionHolder transactionHolder )
134+
{
135+
return add( transactionHolder, transactionIdToAsyncTransactionHolder );
136+
}
137+
138+
public CompletionStage<AsyncTransactionHolder> getAsyncTransactionHolder( String id )
139+
{
140+
return getAsync( id, transactionIdToAsyncTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
141+
}
142+
143+
public String addRxTransactionHolder( RxTransactionHolder transactionHolder )
144+
{
145+
return add( transactionHolder, transactionIdToRxTransactionHolder );
146+
}
147+
148+
public Mono<RxTransactionHolder> getRxTransactionHolder( String id )
149+
{
150+
return getRx( id, transactionIdToRxTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE );
151+
}
152+
153+
public String addResultHolder( ResultHolder resultHolder )
154+
{
155+
return add( resultHolder, resultIdToResultHolder );
156+
}
157+
158+
public ResultHolder getResultHolder( String id )
159+
{
160+
return get( id, resultIdToResultHolder, RESULT_NOT_FOUND_MESSAGE );
161+
}
162+
163+
public String addAsyncResultHolder( ResultCursorHolder resultHolder )
164+
{
165+
return add( resultHolder, resultIdToResultCursorHolder );
166+
}
167+
168+
public CompletionStage<ResultCursorHolder> getAsyncResultHolder( String id )
169+
{
170+
return getAsync( id, resultIdToResultCursorHolder, RESULT_NOT_FOUND_MESSAGE );
112171
}
113172

114-
public String addRxTransaction( RxTransaction transaction )
173+
public String addRxResultHolder( RxResultHolder resultHolder )
174+
{
175+
return add( resultHolder, resultIdToRxResultHolder );
176+
}
177+
178+
public Mono<RxResultHolder> getRxResultHolder( String id )
179+
{
180+
return getRx( id, resultIdToRxResultHolder, RESULT_NOT_FOUND_MESSAGE );
181+
}
182+
183+
private <T> String add( T value, Map<String,T> idToT )
115184
{
116185
String id = newId();
117-
this.rxTransactions.put( id, transaction );
186+
idToT.put( id, value );
118187
return id;
119188
}
120189

121-
public Mono<RxTransaction> getRxTransaction( String id )
190+
private <T> T get( String id, Map<String,T> idToT, String notFoundMessage )
191+
{
192+
T value = idToT.get( id );
193+
if ( value == null )
194+
{
195+
throw new RuntimeException( notFoundMessage );
196+
}
197+
return value;
198+
}
199+
200+
private <T> CompletableFuture<T> getAsync( String id, Map<String,T> idToT, String notFoundMessage )
122201
{
123-
if ( !this.rxTransactions.containsKey( id ) )
202+
CompletableFuture<T> result = new CompletableFuture<>();
203+
T value = idToT.get( id );
204+
if ( value == null )
205+
{
206+
result.completeExceptionally( new RuntimeException( notFoundMessage ) );
207+
}
208+
else
124209
{
125-
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
210+
result.complete( value );
126211
}
127-
return Mono.just( rxTransactions.get( id ) );
212+
return result;
213+
}
214+
215+
private <T> Mono<T> getRx( String id, Map<String,T> idToT, String notFoundMessage )
216+
{
217+
return Mono.fromCompletionStage( getAsync( id, idToT, notFoundMessage ) );
128218
}
129219
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package neo4j.org.testkit.backend.holder;
20+
21+
import lombok.Getter;
22+
23+
import java.util.Optional;
24+
25+
public abstract class AbstractResultHolder<T1, T2 extends AbstractTransactionHolder<?,?>, T3>
26+
{
27+
private final T1 sessionHolder;
28+
private final T2 transactionHolder;
29+
@Getter
30+
private final T3 result;
31+
32+
public AbstractResultHolder( T1 sessionHolder, T3 result )
33+
{
34+
this.sessionHolder = sessionHolder;
35+
this.transactionHolder = null;
36+
this.result = result;
37+
}
38+
39+
public AbstractResultHolder( T2 transactionHolder, T3 result )
40+
{
41+
this.sessionHolder = null;
42+
this.transactionHolder = transactionHolder;
43+
this.result = result;
44+
}
45+
46+
public T1 getSessionHolder()
47+
{
48+
return transactionHolder != null ? getSessionHolder( transactionHolder ) : sessionHolder;
49+
}
50+
51+
public Optional<T2> getTransactionHolder()
52+
{
53+
return Optional.ofNullable( transactionHolder );
54+
}
55+
56+
protected abstract T1 getSessionHolder( T2 transactionHolder );
57+
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java renamed to testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractSessionHolder.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package neo4j.org.testkit.backend;
19+
package neo4j.org.testkit.backend.holder;
2020

2121
import lombok.Getter;
22+
import lombok.RequiredArgsConstructor;
2223
import lombok.Setter;
2324

2425
import java.util.concurrent.CompletableFuture;
2526

26-
import org.neo4j.driver.async.AsyncSession;
27+
import org.neo4j.driver.SessionConfig;
2728

29+
@RequiredArgsConstructor
2830
@Getter
29-
@Setter
30-
public class AsyncSessionState
31+
public abstract class AbstractSessionHolder<T>
3132
{
32-
public AsyncSession session;
33+
public final DriverHolder driverHolder;
34+
public final T session;
35+
public final SessionConfig config;
36+
@Setter
3337
public CompletableFuture<Void> txWorkFuture;
34-
35-
public AsyncSessionState( AsyncSession session )
36-
{
37-
this.session = session;
38-
}
3938
}

0 commit comments

Comments
 (0)