Skip to content

Commit 852f641

Browse files
Merge pull request #134 from oracle/133-operator-terminated
Fix "Operator has been terminated"
2 parents 8e923ae + ec6976e commit 852f641

File tree

3 files changed

+84
-11
lines changed

3 files changed

+84
-11
lines changed

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -925,14 +925,11 @@ void addDependent() {
925925
protected final <T extends Segment, U> Publisher<U> mapSegments(
926926
Class<T> segmentType, Function<? super T, U> segmentMapper) {
927927

928-
@SuppressWarnings("unchecked")
929-
Publisher<U> removeDependent = (Publisher<U>) dependentCounter.decrement();
928+
Publisher<Void> removeDependent = dependentCounter.decrement();
930929

931-
return Flux.concatDelayError(
930+
return Publishers.concatTerminal(
932931
mapDependentSegments(segmentType, segmentMapper),
933-
removeDependent)
934-
.doOnCancel(() ->
935-
Mono.from(removeDependent).subscribe());
932+
removeDependent);
936933
}
937934

938935
/**

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,18 +1026,16 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
10261026
*/
10271027
final Publisher<OracleResultImpl> execute() {
10281028

1029-
Mono<OracleResultImpl> deallocate =
1030-
Mono.from(deallocate()).cast(OracleResultImpl.class);
1029+
Publisher<Void> deallocate = deallocate();
10311030

1032-
return Flux.concatDelayError(
1031+
return Publishers.concatTerminal(
10331032
Mono.from(bind())
10341033
.thenMany(executeJdbc())
10351034
.map(this::getWarnings)
10361035
.onErrorResume(R2dbcException.class, r2dbcException ->
10371036
Mono.just(createErrorResult(r2dbcException)))
10381037
.doOnNext(OracleResultImpl::addDependent),
1039-
deallocate)
1040-
.doOnCancel(deallocate::subscribe);
1038+
deallocate);
10411039
}
10421040

10431041
/**
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright (c) 2020, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package oracle.r2dbc.impl;
23+
24+
import org.reactivestreams.Publisher;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
28+
/**
29+
* Factory methods that create a {@code Publisher}. These methods cover special
30+
* cases which are not already supported by Project Reactor.
31+
*/
32+
class Publishers {
33+
34+
private Publishers() {}
35+
36+
/**
37+
* A publisher that immediately emits onNext and onComplete to subscribers
38+
*/
39+
private static final Publisher<Object> COMPLETED_PUBLISHER =
40+
Mono.just(new Object());
41+
42+
/**
43+
* <p>
44+
* Returns a publisher that emits the concatenated signals of a
45+
* {@code publisher} and {@code onTerminationPublisher}. If the
46+
* {@code onTerminationPublisher} emits an error, it will suppress any error
47+
* emitted by the first {@code publisher}. If a subscription to the returned
48+
* publisher is cancelled, the {@code onTerminationPublisher} is subscribed to
49+
* but it can not emit any error through the cancelled subscription.
50+
* </p><p>
51+
* The returned publisher behaves similarly to: <pre>{@code
52+
* Flux.concatDelayError(
53+
* publisher,
54+
* onTerminationPublisher)
55+
* .doOnCancel(onTerminationPublisher::subscribe)
56+
* }</pre>
57+
* However, the code above can result in:
58+
* <pre>
59+
* reactor.core.Exceptions$StaticThrowable: Operator has been terminated
60+
* </pre>
61+
* This seems to happen when the concatDelayError publisher receives a cancel
62+
* from a downstream subscriber after it has already received onComplete from
63+
* a upstream publisher.
64+
* </p>
65+
* @param publisher First publisher which is subscribed to.
66+
* @param onTerminationPublisher Publisher which is subscribed to when the
67+
* first publisher terminates, or a subcription is cancelled.
68+
* @return The concatenated publisher.
69+
* @param <T> Type of objects emitted to onNext
70+
*/
71+
static <T> Publisher<T> concatTerminal(
72+
Publisher<T> publisher, Publisher<Void> onTerminationPublisher) {
73+
return Flux.usingWhen(
74+
COMPLETED_PUBLISHER,
75+
ignored -> publisher,
76+
ignored -> onTerminationPublisher);
77+
}
78+
}

0 commit comments

Comments
 (0)