Skip to content

Update the new reactive API to use Flow API #1295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AbstractReactiveSession(NetworkSession session) {

abstract Publisher<Void> closeTransaction(S transaction, boolean commit);

public Publisher<S> beginTransaction(TransactionConfig config) {
Publisher<S> doBeginTransaction(TransactionConfig config) {
return createSingleItemPublisher(
() -> {
CompletableFuture<S> txFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -115,7 +115,7 @@ public Set<Bookmark> lastBookmarks() {
return session.lastBookmarks();
}

public <T> Publisher<T> close() {
<T> Publisher<T> doClose() {
return createEmptyPublisher(session::closeAsync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
this.tx = tx;
}

public <T> Publisher<T> commit() {
<T> Publisher<T> doCommit() {
return createEmptyPublisher(tx::commitAsync);
}

public <T> Publisher<T> rollback() {
<T> Publisher<T> doRollback() {
return createEmptyPublisher(tx::rollbackAsync);
}

public Publisher<Void> close() {
Publisher<Void> doClose() {
return close(false);
}

public Publisher<Boolean> isOpen() {
Publisher<Boolean> doIsOpen() {
return Mono.just(tx.isOpen());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
Expand All @@ -27,7 +30,6 @@
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.reactive.ReactiveQueryRunner;
import org.neo4j.driver.reactive.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
Expand All @@ -37,7 +39,7 @@ default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
Query query = new Query(queryStr, parameters);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
return publisherToFlowPublisher(Mono.error(t));
}
}

Expand All @@ -57,7 +59,7 @@ default Publisher<ReactiveResult> run(String queryStr) {
Query query = new Query(queryStr);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
return publisherToFlowPublisher(Mono.error(t));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.neo4j.driver.internal.reactive;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.neo4j.driver.reactive.ReactiveTransactionContext;
import org.reactivestreams.Publisher;

final class DelegatingReactiveTransactionContext implements ReactiveTransactionContext {
private final ReactiveTransaction delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
package org.neo4j.driver.internal.reactive;

import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE;

import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.function.BiConsumer;
import org.neo4j.driver.Record;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -47,7 +48,7 @@ public List<String> keys() {

@Override
public Publisher<Record> records() {
return Flux.create(
return publisherToFlowPublisher(Flux.create(
sink -> {
if (cursor.isDone()) {
sink.error(newResultConsumedError());
Expand All @@ -57,24 +58,25 @@ public Publisher<Record> records() {
sink.onRequest(cursor::request);
}
},
IGNORE);
IGNORE));
}

@Override
public Publisher<ResultSummary> consume() {
return Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
if (summary != null) {
sink.success(summary);
} else {
sink.error(error);
}
}));
return publisherToFlowPublisher(
Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
if (summary != null) {
sink.success(summary);
} else {
sink.error(error);
}
})));
}

@Override
public Publisher<Boolean> isOpen() {
return Mono.just(!cursor.isDone());
return publisherToFlowPublisher(Mono.just(!cursor.isDone()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
Expand All @@ -33,7 +37,6 @@
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.neo4j.driver.reactive.ReactiveTransactionCallback;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
Expand All @@ -48,22 +51,31 @@ ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction)
}

@Override
Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
return ((InternalReactiveTransaction) transaction).close(commit);
}

@Override
public Publisher<ReactiveTransaction> beginTransaction(TransactionConfig config) {
return publisherToFlowPublisher(doBeginTransaction(config));
}

@Override
public <T> Publisher<T> executeRead(
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
return runTransaction(
AccessMode.READ, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
return publisherToFlowPublisher(runTransaction(
AccessMode.READ,
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
config));
}

@Override
public <T> Publisher<T> executeWrite(
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
return runTransaction(
AccessMode.WRITE, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
return publisherToFlowPublisher(runTransaction(
AccessMode.WRITE,
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
config));
}

@Override
Expand All @@ -80,7 +92,7 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
cursorStage = Futures.failedFuture(t);
}

return Mono.fromCompletionStage(cursorStage)
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
.then(Mono.error(error)))
Expand All @@ -96,11 +108,16 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
}
return publisher;
})
.map(InternalReactiveResult::new);
.map(InternalReactiveResult::new));
}

@Override
public Set<Bookmark> lastBookmarks() {
return new HashSet<>(session.lastBookmarks());
}

@Override
public <T> Publisher<T> close() {
return publisherToFlowPublisher(doClose());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class InternalReactiveTransaction extends AbstractReactiveTransaction
Expand All @@ -43,7 +45,7 @@ public Publisher<ReactiveResult> run(Query query) {
cursorStage = Futures.failedFuture(t);
}

return Mono.fromCompletionStage(cursorStage)
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
.flatMap(cursor -> {
Mono<RxResultCursor> publisher;
Throwable runError = cursor.getRunError();
Expand All @@ -55,7 +57,7 @@ public Publisher<ReactiveResult> run(Query query) {
}
return publisher;
})
.map(InternalReactiveResult::new);
.map(InternalReactiveResult::new));
}

/**
Expand All @@ -66,6 +68,26 @@ public Publisher<ReactiveResult> run(Query query) {
* @return {@code RESET} response publisher
*/
public Publisher<Void> interrupt() {
return Mono.fromCompletionStage(tx.interruptAsync());
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
}

@Override
public <T> Publisher<T> commit() {
return publisherToFlowPublisher(doCommit());
}

@Override
public <T> Publisher<T> rollback() {
return publisherToFlowPublisher(doRollback());
}

@Override
public Publisher<Void> close() {
return publisherToFlowPublisher(doClose());
}

@Override
public Publisher<Boolean> isOpen() {
return publisherToFlowPublisher(doIsOpen());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal.reactive;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.AccessMode;
Expand Down Expand Up @@ -54,6 +52,11 @@ Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
return ((InternalRxTransaction) transaction).close(commit);
}

@Override
public Publisher<RxTransaction> beginTransaction(TransactionConfig config) {
return doBeginTransaction(config);
}

@Override
public <T> Publisher<T> readTransaction(RxTransactionWork<? extends Publisher<T>> work) {
return readTransaction(work, TransactionConfig.empty());
Expand Down Expand Up @@ -128,6 +131,6 @@ public Bookmark lastBookmark() {

@Override
public <T> Publisher<T> close() {
return createEmptyPublisher(session::closeAsync);
return doClose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;
import org.reactivestreams.Publisher;

@Deprecated
public class InternalRxTransaction extends AbstractReactiveTransaction implements RxTransaction {
Expand Down Expand Up @@ -53,4 +54,24 @@ public RxResult run(Query query) {
return cursorFuture;
});
}

@Override
public <T> Publisher<T> commit() {
return doCommit();
}

@Override
public <T> Publisher<T> rollback() {
return doRollback();
}

@Override
public Publisher<Void> close() {
return doClose();
}

@Override
public Publisher<Boolean> isOpen() {
return doIsOpen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.neo4j.driver.reactive;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.reactivestreams.Publisher;

/**
* Common interface for components that can execute Neo4j queries using Reactive API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.neo4j.driver.reactive;

import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* A reactive result provides a reactive way to execute query on the server and receives records back. This reactive result consists of a result key publisher,
Expand Down
Loading