From 8b05ca0378f96c99f017346eabb14c84c14a5a56 Mon Sep 17 00:00:00 2001
From: "Michael A. McMahon"
Date: Tue, 19 Sep 2023 17:07:20 -0700
Subject: [PATCH 1/2] Use concatTerminal Publisher
---
.../oracle/r2dbc/impl/OracleResultImpl.java | 9 +--
.../r2dbc/impl/OracleStatementImpl.java | 8 +-
.../java/oracle/r2dbc/impl/Publishers.java | 78 +++++++++++++++++++
3 files changed, 84 insertions(+), 11 deletions(-)
create mode 100644 src/main/java/oracle/r2dbc/impl/Publishers.java
diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
index daa2992..5757a6b 100644
--- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
@@ -925,14 +925,11 @@ void addDependent() {
protected final Publisher mapSegments(
Class segmentType, Function super T, U> segmentMapper) {
- @SuppressWarnings("unchecked")
- Publisher removeDependent = (Publisher) dependentCounter.decrement();
+ Publisher removeDependent = dependentCounter.decrement();
- return Flux.concatDelayError(
+ return Publishers.concatTerminal(
mapDependentSegments(segmentType, segmentMapper),
- removeDependent)
- .doOnCancel(() ->
- Mono.from(removeDependent).subscribe());
+ removeDependent);
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
index 42e575e..0651f80 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
@@ -1026,18 +1026,16 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
*/
final Publisher execute() {
- Mono deallocate =
- Mono.from(deallocate()).cast(OracleResultImpl.class);
+ Publisher deallocate = deallocate();
- return Flux.concatDelayError(
+ return Publishers.concatTerminal(
Mono.from(bind())
.thenMany(executeJdbc())
.map(this::getWarnings)
.onErrorResume(R2dbcException.class, r2dbcException ->
Mono.just(createErrorResult(r2dbcException)))
.doOnNext(OracleResultImpl::addDependent),
- deallocate)
- .doOnCancel(deallocate::subscribe);
+ deallocate);
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/Publishers.java b/src/main/java/oracle/r2dbc/impl/Publishers.java
new file mode 100644
index 0000000..e354215
--- /dev/null
+++ b/src/main/java/oracle/r2dbc/impl/Publishers.java
@@ -0,0 +1,78 @@
+/*
+ Copyright (c) 2020, 2022, Oracle and/or its affiliates.
+
+ This software is dual-licensed to you under the Universal Permissive License
+ (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
+ 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
+ either license.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package oracle.r2dbc.impl;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Factory methods that create a {@code Publisher}. These methods cover special
+ * cases which are not already supported by Project Reactor.
+ */
+class Publishers {
+
+ private Publishers() {}
+
+ /**
+ * A publisher that immediately emits onNext and onComplete to subscribers
+ */
+ private static final Publisher
+ * @param publisher First publisher which is subscribed to.
+ * @param onTerminationPublisher Publisher which is subscribed to when the
+ * first publisher terminates, or a subcription is cancelled.
+ * @return The concatenated publisher.
+ * @param Type of objects emitted to onNext
+ */
+ static Publisher concatTerminal(
+ Publisher publisher, Publisher onTerminationPublisher) {
+ return Flux.usingWhen(
+ COMPLETED_PUBLISHER,
+ ignored -> publisher,
+ ignored -> onTerminationPublisher);
+ }
+}
From ec6976e253e8f8f51396e7141a6d8071b06edf1d Mon Sep 17 00:00:00 2001
From: "Michael A. McMahon"
Date: Tue, 19 Sep 2023 17:23:43 -0700
Subject: [PATCH 2/2] Fix typo
---
src/main/java/oracle/r2dbc/impl/Publishers.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/oracle/r2dbc/impl/Publishers.java b/src/main/java/oracle/r2dbc/impl/Publishers.java
index e354215..d395ed2 100644
--- a/src/main/java/oracle/r2dbc/impl/Publishers.java
+++ b/src/main/java/oracle/r2dbc/impl/Publishers.java
@@ -46,7 +46,7 @@ private Publishers() {}
* {@code onTerminationPublisher} emits an error, it will suppress any error
* emitted by the first {@code publisher}. If a subscription to the returned
* publisher is cancelled, the {@code onTerminationPublisher} is subscribed to
- * it can not emit any error through the cancelled subscription.
+ * but it can not emit any error through the cancelled subscription.
*