diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
index daa2992..5757a6b 100644
--- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
@@ -925,14 +925,11 @@ void addDependent() {
protected final Publisher mapSegments(
Class segmentType, Function super T, U> segmentMapper) {
- @SuppressWarnings("unchecked")
- Publisher removeDependent = (Publisher) dependentCounter.decrement();
+ Publisher removeDependent = dependentCounter.decrement();
- return Flux.concatDelayError(
+ return Publishers.concatTerminal(
mapDependentSegments(segmentType, segmentMapper),
- removeDependent)
- .doOnCancel(() ->
- Mono.from(removeDependent).subscribe());
+ removeDependent);
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
index 42e575e..0651f80 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
@@ -1026,18 +1026,16 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
*/
final Publisher execute() {
- Mono deallocate =
- Mono.from(deallocate()).cast(OracleResultImpl.class);
+ Publisher deallocate = deallocate();
- return Flux.concatDelayError(
+ return Publishers.concatTerminal(
Mono.from(bind())
.thenMany(executeJdbc())
.map(this::getWarnings)
.onErrorResume(R2dbcException.class, r2dbcException ->
Mono.just(createErrorResult(r2dbcException)))
.doOnNext(OracleResultImpl::addDependent),
- deallocate)
- .doOnCancel(deallocate::subscribe);
+ deallocate);
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/Publishers.java b/src/main/java/oracle/r2dbc/impl/Publishers.java
new file mode 100644
index 0000000..d395ed2
--- /dev/null
+++ b/src/main/java/oracle/r2dbc/impl/Publishers.java
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2020, 2022, Oracle and/or its affiliates.
+
+ This software is dual-licensed to you under the Universal Permissive License
+ (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
+ 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
+ either license.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package oracle.r2dbc.impl;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Factory methods that create a {@code Publisher}. These methods cover special
+ * cases which are not already supported by Project Reactor.
+ */
+class Publishers {
+
+ private Publishers() {}
+
+ /**
+ * A publisher that immediately emits onNext and onComplete to subscribers
+ */
+ private static final Publisher
+ * @param publisher First publisher which is subscribed to.
+ * @param onTerminationPublisher Publisher which is subscribed to when the
+ * first publisher terminates, or a subcription is cancelled.
+ * @return The concatenated publisher.
+ * @param Type of objects emitted to onNext
+ */
+ static Publisher concatTerminal(
+ Publisher publisher, Publisher onTerminationPublisher) {
+ return Flux.usingWhen(
+ COMPLETED_PUBLISHER,
+ ignored -> publisher,
+ ignored -> onTerminationPublisher);
+ }
+}