diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java index daa2992..5757a6b 100644 --- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java @@ -925,14 +925,11 @@ void addDependent() { protected final Publisher mapSegments( Class segmentType, Function segmentMapper) { - @SuppressWarnings("unchecked") - Publisher removeDependent = (Publisher) dependentCounter.decrement(); + Publisher removeDependent = dependentCounter.decrement(); - return Flux.concatDelayError( + return Publishers.concatTerminal( mapDependentSegments(segmentType, segmentMapper), - removeDependent) - .doOnCancel(() -> - Mono.from(removeDependent).subscribe()); + removeDependent); } /** diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java index 42e575e..0651f80 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java @@ -1026,18 +1026,16 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) { */ final Publisher execute() { - Mono deallocate = - Mono.from(deallocate()).cast(OracleResultImpl.class); + Publisher deallocate = deallocate(); - return Flux.concatDelayError( + return Publishers.concatTerminal( Mono.from(bind()) .thenMany(executeJdbc()) .map(this::getWarnings) .onErrorResume(R2dbcException.class, r2dbcException -> Mono.just(createErrorResult(r2dbcException))) .doOnNext(OracleResultImpl::addDependent), - deallocate) - .doOnCancel(deallocate::subscribe); + deallocate); } /** diff --git a/src/main/java/oracle/r2dbc/impl/Publishers.java b/src/main/java/oracle/r2dbc/impl/Publishers.java new file mode 100644 index 0000000..d395ed2 --- /dev/null +++ b/src/main/java/oracle/r2dbc/impl/Publishers.java @@ -0,0 +1,78 @@ +/* + Copyright (c) 2020, 2022, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package oracle.r2dbc.impl; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Factory methods that create a {@code Publisher}. These methods cover special + * cases which are not already supported by Project Reactor. + */ +class Publishers { + + private Publishers() {} + + /** + * A publisher that immediately emits onNext and onComplete to subscribers + */ + private static final Publisher COMPLETED_PUBLISHER = + Mono.just(new Object()); + + /** + *

+ * Returns a publisher that emits the concatenated signals of a + * {@code publisher} and {@code onTerminationPublisher}. If the + * {@code onTerminationPublisher} emits an error, it will suppress any error + * emitted by the first {@code publisher}. If a subscription to the returned + * publisher is cancelled, the {@code onTerminationPublisher} is subscribed to + * but it can not emit any error through the cancelled subscription. + *

+ * The returned publisher behaves similarly to:

{@code
+   * Flux.concatDelayError(
+   *   publisher,
+   *   onTerminationPublisher)
+   *   .doOnCancel(onTerminationPublisher::subscribe)
+   * }
+ * However, the code above can result in: + *
+   *   reactor.core.Exceptions$StaticThrowable: Operator has been terminated
+   * 
+ * This seems to happen when the concatDelayError publisher receives a cancel + * from a downstream subscriber after it has already received onComplete from + * a upstream publisher. + *

+ * @param publisher First publisher which is subscribed to. + * @param onTerminationPublisher Publisher which is subscribed to when the + * first publisher terminates, or a subcription is cancelled. + * @return The concatenated publisher. + * @param Type of objects emitted to onNext + */ + static Publisher concatTerminal( + Publisher publisher, Publisher onTerminationPublisher) { + return Flux.usingWhen( + COMPLETED_PUBLISHER, + ignored -> publisher, + ignored -> onTerminationPublisher); + } +}