diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java index cde67f220d..ee875cc5b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java @@ -88,6 +88,10 @@ public void installRecordConsumer( BiConsumer recordConsumer ) @Override public void request( long n ) { + if ( n == Long.MAX_VALUE ) + { + n = -1; + } pullHandler.request( n ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java index ea9181b0c2..6d7d9242fa 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cursor/RxResultCursorImplTest.java @@ -122,6 +122,21 @@ void shouldPull() verify( pullHandler ).request( 100 ); } + @Test + void shouldPullUnboundedOnLongMax() + { + // Given + RunResponseHandler runHandler = newRunResponseHandler(); + PullResponseHandler pullHandler = mock( PullResponseHandler.class ); + RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler ); + + // When + cursor.request( Long.MAX_VALUE ); + + // Then + verify( pullHandler ).request( -1 ); + } + @Test void shouldCancel() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java index 0651eb561b..b41d33e411 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/util/ListBasedPullHandler.java @@ -22,6 +22,8 @@ import java.util.Map; import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; import org.neo4j.driver.internal.handlers.PullResponseCompletionListener; import org.neo4j.driver.internal.handlers.RunResponseHandler; import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler; @@ -29,8 +31,6 @@ import org.neo4j.driver.internal.util.MetadataExtractor; import org.neo4j.driver.internal.util.QueryKeys; import org.neo4j.driver.internal.value.BooleanValue; -import org.neo4j.driver.Record; -import org.neo4j.driver.Value; import org.neo4j.driver.summary.ResultSummary; import static java.util.Collections.emptyList; @@ -81,7 +81,7 @@ private ListBasedPullHandler( List list, Throwable error ) public void request( long n ) { super.request( n ); - while ( index < list.size() && n-- > 0 ) + while ( index < list.size() && (n == -1 || n-- > 0) ) { onRecord( list.get( index++ ).values().toArray( new Value[0] ) ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java index 7658123cf0..f8ca02c18d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java @@ -18,16 +18,16 @@ */ package neo4j.org.testkit.backend; -import lombok.Getter; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; public class RxBlockingSubscriber implements Subscriber { - @Getter private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); + private final CompletableFuture completionFuture = new CompletableFuture<>(); private CompletableFuture> nextSignalConsumerFuture; public void setNextSignalConsumer( CompletableFuture nextSignalConsumer ) @@ -35,6 +35,16 @@ public void setNextSignalConsumer( CompletableFuture nextSignalConsumer ) nextSignalConsumerFuture.complete( nextSignalConsumer ); } + public CompletionStage getSubscriptionStage() + { + return subscriptionFuture; + } + + public CompletionStage getCompletionStage() + { + return completionFuture; + } + @Override public void onSubscribe( Subscription s ) { @@ -51,13 +61,13 @@ public void onNext( T t ) @Override public void onError( Throwable t ) { - blockUntilNextSignalConsumer().completeExceptionally( t ); + completionFuture.completeExceptionally( t ); } @Override public void onComplete() { - blockUntilNextSignalConsumer().complete( null ); + completionFuture.complete( null ); } private CompletableFuture blockUntilNextSignalConsumer() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 33671d533a..ed10e9a02f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -18,8 +18,17 @@ */ package neo4j.org.testkit.backend; -import lombok.AccessLevel; import lombok.Getter; +import neo4j.org.testkit.backend.holder.AsyncSessionHolder; +import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; +import neo4j.org.testkit.backend.holder.DriverHolder; +import neo4j.org.testkit.backend.holder.ResultCursorHolder; +import neo4j.org.testkit.backend.holder.ResultHolder; +import neo4j.org.testkit.backend.holder.RxResultHolder; +import neo4j.org.testkit.backend.holder.RxSessionHolder; +import neo4j.org.testkit.backend.holder.RxTransactionHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; +import neo4j.org.testkit.backend.holder.TransactionHolder; import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; @@ -27,44 +36,38 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import org.neo4j.driver.Driver; -import org.neo4j.driver.Record; -import org.neo4j.driver.Result; -import org.neo4j.driver.Transaction; -import org.neo4j.driver.async.AsyncTransaction; -import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; -import org.neo4j.driver.reactive.RxResult; -import org.neo4j.driver.reactive.RxTransaction; -@Getter public class TestkitState { + private static final String DRIVER_NOT_FOUND_MESSAGE = "Could not find driver"; + private static final String SESSION_NOT_FOUND_MESSAGE = "Could not find session"; private static final String TRANSACTION_NOT_FOUND_MESSAGE = "Could not find transaction"; + private static final String RESULT_NOT_FOUND_MESSAGE = "Could not find result"; - private final Map drivers = new HashMap<>(); + private final Map driverIdToDriverHolder = new HashMap<>(); + @Getter private final Map routingTableRegistry = new HashMap<>(); - private final Map sessionStates = new HashMap<>(); - private final Map asyncSessionStates = new HashMap<>(); - private final Map rxSessionStates = new HashMap<>(); - private final Map results = new HashMap<>(); - private final Map resultCursors = new HashMap<>(); - private final Map rxResults = new HashMap<>(); - private final Map> rxResultIdToRecordSubscriber = new HashMap<>(); - @Getter( AccessLevel.NONE ) - private final Map transactions = new HashMap<>(); - @Getter( AccessLevel.NONE ) - private final Map asyncTransactions = new HashMap<>(); - @Getter( AccessLevel.NONE ) - private final Map rxTransactions = new HashMap<>(); + private final Map sessionIdToSessionHolder = new HashMap<>(); + private final Map sessionIdToAsyncSessionHolder = new HashMap<>(); + private final Map sessionIdToRxSessionHolder = new HashMap<>(); + private final Map resultIdToResultHolder = new HashMap<>(); + private final Map resultIdToResultCursorHolder = new HashMap<>(); + private final Map resultIdToRxResultHolder = new HashMap<>(); + private final Map transactionIdToTransactionHolder = new HashMap<>(); + private final Map transactionIdToAsyncTransactionHolder = new HashMap<>(); + private final Map transactionIdToRxTransactionHolder = new HashMap<>(); + @Getter private final Map errors = new HashMap<>(); - @Getter( AccessLevel.NONE ) private final AtomicInteger idGenerator = new AtomicInteger( 0 ); + @Getter private final Consumer responseWriter; + @Getter private final Map> callbackIdToFuture = new HashMap<>(); public TestkitState( Consumer responseWriter ) @@ -77,53 +80,140 @@ public String newId() return String.valueOf( idGenerator.getAndIncrement() ); } - public String addTransaction( Transaction transaction ) + public void addDriverHolder( String id, DriverHolder driverHolder ) { - String id = newId(); - this.transactions.put( id, transaction ); - return id; + driverIdToDriverHolder.put( id, driverHolder ); } - public Transaction getTransaction( String id ) + public DriverHolder getDriverHolder( String id ) { - if ( !this.transactions.containsKey( id ) ) - { - throw new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ); - } - return this.transactions.get( id ); + return get( id, driverIdToDriverHolder, DRIVER_NOT_FOUND_MESSAGE ); } - public String addAsyncTransaction( AsyncTransaction transaction ) + public String addSessionHolder( SessionHolder sessionHolder ) { - String id = newId(); - this.asyncTransactions.put( id, transaction ); - return id; + return add( sessionHolder, sessionIdToSessionHolder ); } - public CompletableFuture getAsyncTransaction( String id ) + public SessionHolder getSessionHolder( String id ) { - if ( !this.asyncTransactions.containsKey( id ) ) - { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) ); - return future; - } - return CompletableFuture.completedFuture( asyncTransactions.get( id ) ); + return get( id, sessionIdToSessionHolder, SESSION_NOT_FOUND_MESSAGE ); + } + + public String addAsyncSessionHolder( AsyncSessionHolder sessionHolder ) + { + return add( sessionHolder, sessionIdToAsyncSessionHolder ); + } + + public CompletionStage getAsyncSessionHolder( String id ) + { + return getAsync( id, sessionIdToAsyncSessionHolder, SESSION_NOT_FOUND_MESSAGE ); + } + + public String addRxSessionHolder( RxSessionHolder sessionHolder ) + { + return add( sessionHolder, sessionIdToRxSessionHolder ); + } + + public Mono getRxSessionHolder( String id ) + { + return getRx( id, sessionIdToRxSessionHolder, SESSION_NOT_FOUND_MESSAGE ); + } + + public String addTransactionHolder( TransactionHolder transactionHolder ) + { + return add( transactionHolder, transactionIdToTransactionHolder ); + } + + public TransactionHolder getTransactionHolder( String id ) + { + return get( id, transactionIdToTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE ); + } + + public String addAsyncTransactionHolder( AsyncTransactionHolder transactionHolder ) + { + return add( transactionHolder, transactionIdToAsyncTransactionHolder ); + } + + public CompletionStage getAsyncTransactionHolder( String id ) + { + return getAsync( id, transactionIdToAsyncTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE ); + } + + public String addRxTransactionHolder( RxTransactionHolder transactionHolder ) + { + return add( transactionHolder, transactionIdToRxTransactionHolder ); + } + + public Mono getRxTransactionHolder( String id ) + { + return getRx( id, transactionIdToRxTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE ); + } + + public String addResultHolder( ResultHolder resultHolder ) + { + return add( resultHolder, resultIdToResultHolder ); + } + + public ResultHolder getResultHolder( String id ) + { + return get( id, resultIdToResultHolder, RESULT_NOT_FOUND_MESSAGE ); + } + + public String addAsyncResultHolder( ResultCursorHolder resultHolder ) + { + return add( resultHolder, resultIdToResultCursorHolder ); + } + + public CompletionStage getAsyncResultHolder( String id ) + { + return getAsync( id, resultIdToResultCursorHolder, RESULT_NOT_FOUND_MESSAGE ); } - public String addRxTransaction( RxTransaction transaction ) + public String addRxResultHolder( RxResultHolder resultHolder ) + { + return add( resultHolder, resultIdToRxResultHolder ); + } + + public Mono getRxResultHolder( String id ) + { + return getRx( id, resultIdToRxResultHolder, RESULT_NOT_FOUND_MESSAGE ); + } + + private String add( T value, Map idToT ) { String id = newId(); - this.rxTransactions.put( id, transaction ); + idToT.put( id, value ); return id; } - public Mono getRxTransaction( String id ) + private T get( String id, Map idToT, String notFoundMessage ) + { + T value = idToT.get( id ); + if ( value == null ) + { + throw new RuntimeException( notFoundMessage ); + } + return value; + } + + private CompletableFuture getAsync( String id, Map idToT, String notFoundMessage ) { - if ( !this.rxTransactions.containsKey( id ) ) + CompletableFuture result = new CompletableFuture<>(); + T value = idToT.get( id ); + if ( value == null ) + { + result.completeExceptionally( new RuntimeException( notFoundMessage ) ); + } + else { - return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) ); + result.complete( value ); } - return Mono.just( rxTransactions.get( id ) ); + return result; + } + + private Mono getRx( String id, Map idToT, String notFoundMessage ) + { + return Mono.fromCompletionStage( getAsync( id, idToT, notFoundMessage ) ); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractResultHolder.java new file mode 100644 index 0000000000..d9bb3704c1 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractResultHolder.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import lombok.Getter; + +import java.util.Optional; + +public abstract class AbstractResultHolder, T3> +{ + private final T1 sessionHolder; + private final T2 transactionHolder; + @Getter + private final T3 result; + + public AbstractResultHolder( T1 sessionHolder, T3 result ) + { + this.sessionHolder = sessionHolder; + this.transactionHolder = null; + this.result = result; + } + + public AbstractResultHolder( T2 transactionHolder, T3 result ) + { + this.sessionHolder = null; + this.transactionHolder = transactionHolder; + this.result = result; + } + + public T1 getSessionHolder() + { + return transactionHolder != null ? getSessionHolder( transactionHolder ) : sessionHolder; + } + + public Optional getTransactionHolder() + { + return Optional.ofNullable( transactionHolder ); + } + + protected abstract T1 getSessionHolder( T2 transactionHolder ); +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractSessionHolder.java similarity index 72% rename from testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java rename to testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractSessionHolder.java index 2cde77bef6..f8e9381848 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractSessionHolder.java @@ -16,24 +16,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package neo4j.org.testkit.backend; +package neo4j.org.testkit.backend.holder; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import java.util.concurrent.CompletableFuture; -import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.SessionConfig; +@RequiredArgsConstructor @Getter -@Setter -public class AsyncSessionState +public abstract class AbstractSessionHolder { - public AsyncSession session; + public final DriverHolder driverHolder; + public final T session; + public final SessionConfig config; + @Setter public CompletableFuture txWorkFuture; - - public AsyncSessionState( AsyncSession session ) - { - this.session = session; - } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractTransactionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractTransactionHolder.java new file mode 100644 index 0000000000..7bd0429b96 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AbstractTransactionHolder.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public abstract class AbstractTransactionHolder +{ + private final T1 sessionHolder; + private final T2 transaction; +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncSessionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncSessionHolder.java new file mode 100644 index 0000000000..ddba2a3eaa --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncSessionHolder.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.SessionConfig; +import org.neo4j.driver.async.AsyncSession; + +public class AsyncSessionHolder extends AbstractSessionHolder +{ + public AsyncSessionHolder( DriverHolder driverHolder, AsyncSession session, SessionConfig config ) + { + super( driverHolder, session, config ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncTransactionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncTransactionHolder.java new file mode 100644 index 0000000000..afca81a67d --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/AsyncTransactionHolder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.async.AsyncTransaction; + +public class AsyncTransactionHolder extends AbstractTransactionHolder +{ + public AsyncTransactionHolder( AsyncSessionHolder sessionHolder, AsyncTransaction transaction ) + { + super( sessionHolder, transaction ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/DriverHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/DriverHolder.java new file mode 100644 index 0000000000..b765fa20e7 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/DriverHolder.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import org.neo4j.driver.Config; +import org.neo4j.driver.Driver; + +@RequiredArgsConstructor +@Getter +public class DriverHolder +{ + private final Driver driver; + private final Config config; +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultCursorHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultCursorHolder.java new file mode 100644 index 0000000000..02f4bedd13 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultCursorHolder.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.async.ResultCursor; + +public class ResultCursorHolder extends AbstractResultHolder +{ + public ResultCursorHolder( AsyncSessionHolder sessionHolder, ResultCursor result ) + { + super( sessionHolder, result ); + } + + public ResultCursorHolder( AsyncTransactionHolder transactionHolder, ResultCursor result ) + { + super( transactionHolder, result ); + } + + @Override + protected AsyncSessionHolder getSessionHolder( AsyncTransactionHolder transactionHolder ) + { + return transactionHolder.getSessionHolder(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultHolder.java new file mode 100644 index 0000000000..07d6fc6ff8 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ResultHolder.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.Result; + +public class ResultHolder extends AbstractResultHolder +{ + public ResultHolder( SessionHolder sessionHolder, Result result ) + { + super( sessionHolder, result ); + } + + public ResultHolder( TransactionHolder transactionHolder, Result result ) + { + super( transactionHolder, result ); + } + + @Override + protected SessionHolder getSessionHolder( TransactionHolder transactionHolder ) + { + return transactionHolder.getSessionHolder(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java new file mode 100644 index 0000000000..27cecd1b88 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import lombok.Getter; +import lombok.Setter; +import neo4j.org.testkit.backend.RxBlockingSubscriber; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.driver.Record; +import org.neo4j.driver.reactive.RxResult; + +public class RxResultHolder extends AbstractResultHolder +{ + @Setter + private RxBlockingSubscriber subscriber; + @Getter + private final AtomicLong requestedRecordsCounter = new AtomicLong(); + + public RxResultHolder( RxSessionHolder sessionHolder, RxResult result ) + { + super( sessionHolder, result ); + } + + public RxResultHolder( RxTransactionHolder transactionHolder, RxResult result ) + { + super( transactionHolder, result ); + } + + public Optional> getSubscriber() + { + return Optional.ofNullable( subscriber ); + } + + @Override + protected RxSessionHolder getSessionHolder( RxTransactionHolder transactionHolder ) + { + return transactionHolder.getSessionHolder(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java similarity index 68% rename from testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java rename to testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java index 0dfe5524e9..73e66b2920 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxSessionState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java @@ -16,24 +16,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package neo4j.org.testkit.backend; - -import lombok.Getter; -import lombok.Setter; - -import java.util.concurrent.CompletableFuture; +package neo4j.org.testkit.backend.holder; +import org.neo4j.driver.SessionConfig; import org.neo4j.driver.reactive.RxSession; -@Getter -@Setter -public class RxSessionState +public class RxSessionHolder extends AbstractSessionHolder { - public RxSession session; - public CompletableFuture txWorkFuture; - - public RxSessionState( RxSession session ) + public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config ) { - this.session = session; + super( driverHolder, session, config ); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxTransactionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxTransactionHolder.java new file mode 100644 index 0000000000..cbb3001a63 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxTransactionHolder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.reactive.RxTransaction; + +public class RxTransactionHolder extends AbstractTransactionHolder +{ + public RxTransactionHolder( RxSessionHolder sessionHolder, RxTransaction transaction ) + { + super( sessionHolder, transaction ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/SessionHolder.java similarity index 68% rename from testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java rename to testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/SessionHolder.java index 9fc142b251..14bb3f3b96 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/SessionState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/SessionHolder.java @@ -16,23 +16,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package neo4j.org.testkit.backend; - -import lombok.Getter; -import lombok.Setter; - -import java.util.concurrent.CompletableFuture; +package neo4j.org.testkit.backend.holder; import org.neo4j.driver.Session; +import org.neo4j.driver.SessionConfig; -@Getter -@Setter -public class SessionState +public class SessionHolder extends AbstractSessionHolder { - public Session session; - public CompletableFuture txWorkFuture; - - public SessionState(Session session) { - this.session = session; + public SessionHolder( DriverHolder driverHolder, Session session, SessionConfig config ) + { + super( driverHolder, session, config ); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/TransactionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/TransactionHolder.java new file mode 100644 index 0000000000..4919750f5c --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/TransactionHolder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.Transaction; + +public class TransactionHolder extends AbstractTransactionHolder +{ + public TransactionHolder( SessionHolder sessionHolder, Transaction transaction ) + { + super( sessionHolder, transaction ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index 332280064f..03cb95016d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -37,14 +37,15 @@ public class CheckMultiDBSupport implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { String driverId = data.getDriverId(); - boolean available = testkitState.getDrivers().get( driverId ).supportsMultiDb(); + boolean available = testkitState.getDriverHolder( driverId ).getDriver().supportsMultiDb(); return createResponse( available ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getDrivers().get( data.getDriverId() ) + return testkitState.getDriverHolder( data.getDriverId() ) + .getDriver() .supportsMultiDbAsync() .thenApply( this::createResponse ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index 2ba3e2f762..d7fd879fb5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -36,14 +36,15 @@ public class DriverClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - testkitState.getDrivers().get( data.getDriverId() ).close(); + testkitState.getDriverHolder( data.getDriverId() ).getDriver().close(); return createResponse(); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getDrivers().get( data.getDriverId() ) + return testkitState.getDriverHolder( data.getDriverId() ) + .getDriver() .closeAsync() .thenApply( ignored -> createResponse() ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index ffed5b782c..7bf2c7304c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -26,7 +26,6 @@ import reactor.core.publisher.Mono; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -38,15 +37,20 @@ public class GetFeatures implements TestkitRequest { private static final Set COMMON_FEATURES = new HashSet<>( Arrays.asList( "AuthorizationExpiredTreatment", - "Optimization:PullPipelining", "ConfHint:connection.recv_timeout_seconds", "Temporary:DriverFetchSize", - "Temporary:DriverMaxTxRetryTime", + "Temporary:DriverMaxTxRetryTime" + ) ); + + private static final Set SYNC_FEATURES = new HashSet<>( Arrays.asList( + "Optimization:PullPipelining", + "Temporary:TransactionClose", "Temporary:ResultList" ) ); - private static final Set SYNC_FEATURES = new HashSet<>( Collections.singletonList( - "Temporary:TransactionClose" + private static final Set ASYNC_FEATURES = new HashSet<>( Arrays.asList( + "Optimization:PullPipelining", + "Temporary:ResultList" ) ); @Override @@ -60,7 +64,9 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) ); + Set features = new HashSet<>( COMMON_FEATURES ); + features.addAll( ASYNC_FEATURES ); + return CompletableFuture.completedFuture( createResponse( features ) ); } @Override diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index a919a10b85..c516e75b68 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.DriverHolder; import neo4j.org.testkit.backend.messages.responses.BackendError; import neo4j.org.testkit.backend.messages.responses.DomainNameResolutionRequired; import neo4j.org.testkit.backend.messages.responses.Driver; @@ -101,15 +102,16 @@ public TestkitResponse process( TestkitState testkitState ) .map( RetrySettings::new ) .orElse( RetrySettings.DEFAULT ); org.neo4j.driver.Driver driver; + Config config = configBuilder.build(); try { - driver = driver( URI.create( data.uri ), authToken, configBuilder.build(), retrySettings, domainNameResolver, testkitState, id ); + driver = driver( URI.create( data.uri ), authToken, config, retrySettings, domainNameResolver, testkitState, id ); } catch ( RuntimeException e ) { return handleExceptionAsErrorResponse( testkitState, e ).orElseThrow( () -> e ); } - testkitState.getDrivers().putIfAbsent( id, driver ); + testkitState.addDriverHolder( id, new DriverHolder( driver, config ) ); return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index 0f21cbd287..020bfb93e3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -20,24 +20,24 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.RxSessionState; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.AsyncSessionHolder; +import neo4j.org.testkit.backend.holder.DriverHolder; +import neo4j.org.testkit.backend.holder.RxSessionHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import org.neo4j.driver.AccessMode; -import org.neo4j.driver.Driver; import org.neo4j.driver.SessionConfig; import org.neo4j.driver.internal.InternalBookmark; @@ -50,26 +50,26 @@ public class NewSession implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return createSessionStateAndResponse( testkitState, this::createSessionState, testkitState.getSessionStates() ); + return createSessionStateAndResponse( testkitState, this::createSessionState, testkitState::addSessionHolder ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { return CompletableFuture.completedFuture( - createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ); + createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState::addAsyncSessionHolder ) ); } @Override public Mono processRx( TestkitState testkitState ) { - return Mono.just( createSessionStateAndResponse( testkitState, this::createRxSessionState, testkitState.getRxSessionStates() ) ); + return Mono.just( createSessionStateAndResponse( testkitState, this::createRxSessionState, testkitState::addRxSessionHolder ) ); } - protected TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, - Map sessionStateContainer ) + protected TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, + Function addSessionHolder ) { - Driver driver = testkitState.getDrivers().get( data.getDriverId() ); + DriverHolder driverHolder = testkitState.getDriverHolder( data.getDriverId() ); AccessMode formattedAccessMode = data.getAccessMode().equals( "r" ) ? AccessMode.READ : AccessMode.WRITE; SessionConfig.Builder builder = SessionConfig.builder() .withDefaultAccessMode( formattedAccessMode ); @@ -85,26 +85,25 @@ protected TestkitResponse createSessionStateAndResponse( TestkitState testki builder.withFetchSize( data.getFetchSize() ); } - String newId = testkitState.newId(); - T sessionState = sessionStateProducer.apply( driver, builder.build() ); - sessionStateContainer.put( newId, sessionState ); + T sessionStateHolder = sessionStateProducer.apply( driverHolder, builder.build() ); + String newId = addSessionHolder.apply( sessionStateHolder ); return Session.builder().data( Session.SessionBody.builder().id( newId ).build() ).build(); } - private SessionState createSessionState( Driver driver, SessionConfig sessionConfig ) + private SessionHolder createSessionState( DriverHolder driverHolder, SessionConfig sessionConfig ) { - return new SessionState( driver.session( sessionConfig ) ); + return new SessionHolder( driverHolder, driverHolder.getDriver().session( sessionConfig ), sessionConfig ); } - private AsyncSessionState createAsyncSessionState( Driver driver, SessionConfig sessionConfig ) + private AsyncSessionHolder createAsyncSessionState( DriverHolder driverHolder, SessionConfig sessionConfig ) { - return new AsyncSessionState( driver.asyncSession( sessionConfig ) ); + return new AsyncSessionHolder( driverHolder, driverHolder.getDriver().asyncSession( sessionConfig ), sessionConfig ); } - private RxSessionState createRxSessionState( Driver driver, SessionConfig sessionConfig ) + private RxSessionHolder createRxSessionState( DriverHolder driverHolder, SessionConfig sessionConfig ) { - return new RxSessionState( driver.rxSession( sessionConfig ) ); + return new RxSessionHolder( driverHolder, driverHolder.getDriver().rxSession( sessionConfig ), sessionConfig ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index fe65bbd8cc..dbf65f3871 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -30,7 +30,6 @@ import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; -import org.neo4j.driver.reactive.RxResult; @Setter @Getter @@ -43,7 +42,7 @@ public TestkitResponse process( TestkitState testkitState ) { try { - Result result = testkitState.getResults().get( data.getResultId() ); + Result result = testkitState.getResultHolder( data.getResultId() ).getResult(); return createResponse( result.consume() ); } catch ( NoSuchRecordException ignored ) @@ -55,17 +54,17 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getResultCursors().get( data.getResultId() ) - .consumeAsync() + return testkitState.getAsyncResultHolder( data.getResultId() ) + .thenCompose( resultCursorHolder -> resultCursorHolder.getResult().consumeAsync() ) .thenApply( this::createResponse ); } @Override public Mono processRx( TestkitState testkitState ) { - RxResult result = testkitState.getRxResults().get( data.getResultId() ); - return Mono.fromDirect( result.consume() ) - .map( this::createResponse ); + return testkitState.getRxResultHolder( data.getResultId() ) + .flatMap( resultHolder -> Mono.fromDirect( resultHolder.getResult().consume() ) ) + .map( this::createResponse ); } private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java index f0ec4d563c..90dd640567 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java @@ -39,14 +39,14 @@ public class ResultList implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return createResponse( testkitState.getResults().get( data.getResultId() ).list() ); + return createResponse( testkitState.getResultHolder( data.getResultId() ).getResult().list() ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getResultCursors().get( data.getResultId() ) - .listAsync() + return testkitState.getAsyncResultHolder( data.getResultId() ) + .thenCompose( resultCursorHolder -> resultCursorHolder.getResult().listAsync() ) .thenApply( this::createResponse ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index 012d84a715..7f83c5a665 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -22,18 +22,17 @@ import lombok.Setter; import neo4j.org.testkit.backend.RxBlockingSubscriber; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.NullRecord; -import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.Function; +import org.neo4j.driver.Record; import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; -import org.neo4j.driver.reactive.RxResult; @Setter @Getter @@ -46,7 +45,7 @@ public TestkitResponse process( TestkitState testkitState ) { try { - Result result = testkitState.getResults().get( data.getResultId() ); + Result result = testkitState.getResultHolder( data.getResultId() ).getResult(); return createResponse( result.next() ); } catch ( NoSuchRecordException ignored ) @@ -58,52 +57,99 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getResultCursors().get( data.getResultId() ) - .nextAsync() - .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ); + return testkitState.getAsyncResultHolder( data.getResultId() ) + .thenCompose( resultCursorHolder -> resultCursorHolder.getResult().nextAsync() ) + .thenApply( this::createResponseNullSafe ); } @Override public Mono processRx( TestkitState testkitState ) { - CompletableFuture> subscriberFuture; - String resultId = data.getResultId(); - RxBlockingSubscriber currentSubscriber = testkitState.getRxResultIdToRecordSubscriber().get( resultId ); + return testkitState.getRxResultHolder( data.getResultId() ) + .flatMap( resultHolder -> + { + CompletionStage responseStage = getSubscriberStage( resultHolder ) + .thenCompose( subscriber -> requestRecordsWhenPreviousAreConsumed( subscriber, resultHolder ) ) + .thenCompose( subscriber -> consumeNextRecordOrCompletionSignal( subscriber, resultHolder ) ) + .thenApply( this::createResponseNullSafe ); + return Mono.fromCompletionStage( responseStage ); + } ); + } - if ( currentSubscriber == null ) - { - RxBlockingSubscriber subscriber = new RxBlockingSubscriber<>(); - subscriberFuture = subscriber.getSubscriptionFuture().thenApply( subscription -> - { - subscription.request( 1000 ); - return subscriber; - } ); - testkitState.getRxResultIdToRecordSubscriber().put( resultId, subscriber ); - RxResult result = testkitState.getRxResults().get( resultId ); - result.records().subscribe( subscriber ); - } - else - { - subscriberFuture = CompletableFuture.completedFuture( currentSubscriber ); - } + private CompletionStage> getSubscriberStage( RxResultHolder resultHolder ) + { + return resultHolder.getSubscriber() + .>>map( CompletableFuture::completedFuture ) + .orElseGet( () -> + { + RxBlockingSubscriber subscriber = new RxBlockingSubscriber<>(); + CompletionStage> subscriberStage = + subscriber.getSubscriptionStage() + .thenApply( subscription -> + { + resultHolder.setSubscriber( subscriber ); + return subscriber; + } ); + resultHolder.getResult().records().subscribe( subscriber ); + return subscriberStage; + } ); + } - CompletableFuture responseFuture = subscriberFuture - .thenApply( recordsSubscriber -> - { - CompletableFuture recordConsumer = - new CompletableFuture<>(); - recordsSubscriber.setNextSignalConsumer( recordConsumer ); - return recordConsumer; - } ) - .thenCompose( Function.identity() ) - .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ); + private CompletionStage> requestRecordsWhenPreviousAreConsumed( + RxBlockingSubscriber subscriber, RxResultHolder resultHolder + ) + { + return resultHolder.getRequestedRecordsCounter().get() > 0 + ? CompletableFuture.completedFuture( subscriber ) + : subscriber.getSubscriptionStage() + .thenApply( subscription -> + { + long fetchSize = getFetchSize( resultHolder ); + subscription.request( fetchSize ); + resultHolder.getRequestedRecordsCounter().addAndGet( fetchSize ); + return subscriber; + } ); + } - return Mono.fromCompletionStage( responseFuture ); + private CompletionStage consumeNextRecord( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) + { + CompletableFuture recordConsumer = new CompletableFuture<>(); + subscriber.setNextSignalConsumer( recordConsumer ); + return recordConsumer.thenApply( record -> + { + resultHolder.getRequestedRecordsCounter().decrementAndGet(); + return record; + } ); + } + + private CompletionStage consumeNextRecordOrCompletionSignal( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) + { + return CompletableFuture.anyOf( + consumeNextRecord( subscriber, resultHolder ).toCompletableFuture(), + subscriber.getCompletionStage().toCompletableFuture() + ).thenApply( Record.class::cast ); + } + + private long getFetchSize( RxResultHolder resultHolder ) + { + long fetchSize = resultHolder.getSessionHolder().getConfig() + .fetchSize() + .orElse( resultHolder.getSessionHolder().getDriverHolder().getConfig().fetchSize() ); + return fetchSize == -1 ? Long.MAX_VALUE : fetchSize; + } + + private neo4j.org.testkit.backend.messages.responses.Record createResponse( Record record ) + { + return neo4j.org.testkit.backend.messages.responses.Record.builder() + .data( neo4j.org.testkit.backend.messages.responses.Record.RecordBody.builder() + .values( record ) + .build() ) + .build(); } - private Record createResponse( org.neo4j.driver.Record record ) + private neo4j.org.testkit.backend.messages.responses.TestkitResponse createResponseNullSafe( Record record ) { - return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); + return record != null ? createResponse( record ) : NullRecord.builder().build(); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index c412b358ba..c5f5c0466c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -20,14 +20,11 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.RxSessionState; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @Setter @@ -39,11 +36,7 @@ public class RetryableNegative implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - SessionState sessionState = testkitState.getSessionStates().getOrDefault( data.sessionId, null ); - if ( sessionState == null ) - { - throw new RuntimeException( "Could not find session" ); - } + SessionHolder sessionHolder = testkitState.getSessionHolder( data.sessionId ); Throwable throwable; if ( !"".equals( data.getErrorId() ) ) { @@ -53,42 +46,50 @@ public TestkitResponse process( TestkitState testkitState ) { throwable = new RuntimeException( "Error from client in retryable tx" ); } - sessionState.getTxWorkFuture().completeExceptionally( throwable ); + sessionHolder.getTxWorkFuture().completeExceptionally( throwable ); return null; } @Override public CompletionStage processAsync( TestkitState testkitState ) { - AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); - Throwable throwable; - if ( !"".equals( data.getErrorId() ) ) - { - throwable = testkitState.getErrors().get( data.getErrorId() ); - } - else - { - throwable = new RuntimeException( "Error from client in retryable tx" ); - } - sessionState.getTxWorkFuture().completeExceptionally( throwable ); - return CompletableFuture.completedFuture( null ); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenApply( + sessionHolder -> + { + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionHolder.getTxWorkFuture().completeExceptionally( throwable ); + return null; + } ); } @Override public Mono processRx( TestkitState testkitState ) { - RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); - Throwable throwable; - if ( !"".equals( data.getErrorId() ) ) - { - throwable = testkitState.getErrors().get( data.getErrorId() ); - } - else - { - throwable = new RuntimeException( "Error from client in retryable tx" ); - } - sessionState.getTxWorkFuture().completeExceptionally( throwable ); - return Mono.empty(); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .mapNotNull( + sessionHolder -> + { + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionHolder.getTxWorkFuture().completeExceptionally( throwable ); + return null; + } ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index d4f5b28b70..4a7990ebbc 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -20,12 +20,11 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @Setter @@ -37,27 +36,35 @@ public class RetryablePositive implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - SessionState sessionState = testkitState.getSessionStates().get( data.sessionId ); - if ( sessionState == null ) + SessionHolder sessionHolder = testkitState.getSessionHolder( data.sessionId ); + if ( sessionHolder == null ) { throw new RuntimeException( "Could not find session" ); } - sessionState.getTxWorkFuture().complete( null ); + sessionHolder.getTxWorkFuture().complete( null ); return null; } @Override public CompletionStage processAsync( TestkitState testkitState ) { - testkitState.getAsyncSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); - return CompletableFuture.completedFuture( null ); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenApply( sessionHolder -> + { + sessionHolder.getTxWorkFuture().complete( null ); + return null; + } ); } @Override public Mono processRx( TestkitState testkitState ) { - testkitState.getRxSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); - return Mono.empty(); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .mapNotNull( sessionHolder -> + { + sessionHolder.getTxWorkFuture().complete( null ); + return null; + } ); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 0c12a6f61a..0b10511fa2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -20,10 +20,11 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.RxSessionState; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; +import neo4j.org.testkit.backend.holder.RxTransactionHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; +import neo4j.org.testkit.backend.holder.TransactionHolder; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; import reactor.core.publisher.Mono; @@ -31,9 +32,9 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.reactive.RxSession; @@ -47,70 +48,59 @@ public class SessionBeginTransaction implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.sessionId, null ) ) - .map( SessionState::getSession ) - .map( session -> - { - TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + SessionHolder sessionHolder = testkitState.getSessionHolder( data.getSessionId() ); + Session session = sessionHolder.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); - if ( data.getTimeout() != null ) - { - builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); - } + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } - return transaction( testkitState.addTransaction( session.beginTransaction( builder.build() ) ) ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); + org.neo4j.driver.Transaction transaction = session.beginTransaction( builder.build() ); + return transaction( testkitState.addTransactionHolder( new TransactionHolder( sessionHolder, transaction ) ) ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); - if ( sessionState != null ) - { - AsyncSession session = sessionState.getSession(); - TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenCompose( sessionHolder -> + { + AsyncSession session = sessionHolder.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); - if ( data.getTimeout() != null ) - { - builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); - } + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } - return session.beginTransactionAsync( builder.build() ).thenApply( tx -> transaction( testkitState.addAsyncTransaction( tx ) ) ); - } - else - { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally( new RuntimeException( "Could not find session" ) ); - return future; - } + return session.beginTransactionAsync( builder.build() ).thenApply( tx -> transaction( + testkitState.addAsyncTransactionHolder( new AsyncTransactionHolder( sessionHolder, tx ) ) ) ); + } ); } @Override public Mono processRx( TestkitState testkitState ) { - RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); - if ( sessionState != null ) - { - RxSession session = sessionState.getSession(); - TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .flatMap( sessionHolder -> + { + RxSession session = sessionHolder.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); - if ( data.getTimeout() != null ) - { - builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); - } + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } - return Mono.fromDirect( session.beginTransaction( builder.build() ) ) - .map( tx -> transaction( testkitState.addRxTransaction( tx ) ) ); - } - else - { - return Mono.error( new RuntimeException( "Could not find session" ) ); - } + return Mono.fromDirect( session.beginTransaction( builder.build() ) ) + .map( tx -> transaction( + testkitState.addRxTransactionHolder( new RxTransactionHolder( sessionHolder, tx ) ) ) ); + } ); } private Transaction transaction( String txId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index 0bbc94e1a7..d3680a3f49 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -36,23 +36,24 @@ public class SessionClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - testkitState.getSessionStates().get( data.getSessionId() ).getSession().close(); + testkitState.getSessionHolder( data.getSessionId() ).getSession().close(); return createResponse(); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession() - .closeAsync() + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenCompose( sessionHolder -> sessionHolder.getSession().closeAsync() ) .thenApply( ignored -> createResponse() ); } @Override public Mono processRx( TestkitState testkitState ) { - return Mono.fromDirect( testkitState.getRxSessionStates().get( data.getSessionId() ).getSession().close() ) - .then( Mono.just( createResponse() ) ); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .flatMap( sessionHolder -> Mono.fromDirect( sessionHolder.getSession().close() ) ) + .then( Mono.just( createResponse() ) ); } private Session createResponse() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index 5d4bedd22f..f227d94e3b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -20,14 +20,12 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.responses.Bookmarks; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Bookmark; @@ -41,28 +39,25 @@ public class SessionLastBookmarks implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.sessionId, null ) ) - .map( SessionState::getSession ) - .map( session -> - { - Bookmark bookmark = testkitState.getSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return createResponse( bookmark ); - } ) - .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); + SessionHolder sessionHolder = testkitState.getSessionHolder( data.getSessionId() ); + Bookmark bookmark = sessionHolder.getSession().lastBookmark(); + return createResponse( bookmark ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - Bookmark bookmark = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return CompletableFuture.completedFuture( createResponse( bookmark ) ); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenApply( sessionHolder -> sessionHolder.getSession().lastBookmark() ) + .thenApply( this::createResponse ); } @Override public Mono processRx( TestkitState testkitState ) { - Bookmark bookmark = testkitState.getRxSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return Mono.just( createResponse( bookmark ) ); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .map( sessionHolder -> sessionHolder.getSession().lastBookmark() ) + .map( this::createResponse ); } private Bookmarks createResponse( Bookmark bookmark ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index 284f7a1f56..b032547164 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -20,17 +20,17 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.RxSessionState; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; +import neo4j.org.testkit.backend.holder.RxTransactionHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; +import neo4j.org.testkit.backend.holder.TransactionHolder; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -51,59 +51,62 @@ public class SessionReadTransaction implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.getSessionId(), null ) ) - .map( sessionState -> - { - Session session = sessionState.getSession(); - session.readTransaction( handle( testkitState, sessionState ) ); - return retryableDone(); - } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); + SessionHolder sessionHolder = testkitState.getSessionHolder( data.getSessionId() ); + Session session = sessionHolder.getSession(); + session.readTransaction( handle( testkitState, sessionHolder ) ); + return retryableDone(); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); - AsyncSession session = sessionState.getSession(); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenCompose( sessionHolder -> + { + AsyncSession session = sessionHolder.getSession(); - AsyncTransactionWork> workWrapper = tx -> - { - String txId = testkitState.addAsyncTransaction( tx ); - testkitState.getResponseWriter().accept( retryableTry( txId ) ); - CompletableFuture txWorkFuture = new CompletableFuture<>(); - sessionState.setTxWorkFuture( txWorkFuture ); - return txWorkFuture; - }; + AsyncTransactionWork> workWrapper = tx -> + { + String txId = testkitState.addAsyncTransactionHolder( new AsyncTransactionHolder( sessionHolder, tx ) ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture txWorkFuture = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture( txWorkFuture ); + return txWorkFuture; + }; - return session.readTransactionAsync( workWrapper ) - .thenApply( nothing -> retryableDone() ); + return session.readTransactionAsync( workWrapper ); + } ) + .thenApply( nothing -> retryableDone() ); } @Override public Mono processRx( TestkitState testkitState ) { - RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); - RxTransactionWork> workWrapper = tx -> - { - String txId = testkitState.addRxTransaction( tx ); - testkitState.getResponseWriter().accept( retryableTry( txId ) ); - CompletableFuture tryResult = new CompletableFuture<>(); - sessionState.setTxWorkFuture( tryResult ); - return Mono.fromCompletionStage( tryResult ); - }; + return testkitState.getRxSessionHolder( data.getSessionId() ) + .flatMap( sessionHolder -> + { + RxTransactionWork> workWrapper = tx -> + { + String txId = testkitState.addRxTransactionHolder( new RxTransactionHolder( sessionHolder, tx ) ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture( tryResult ); + return Mono.fromCompletionStage( tryResult ); + }; - return Mono.fromDirect( sessionState.getSession().readTransaction( workWrapper ) ) - .then( Mono.just( retryableDone() ) ); + return Mono.fromDirect( sessionHolder.getSession().readTransaction( workWrapper ) ); + } ) + .then( Mono.just( retryableDone() ) ); } - private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) + private TransactionWork handle( TestkitState testkitState, SessionHolder sessionHolder ) { return tx -> { - String txId = testkitState.addTransaction( tx ); + String txId = testkitState.addTransactionHolder( new TransactionHolder( sessionHolder, tx ) ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); CompletableFuture txWorkFuture = new CompletableFuture<>(); - sessionState.setTxWorkFuture( txWorkFuture ); + sessionHolder.setTxWorkFuture( txWorkFuture ); try { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 3928c5b44c..e943726de7 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -22,6 +22,10 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.ResultCursorHolder; +import neo4j.org.testkit.backend.holder.ResultHolder; +import neo4j.org.testkit.backend.holder.RxResultHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.Result; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -48,7 +52,8 @@ public class SessionRun implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - Session session = testkitState.getSessionStates().get( data.getSessionId() ).getSession(); + SessionHolder sessionHolder = testkitState.getSessionHolder( data.getSessionId() ); + Session session = sessionHolder.getSession(); Query query = Optional.ofNullable( data.params ) .map( params -> new Query( data.cypher, data.params ) ) .orElseGet( () -> new Query( data.cypher ) ); @@ -56,8 +61,7 @@ public TestkitResponse process( TestkitState testkitState ) Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); org.neo4j.driver.Result result = session.run( query, transactionConfig.build() ); - String id = testkitState.newId(); - testkitState.getResults().put( id, result ); + String id = testkitState.addResultHolder( new ResultHolder( sessionHolder, result ) ); return createResponse( id ); } @@ -65,42 +69,50 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - AsyncSession session = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession(); - Query query = Optional.ofNullable( data.params ) - .map( params -> new Query( data.cypher, data.params ) ) - .orElseGet( () -> new Query( data.cypher ) ); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); - Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenCompose( sessionHolder -> + { + AsyncSession session = sessionHolder.getSession(); + Query query = Optional.ofNullable( data.params ) + .map( params -> new Query( data.cypher, data.params ) ) + .orElseGet( () -> new Query( data.cypher ) ); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); + Optional.ofNullable( data.getTimeout() ) + .ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); - return session.runAsync( query, transactionConfig.build() ) - .thenApply( resultCursor -> - { - String id = testkitState.newId(); - testkitState.getResultCursors().put( id, resultCursor ); - return createResponse( id ); - } ); + return session.runAsync( query, transactionConfig.build() ) + .thenApply( resultCursor -> + { + String id = testkitState.addAsyncResultHolder( + new ResultCursorHolder( sessionHolder, resultCursor ) ); + return createResponse( id ); + } ); + } ); } @Override public Mono processRx( TestkitState testkitState ) { - RxSession session = testkitState.getRxSessionStates().get( data.getSessionId() ).getSession(); - Query query = Optional.ofNullable( data.params ) - .map( params -> new Query( data.cypher, data.params ) ) - .orElseGet( () -> new Query( data.cypher ) ); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); - Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); + return testkitState.getRxSessionHolder( data.getSessionId() ) + .flatMap( sessionHolder -> + { + RxSession session = sessionHolder.getSession(); + Query query = Optional.ofNullable( data.params ) + .map( params -> new Query( data.cypher, data.params ) ) + .orElseGet( () -> new Query( data.cypher ) ); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); + Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); - RxResult result = session.run( query, transactionConfig.build() ); - String id = testkitState.newId(); - testkitState.getRxResults().put( id, result ); + RxResult result = session.run( query, transactionConfig.build() ); + String id = testkitState.addRxResultHolder( new RxResultHolder( sessionHolder, result ) ); - // The keys() method causes RUN message exchange. - // However, it does not currently report errors. - return Mono.fromDirect( result.keys() ) - .map( ignored -> createResponse( id ) ); + // The keys() method causes RUN message exchange. + // However, it does not currently report errors. + return Mono.fromDirect( result.keys() ) + .map( ignored -> createResponse( id ) ); + } ); } private Result createResponse( String resultId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index fda177afbd..65caff854e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -20,10 +20,11 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.AsyncSessionState; -import neo4j.org.testkit.backend.RxSessionState; -import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; +import neo4j.org.testkit.backend.holder.RxTransactionHolder; +import neo4j.org.testkit.backend.holder.SessionHolder; +import neo4j.org.testkit.backend.holder.TransactionHolder; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -31,7 +32,6 @@ import reactor.core.publisher.Mono; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -52,60 +52,64 @@ public class SessionWriteTransaction implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.getSessionId(), null ) ) - .map( sessionState -> - { - Session session = sessionState.getSession(); - session.writeTransaction( handle( testkitState, sessionState ) ); - return retryableDone(); - } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); + SessionHolder sessionHolder = testkitState.getSessionHolder( data.getSessionId() ); + Session session = sessionHolder.getSession(); + session.writeTransaction( handle( testkitState, sessionHolder ) ); + return retryableDone(); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); - AsyncSession session = sessionState.getSession(); + return testkitState.getAsyncSessionHolder( data.getSessionId() ) + .thenCompose( sessionHolder -> + { + AsyncSession session = sessionHolder.getSession(); - AsyncTransactionWork> workWrapper = - tx -> - { - String txId = testkitState.addAsyncTransaction( tx ); - testkitState.getResponseWriter().accept( retryableTry( txId ) ); - CompletableFuture tryResult = new CompletableFuture<>(); - sessionState.setTxWorkFuture( tryResult ); - return tryResult; - }; + AsyncTransactionWork> workWrapper = + tx -> + { + String txId = + testkitState.addAsyncTransactionHolder( new AsyncTransactionHolder( sessionHolder, tx ) ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture( tryResult ); + return tryResult; + }; - return session.writeTransactionAsync( workWrapper ) - .thenApply( nothing -> retryableDone() ); + return session.writeTransactionAsync( workWrapper ); + } ) + .thenApply( nothing -> retryableDone() ); } @Override public Mono processRx( TestkitState testkitState ) { - RxSessionState sessionState = testkitState.getRxSessionStates().get( data.getSessionId() ); - RxTransactionWork> workWrapper = tx -> - { - String txId = testkitState.addRxTransaction( tx ); - testkitState.getResponseWriter().accept( retryableTry( txId ) ); - CompletableFuture tryResult = new CompletableFuture<>(); - sessionState.setTxWorkFuture( tryResult ); - return Mono.fromCompletionStage( tryResult ); - }; + return testkitState.getRxSessionHolder( data.getSessionId() ) + .flatMap( sessionHolder -> + { + RxTransactionWork> workWrapper = tx -> + { + String txId = testkitState.addRxTransactionHolder( new RxTransactionHolder( sessionHolder, tx ) ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture( tryResult ); + return Mono.fromCompletionStage( tryResult ); + }; - return Mono.fromDirect( sessionState.getSession().writeTransaction( workWrapper ) ) - .then( Mono.just( retryableDone() ) ); + return Mono.fromDirect( sessionHolder.getSession().writeTransaction( workWrapper ) ); + } ) + .then( Mono.just( retryableDone() ) ); } - private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) + private TransactionWork handle( TestkitState testkitState, SessionHolder sessionHolder ) { return tx -> { - String txId = testkitState.addTransaction( tx ); + String txId = testkitState.addTransactionHolder( new TransactionHolder( sessionHolder, tx ) ); testkitState.getResponseWriter().accept( retryableTry( txId ) ); CompletableFuture txWorkFuture = new CompletableFuture<>(); - sessionState.setTxWorkFuture( txWorkFuture ); + sessionHolder.setTxWorkFuture( txWorkFuture ); try { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 788464a749..90df5705ab 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -50,6 +50,10 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.RoutingV3\\..*$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_should_reject_server_using_verify_connectivity_bolt_3x0$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_supports_bolt_3x0", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\.test_all_v3$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\.test_discards_on_session_close$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\.test_batch_v3$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\.test_all_v3$", skipMessage ); // Current limitations (require further investigation or bug fixing) skipMessage = "Does not report RUN FAILURE"; @@ -59,35 +63,17 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_on_tx_run$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetry\\..*$", "Unfinished results consumption" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRetryClustering\\..*$", "Unfinished results consumption" ); - skipMessage = "Does not support PULL pipelining"; - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_read_tx_and_rediscovery_until_success$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_read_tx_until_success_on_error$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_tx_and_rediscovery_until_success$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_tx_until_success_on_error$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_session_run$", - skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_reader_using_tx_run$", - skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_session_run$", - skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_using_tx_function$", - skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_using_tx_function$", - skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_change_using_tx_function$", - skipMessage ); - skipMessage = "Custom fetch size not supported"; - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_accept_custom_fetch_size_using_driver_configuration$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_pull_all_when_fetch_is_minus_one_using_driver_configuration", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_pull_custom_size_and_then_all_using_session_configuration$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_accept_custom_fetch_size_using_session_configuration$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationSessionRun\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestIterationTxRun\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_rollback_tx_on_session_close_consumed_result$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$", + "Does not support partially consumed state" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function$", "Commit failure leaks outside function" ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( + "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$", + "Rollback failures following commit failure" ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( + "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$", + "Rollback failures following commit failure" ); skipMessage = "Requires investigation"; REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage ); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index ef2ccddf13..b06602eb47 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -36,7 +36,7 @@ public class TransactionClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - testkitState.getTransaction( data.getTxId() ).close(); + testkitState.getTransactionHolder( data.getTxId() ).getTransaction().close(); return createResponse( data.getTxId() ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index eb76d4b650..4af0187078 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -27,8 +27,6 @@ import java.util.concurrent.CompletionStage; -import org.neo4j.driver.async.AsyncTransaction; - @Getter @Setter public class TransactionCommit implements TestkitRequest @@ -38,22 +36,22 @@ public class TransactionCommit implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - testkitState.getTransaction( data.getTxId() ).commit(); + testkitState.getTransactionHolder( data.getTxId() ).getTransaction().commit(); return createResponse( data.getTxId() ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( AsyncTransaction::commitAsync ) + return testkitState.getAsyncTransactionHolder( data.getTxId() ).thenCompose( tx -> tx.getTransaction().commitAsync() ) .thenApply( ignored -> createResponse( data.getTxId() ) ); } @Override public Mono processRx( TestkitState testkitState ) { - return testkitState.getRxTransaction( data.getTxId() ) - .flatMap( tx -> Mono.fromDirect( tx.commit() ) ) + return testkitState.getRxTransactionHolder( data.getTxId() ) + .flatMap( tx -> Mono.fromDirect( tx.getTransaction().commit() ) ) .then( Mono.just( createResponse( data.getTxId() ) ) ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index c695f1d870..f31b44cf22 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -27,8 +27,6 @@ import java.util.concurrent.CompletionStage; -import org.neo4j.driver.async.AsyncTransaction; - @Getter @Setter public class TransactionRollback implements TestkitRequest @@ -38,22 +36,22 @@ public class TransactionRollback implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - testkitState.getTransaction( data.getTxId() ).rollback(); + testkitState.getTransactionHolder( data.getTxId() ).getTransaction().rollback(); return createResponse( data.getTxId() ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransaction( data.getTxId() ).thenCompose( AsyncTransaction::rollbackAsync ) + return testkitState.getAsyncTransactionHolder( data.getTxId() ).thenCompose( tx -> tx.getTransaction().rollbackAsync() ) .thenApply( ignored -> createResponse( data.getTxId() ) ); } @Override public Mono processRx( TestkitState testkitState ) { - return testkitState.getRxTransaction( data.getTxId() ) - .flatMap( tx -> Mono.fromDirect( tx.rollback() ) ) + return testkitState.getRxTransactionHolder( data.getTxId() ) + .flatMap( tx -> Mono.fromDirect( tx.getTransaction().rollback() ) ) .then( Mono.just( createResponse( data.getTxId() ) ) ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index f61b2d7824..2c584bca82 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -22,6 +22,10 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.ResultCursorHolder; +import neo4j.org.testkit.backend.holder.ResultHolder; +import neo4j.org.testkit.backend.holder.RxResultHolder; +import neo4j.org.testkit.backend.holder.TransactionHolder; import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.Result; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -42,40 +46,43 @@ public class TransactionRun implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - org.neo4j.driver.Result result = - testkitState.getTransaction( data.getTxId() ).run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ); - String resultId = testkitState.newId(); - testkitState.getResults().put( resultId, result ); + TransactionHolder transactionHolder = testkitState.getTransactionHolder( data.getTxId() ); + org.neo4j.driver.Result result = transactionHolder.getTransaction() + .run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ); + String resultId = testkitState.addResultHolder( new ResultHolder( transactionHolder, result ) ); return createResponse( resultId ); } @Override public CompletionStage processAsync( TestkitState testkitState ) { - return testkitState.getAsyncTransaction( data.getTxId() ) - .thenCompose( tx -> tx.runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) - .thenApply( resultCursor -> - { - String resultId = testkitState.newId(); - testkitState.getResultCursors().put( resultId, resultCursor ); - return createResponse( resultId ); - } ); + return testkitState.getAsyncTransactionHolder( data.getTxId() ) + .thenCompose( transactionHolder -> transactionHolder.getTransaction() + .runAsync( data.getCypher(), + data.getParams() != null ? data.getParams() : Collections.emptyMap() ) + .thenApply( resultCursor -> + { + String resultId = testkitState.addAsyncResultHolder( + new ResultCursorHolder( transactionHolder, + resultCursor ) ); + return createResponse( resultId ); + } ) ); } @Override public Mono processRx( TestkitState testkitState ) { - String resultId = testkitState.newId(); - return testkitState.getRxTransaction( data.getTxId() ) - .flatMap( tx -> + return testkitState.getRxTransactionHolder( data.getTxId() ) + .flatMap( transactionHolder -> { - RxResult result = tx.run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ); - testkitState.getRxResults().put( resultId, result ); + RxResult result = transactionHolder.getTransaction() + .run( data.getCypher(), + data.getParams() != null ? data.getParams() : Collections.emptyMap() ); + String resultId = testkitState.addRxResultHolder( new RxResultHolder( transactionHolder, result ) ); // The keys() method causes RUN message exchange. // However, it does not currently report errors. - return Mono.fromDirect( result.keys() ); - } ) - .map( ignored -> createResponse( resultId ) ); + return Mono.fromDirect( result.keys() ).then( Mono.just( createResponse( resultId ) ) ); + } ); } protected Result createResponse( String resultId ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index 7a03d43ae8..6b2c0a7899 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -37,7 +37,7 @@ public class VerifyConnectivity implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { String id = data.getDriverId(); - testkitState.getDrivers().get( id ).verifyConnectivity(); + testkitState.getDriverHolder( id ).getDriver().verifyConnectivity(); return createResponse( id ); } @@ -45,7 +45,8 @@ public TestkitResponse process( TestkitState testkitState ) public CompletionStage processAsync( TestkitState testkitState ) { String id = data.getDriverId(); - return testkitState.getDrivers().get( id ) + return testkitState.getDriverHolder( id ) + .getDriver() .verifyConnectivityAsync() .thenApply( ignored -> createResponse( id ) ); }