diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java deleted file mode 100644 index f8ca02c18d..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; - -public class RxBlockingSubscriber implements Subscriber -{ - private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); - private final CompletableFuture completionFuture = new CompletableFuture<>(); - private CompletableFuture> nextSignalConsumerFuture; - - public void setNextSignalConsumer( CompletableFuture nextSignalConsumer ) - { - nextSignalConsumerFuture.complete( nextSignalConsumer ); - } - - public CompletionStage getSubscriptionStage() - { - return subscriptionFuture; - } - - public CompletionStage getCompletionStage() - { - return completionFuture; - } - - @Override - public void onSubscribe( Subscription s ) - { - nextSignalConsumerFuture = new CompletableFuture<>(); - subscriptionFuture.complete( s ); - } - - @Override - public void onNext( T t ) - { - blockUntilNextSignalConsumer().complete( t ); - } - - @Override - public void onError( Throwable t ) - { - completionFuture.completeExceptionally( t ); - } - - @Override - public void onComplete() - { - completionFuture.complete( null ); - } - - private CompletableFuture blockUntilNextSignalConsumer() - { - CompletableFuture nextSignalConsumer; - try - { - nextSignalConsumer = nextSignalConsumerFuture.get(); - } - catch ( Throwable throwable ) - { - throw new RuntimeException( "Failed waiting for next signal consumer", throwable ); - } - nextSignalConsumerFuture = new CompletableFuture<>(); - return nextSignalConsumer; - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java new file mode 100644 index 0000000000..4137b464a2 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java @@ -0,0 +1,241 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +/** + * Buffered subscriber for testing purposes. + *

+ * It consumes incoming signals as soon as they arrive and prevents publishing thread from getting blocked. + *

+ * The consumed signals can be retrieved one-by-one using {@link #next()}. It calls upstream {@link org.reactivestreams.Subscription#request(long)} with + * configured fetch size only when next signal is requested and no signals are expected to be emitted either because they have not been requested yet or the + * previous demand has been satisfied. + * + * @param + */ +public class RxBufferedSubscriber extends BaseSubscriber +{ + private final Lock lock = new ReentrantLock(); + private final long fetchSize; + private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); + private final FluxSink itemsSink; + private final OneSignalSubscriber itemsSubscriber; + private long pendingItems; + private boolean nextInProgress; + + public RxBufferedSubscriber( long fetchSize ) + { + this.fetchSize = fetchSize; + AtomicReference> sinkRef = new AtomicReference<>(); + itemsSubscriber = new OneSignalSubscriber<>(); + Flux.create( fluxSink -> + { + sinkRef.set( fluxSink ); + fluxSink.onRequest( ignored -> requestFromUpstream() ); + } ).subscribe( itemsSubscriber ); + itemsSink = sinkRef.get(); + } + + /** + * Returns a {@link Mono} of next signal from this subscription. + *

+ * If necessary, a request with configured fetch size is made for more signals to be published. + *

+ * Only a single in progress request is supported at a time. The returned {@link Mono} must succeed or error before next call is permitted. + *

+ * Both empty successful completion and error completion indicate the completion of the subscribed publisher. This method must not be called after this. + * + * @return the {@link Mono} of next signal. + */ + public Mono next() + { + executeWithLock( lock, () -> + { + if ( nextInProgress ) + { + throw new IllegalStateException( "Only one in progress next is allowed at a time" ); + } + return nextInProgress = true; + } ); + return Mono.fromCompletionStage( subscriptionFuture ) + .then( Mono.create( itemsSubscriber::requestNext ) ) + .doOnSuccess( ignored -> executeWithLock( lock, () -> nextInProgress = false ) ) + .doOnError( ignored -> executeWithLock( lock, () -> nextInProgress = false ) ); + } + + @Override + protected void hookOnSubscribe( Subscription subscription ) + { + subscriptionFuture.complete( subscription ); + } + + @Override + protected void hookOnNext( T value ) + { + executeWithLock( lock, () -> pendingItems-- ); + itemsSink.next( value ); + } + + @Override + protected void hookOnComplete() + { + itemsSink.complete(); + } + + @Override + protected void hookOnError( Throwable throwable ) + { + itemsSink.error( throwable ); + } + + private void requestFromUpstream() + { + boolean moreItemsPending = executeWithLock( lock, () -> + { + boolean morePending; + if ( pendingItems > 0 ) + { + morePending = true; + } + else + { + pendingItems = fetchSize; + morePending = false; + } + return morePending; + } ); + if ( moreItemsPending ) + { + return; + } + Subscription subscription = subscriptionFuture.getNow( null ); + if ( subscription == null ) + { + throw new IllegalStateException( "Upstream subscription must not be null at this stage" ); + } + subscription.request( fetchSize ); + } + + public static T executeWithLock( Lock lock, Supplier supplier ) + { + lock.lock(); + try + { + return supplier.get(); + } + finally + { + lock.unlock(); + } + } + + private static class OneSignalSubscriber extends BaseSubscriber + { + private final Lock lock = new ReentrantLock(); + private MonoSink sink; + private boolean emitted; + private boolean done; + private Throwable throwable; + + public void requestNext( MonoSink sink ) + { + boolean done = executeWithLock( lock, () -> + { + this.sink = sink; + emitted = false; + return this.done; + } ); + + if ( done ) + { + if ( throwable != null ) + { + this.sink.error( throwable ); + } + else + { + this.sink.success(); + } + } + else + { + upstream().request( 1 ); + } + } + + @Override + protected void hookOnSubscribe( Subscription subscription ) + { + // left empty to prevent requesting signals immediately + } + + @Override + protected void hookOnNext( T value ) + { + MonoSink sink = executeWithLock( lock, () -> + { + emitted = true; + return this.sink; + } ); + sink.success( value ); + } + + @Override + protected void hookOnComplete() + { + MonoSink sink = executeWithLock( lock, () -> + { + done = true; + return !emitted ? this.sink : null; + } ); + if ( sink != null ) + { + sink.success(); + } + } + + @Override + protected void hookOnError( Throwable throwable ) + { + MonoSink sink = executeWithLock( lock, () -> + { + done = true; + this.throwable = throwable; + return !emitted ? this.sink : null; + } ); + if ( sink != null ) + { + sink.error( throwable ); + } + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java index 56631f33eb..fd1c290c4b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; +import neo4j.org.testkit.backend.RxBufferedSubscriber; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -31,23 +31,21 @@ public class RxResultHolder extends AbstractResultHolder { @Setter - private RxBlockingSubscriber subscriber; + private RxBufferedSubscriber subscriber; @Getter private final AtomicLong requestedRecordsCounter = new AtomicLong(); public RxResultHolder( RxSessionHolder sessionHolder, RxResult result ) { super( sessionHolder, result ); - sessionHolder.setResultHolder( this ); } public RxResultHolder( RxTransactionHolder transactionHolder, RxResult result ) { super( transactionHolder, result ); - transactionHolder.getSessionHolder().setResultHolder( this ); } - public Optional> getSubscriber() + public Optional> getSubscriber() { return Optional.ofNullable( subscriber ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java index 607ce07229..73e66b2920 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java @@ -18,25 +18,13 @@ */ package neo4j.org.testkit.backend.holder; -import lombok.Setter; - -import java.util.Optional; - import org.neo4j.driver.SessionConfig; import org.neo4j.driver.reactive.RxSession; public class RxSessionHolder extends AbstractSessionHolder { - @Setter - private RxResultHolder resultHolder; - public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config ) { super( driverHolder, session, config ); } - - public Optional getResultHolder() - { - return Optional.ofNullable( resultHolder ); - } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index 7f83c5a665..6a434ca3d5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -20,14 +20,13 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; +import neo4j.org.testkit.backend.RxBufferedSubscriber; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.NullRecord; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Record; @@ -68,66 +67,22 @@ public Mono processRx( TestkitState testkitState ) return testkitState.getRxResultHolder( data.getResultId() ) .flatMap( resultHolder -> { - CompletionStage responseStage = getSubscriberStage( resultHolder ) - .thenCompose( subscriber -> requestRecordsWhenPreviousAreConsumed( subscriber, resultHolder ) ) - .thenCompose( subscriber -> consumeNextRecordOrCompletionSignal( subscriber, resultHolder ) ) - .thenApply( this::createResponseNullSafe ); - return Mono.fromCompletionStage( responseStage ); - } ); - } - - private CompletionStage> getSubscriberStage( RxResultHolder resultHolder ) - { - return resultHolder.getSubscriber() - .>>map( CompletableFuture::completedFuture ) - .orElseGet( () -> - { - RxBlockingSubscriber subscriber = new RxBlockingSubscriber<>(); - CompletionStage> subscriberStage = - subscriber.getSubscriptionStage() - .thenApply( subscription -> + RxBufferedSubscriber subscriber = + resultHolder.getSubscriber() + .orElseGet( () -> { - resultHolder.setSubscriber( subscriber ); - return subscriber; + RxBufferedSubscriber subscriberInstance = + new RxBufferedSubscriber<>( + getFetchSize( resultHolder ) ); + resultHolder.setSubscriber( subscriberInstance ); + resultHolder.getResult().records() + .subscribe( subscriberInstance ); + return subscriberInstance; } ); - resultHolder.getResult().records().subscribe( subscriber ); - return subscriberStage; - } ); - } - - private CompletionStage> requestRecordsWhenPreviousAreConsumed( - RxBlockingSubscriber subscriber, RxResultHolder resultHolder - ) - { - return resultHolder.getRequestedRecordsCounter().get() > 0 - ? CompletableFuture.completedFuture( subscriber ) - : subscriber.getSubscriptionStage() - .thenApply( subscription -> - { - long fetchSize = getFetchSize( resultHolder ); - subscription.request( fetchSize ); - resultHolder.getRequestedRecordsCounter().addAndGet( fetchSize ); - return subscriber; - } ); - } - - private CompletionStage consumeNextRecord( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) - { - CompletableFuture recordConsumer = new CompletableFuture<>(); - subscriber.setNextSignalConsumer( recordConsumer ); - return recordConsumer.thenApply( record -> - { - resultHolder.getRequestedRecordsCounter().decrementAndGet(); - return record; - } ); - } - - private CompletionStage consumeNextRecordOrCompletionSignal( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) - { - return CompletableFuture.anyOf( - consumeNextRecord( subscriber, resultHolder ).toCompletableFuture(), - subscriber.getCompletionStage().toCompletableFuture() - ).thenApply( Record.class::cast ); + return subscriber.next() + .map( this::createResponse ) + .defaultIfEmpty( NullRecord.builder().build() ); + } ); } private long getFetchSize( RxResultHolder resultHolder ) @@ -138,7 +93,7 @@ private long getFetchSize( RxResultHolder resultHolder ) return fetchSize == -1 ? Long.MAX_VALUE : fetchSize; } - private neo4j.org.testkit.backend.messages.responses.Record createResponse( Record record ) + private neo4j.org.testkit.backend.messages.responses.TestkitResponse createResponse( Record record ) { return neo4j.org.testkit.backend.messages.responses.Record.builder() .data( neo4j.org.testkit.backend.messages.responses.Record.RecordBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index e0c0015ba0..d3680a3f49 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -20,18 +20,12 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; import neo4j.org.testkit.backend.TestkitState; -import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicLong; - -import org.neo4j.driver.Record; @Setter @Getter @@ -58,126 +52,15 @@ public CompletionStage processAsync( TestkitState testkitState public Mono processRx( TestkitState testkitState ) { return testkitState.getRxSessionHolder( data.getSessionId() ) - .flatMap( sessionHolder -> sessionHolder.getResultHolder() - .map( this::consumeRequestedDemandAndCancelIfSubscribed ) - .orElse( Mono.empty() ) - .then( Mono.fromDirect( sessionHolder.getSession().close() ) ) ) + .flatMap( sessionHolder -> Mono.fromDirect( sessionHolder.getSession().close() ) ) .then( Mono.just( createResponse() ) ); } - private Mono consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder ) - { - return resultHolder.getSubscriber() - .map( subscriber -> Mono.fromCompletionStage( consumeRequestedDemandAndCancelIfSubscribed( resultHolder, subscriber ) ) ) - .orElse( Mono.empty() ); - } - - private CompletionStage consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder, RxBlockingSubscriber subscriber ) - { - if ( subscriber.getCompletionStage().toCompletableFuture().isDone() ) - { - return CompletableFuture.completedFuture( null ); - } - - return new DemandConsumer<>( subscriber, resultHolder.getRequestedRecordsCounter() ) - .getCompletedStage() - .thenCompose( completionReason -> - { - CompletionStage result; - switch ( completionReason ) - { - case REQUESTED_DEMAND_CONSUMED: - result = subscriber.getSubscriptionStage().thenApply( subscription -> - { - subscription.cancel(); - return null; - } ); - break; - case RECORD_STREAM_EXHAUSTED: - result = CompletableFuture.completedFuture( null ); - break; - default: - result = new CompletableFuture<>(); - result.toCompletableFuture() - .completeExceptionally( new RuntimeException( "Unexpected completion reason: " + completionReason ) ); - } - return result; - } ); - } - private Session createResponse() { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); } - private static class DemandConsumer - { - private final RxBlockingSubscriber subscriber; - private final AtomicLong unfulfilledDemandCounter; - @Getter - private final CompletableFuture completedStage = new CompletableFuture<>(); - - private enum CompletionReason - { - REQUESTED_DEMAND_CONSUMED, - RECORD_STREAM_EXHAUSTED - } - - private DemandConsumer( RxBlockingSubscriber subscriber, AtomicLong unfulfilledDemandCounter ) - { - this.subscriber = subscriber; - this.unfulfilledDemandCounter = unfulfilledDemandCounter; - - subscriber.getCompletionStage().whenComplete( this::onComplete ); - long unfulfilledDemand = this.unfulfilledDemandCounter.get(); - if ( unfulfilledDemand == 0 ) - { - completedStage.complete( CompletionReason.REQUESTED_DEMAND_CONSUMED ); - } - else if ( unfulfilledDemand > 0 ) - { - setupNextSignalConsumer(); - } - } - - private void setupNextSignalConsumer() - { - CompletableFuture consumer = new CompletableFuture<>(); - subscriber.setNextSignalConsumer( consumer ); - consumer.whenComplete( this::onNext ); - } - - private void onNext( T ignored, Throwable throwable ) - { - if ( throwable != null ) - { - completedStage.completeExceptionally( throwable ); - return; - } - - if ( unfulfilledDemandCounter.decrementAndGet() > 0 ) - { - setupNextSignalConsumer(); - } - else - { - completedStage.complete( CompletionReason.REQUESTED_DEMAND_CONSUMED ); - } - } - - private void onComplete( Void ignored, Throwable throwable ) - { - if ( throwable != null ) - { - completedStage.completeExceptionally( throwable ); - } - else - { - completedStage.complete( CompletionReason.RECORD_STREAM_EXHAUSTED ); - } - } - } - @Setter @Getter private static class SessionCloseBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index e81fc42ce2..71f39e72ac 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -63,13 +63,11 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_managed_tx_retry$", skipMessage ); - skipMessage = "Requires investigation"; - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_broken_transaction_should_not_break_session$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_does_not_update_last_bookmark_on_failure$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_interwoven_queries$", skipMessage ); + skipMessage = "Does not support multiple concurrent result streams on session level"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); } private StartTestBody data; diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index 3e421ea3dd..15b06fd824 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -159,7 +159,10 @@ ${project.build.directory}/testkit-rx reactive - --configs 4.2-cluster,4.0-community,4.1-enterprise ${testkit.args} + --configs 4.0-enterprise-neo4j 4.1-enterprise-neo4j 4.2-community-bolt 4.2-community-neo4j + 4.2-enterprise-bolt 4.2-enterprise-neo4j 4.2-enterprise-cluster-neo4j 4.3-community-bolt 4.3-community-neo4j + 4.3-enterprise-bolt 4.3-enterprise-neo4j 4.3-enterprise-cluster-neo4j ${testkit.args} + ${testkit.rx.name.pattern}>