Skip to content

Commit fc1c5bf

Browse files
mp911degregturn
authored andcommitted
Add DiscardOnCancel operator
FluxDiscardOnCancel replays source signals unless cancelling the subscription. On cancellation, the subscriber requests Long.MAX_VALUE to drain the source and discard elements that are emitted afterwards. [closes #222]
1 parent f01cc3e commit fc1c5bf

9 files changed

+483
-8
lines changed

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.postgresql.api.PostgresqlStatement;
2022
import io.r2dbc.postgresql.client.Binding;
2123
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
@@ -26,6 +28,7 @@
2628
import io.r2dbc.postgresql.message.backend.NoData;
2729
import io.r2dbc.postgresql.util.Assert;
2830
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
31+
import io.r2dbc.postgresql.util.Operators;
2932
import reactor.core.publisher.Flux;
3033

3134
import java.util.ArrayList;
@@ -177,11 +180,16 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
177180

178181
ExceptionFactory factory = ExceptionFactory.withSql(sql);
179182
return this.statementCache.getName(this.bindings.first(), sql)
180-
.flatMapMany(name -> ExtendedQueryMessageFlow
181-
.execute(Flux.fromIterable(this.bindings.bindings), this.context.getClient(), this.portalNameSupplier, name, sql, this.forceBinary))
183+
.flatMapMany(name -> {
184+
return ExtendedQueryMessageFlow
185+
.execute(Flux.fromIterable(this.bindings.bindings), this.context.getClient(), this.portalNameSupplier, name, sql, this.forceBinary)
186+
})
182187
.filter(RESULT_FRAME_FILTER)
183188
.windowUntil(CloseComplete.class::isInstance)
184-
.map(messages -> PostgresqlResult.toResult(this.context, messages, factory));
189+
.map(messages -> PostgresqlResult.toResult(this.context, messages, factory))
190+
.cast(io.r2dbc.postgresql.api.PostgresqlResult.class)
191+
.as(Operators::discardOnCancel)
192+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
185193
}
186194

187195
private int getIndex(String identifier) {

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.postgresql.client.TransactionStatus;
2626
import io.r2dbc.postgresql.codec.Codecs;
2727
import io.r2dbc.postgresql.util.Assert;
28+
import io.r2dbc.postgresql.util.Operators;
2829
import io.r2dbc.spi.Connection;
2930
import io.r2dbc.spi.IsolationLevel;
3031
import io.r2dbc.spi.ValidationDepth;
@@ -337,6 +338,7 @@ private static Function<TransactionStatus, String> getTransactionIsolationLevelQ
337338

338339
private Mono<Void> useTransactionStatus(Function<TransactionStatus, Publisher<?>> f) {
339340
return Flux.defer(() -> f.apply(this.client.getTransactionStatus()))
341+
.as(Operators::discardOnCancel)
340342
.then();
341343
}
342344

@@ -345,9 +347,10 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
345347
}
346348

347349
private Publisher<?> exchange(String sql) {
348-
ExceptionFactory exceptionFactory = ExceptionFactory.withSql("BEGIN");
350+
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
349351
return SimpleQueryMessageFlow.exchange(this.client, sql)
350-
.handle(exceptionFactory::handleErrorResponse);
352+
.handle(exceptionFactory::handleErrorResponse)
353+
.as(Operators::discardOnCancel);
351354
}
352355

353356
/**

src/main/java/io/r2dbc/postgresql/PostgresqlResult.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.AbstractReferenceCounted;
1920
import io.netty.util.ReferenceCountUtil;
21+
import io.netty.util.ReferenceCounted;
2022
import io.r2dbc.postgresql.message.backend.BackendMessage;
2123
import io.r2dbc.postgresql.message.backend.CommandComplete;
2224
import io.r2dbc.postgresql.message.backend.DataRow;
@@ -38,7 +40,7 @@
3840
/**
3941
* An implementation of {@link Result} representing the results of a query against a PostgreSQL database.
4042
*/
41-
final class PostgresqlResult implements io.r2dbc.postgresql.api.PostgresqlResult {
43+
final class PostgresqlResult extends AbstractReferenceCounted implements io.r2dbc.postgresql.api.PostgresqlResult {
4244

4345
private static final Predicate<BackendMessage> TAKE_UNTIL = or(CommandComplete.class::isInstance, EmptyQueryResponse.class::isInstance, PortalSuspended.class::isInstance);
4446

@@ -104,6 +106,18 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
104106
});
105107
}
106108

109+
@Override
110+
protected void deallocate() {
111+
112+
// drain messages for cleanup
113+
this.getRowsUpdated().subscribe();
114+
}
115+
116+
@Override
117+
public ReferenceCounted touch(Object hint) {
118+
return this;
119+
}
120+
107121
@Override
108122
public String toString() {
109123
return "PostgresqlResult{" +

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.postgresql.api.PostgresqlStatement;
2022
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2123
import io.r2dbc.postgresql.message.backend.BackendMessage;
@@ -24,6 +26,7 @@
2426
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2527
import io.r2dbc.postgresql.util.Assert;
2628
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
29+
import io.r2dbc.postgresql.util.Operators;
2730
import reactor.core.publisher.Flux;
2831
import reactor.util.annotation.Nullable;
2932

@@ -120,7 +123,10 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
120123
return SimpleQueryMessageFlow
121124
.exchange(this.context.getClient(), sql)
122125
.windowUntil(WINDOW_UNTIL)
123-
.map(dataRow -> PostgresqlResult.toResult(this.context, dataRow, factory));
126+
.map(dataRow -> PostgresqlResult.toResult(this.context, dataRow, factory))
127+
.cast(io.r2dbc.postgresql.api.PostgresqlResult.class)
128+
.as(Operators::discardOnCancel)
129+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
124130
}
125131

126132
}

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2019-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import reactor.core.CoreSubscriber;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.FluxOperator;
24+
import reactor.core.publisher.Operators;
25+
import reactor.util.Logger;
26+
import reactor.util.Loggers;
27+
import reactor.util.context.Context;
28+
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
/**
32+
* A decorating operator that replays signals from its source to a {@link Subscriber} and drains the source upon {@link Subscription#cancel() cancel} and drops data signals until termination.
33+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without any previous
34+
* response state.
35+
*/
36+
class FluxDiscardOnCancel<T> extends FluxOperator<T, T> {
37+
38+
private static final Logger logger = Loggers.getLogger(FluxDiscardOnCancel.class);
39+
40+
private final Runnable cancelConsumer;
41+
42+
FluxDiscardOnCancel(Flux<? extends T> source, Runnable cancelConsumer) {
43+
super(source);
44+
this.cancelConsumer = cancelConsumer;
45+
}
46+
47+
@Override
48+
public void subscribe(CoreSubscriber<? super T> actual) {
49+
this.source.subscribe(new FluxDiscardOnCancelSubscriber<>(actual, this.cancelConsumer));
50+
}
51+
52+
static class FluxDiscardOnCancelSubscriber<T> extends AtomicBoolean implements CoreSubscriber<T>, Subscription {
53+
54+
final CoreSubscriber<T> actual;
55+
56+
final Context ctx;
57+
58+
final Runnable cancelConsumer;
59+
60+
Subscription s;
61+
62+
FluxDiscardOnCancelSubscriber(CoreSubscriber<T> actual, Runnable cancelConsumer) {
63+
64+
this.actual = actual;
65+
this.ctx = actual.currentContext();
66+
this.cancelConsumer = cancelConsumer;
67+
}
68+
69+
@Override
70+
public void onSubscribe(Subscription s) {
71+
72+
if (Operators.validate(this.s, s)) {
73+
this.s = s;
74+
this.actual.onSubscribe(this);
75+
}
76+
}
77+
78+
@Override
79+
public void onNext(T t) {
80+
81+
if (this.get()) {
82+
Operators.onDiscard(t, this.ctx);
83+
return;
84+
}
85+
86+
this.actual.onNext(t);
87+
}
88+
89+
@Override
90+
public void onError(Throwable t) {
91+
if (this.get()) {
92+
Operators.onErrorDropped(t, this.ctx);
93+
} else {
94+
this.actual.onError(t);
95+
}
96+
}
97+
98+
@Override
99+
public void onComplete() {
100+
if (!this.get()) {
101+
this.actual.onComplete();
102+
}
103+
}
104+
105+
@Override
106+
public void request(long n) {
107+
this.s.request(n);
108+
}
109+
110+
@Override
111+
public void cancel() {
112+
113+
if (compareAndSet(false, true)) {
114+
if (logger.isDebugEnabled()) {
115+
logger.debug("received cancel signal");
116+
}
117+
try {
118+
this.cancelConsumer.run();
119+
} catch (Exception e) {
120+
Operators.onErrorDropped(e, this.ctx);
121+
}
122+
this.s.request(Long.MAX_VALUE);
123+
}
124+
}
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2019-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import org.reactivestreams.Subscription;
20+
import reactor.core.publisher.Flux;
21+
22+
/**
23+
* Operator utility.
24+
*
25+
* @since 0.8.1
26+
*/
27+
public final class Operators {
28+
29+
private Operators() {
30+
}
31+
32+
/**
33+
* Replay signals from {@link Flux the source} until cancellation. Drains the source for data signals if the subscriber cancels the subscription.
34+
* <p>
35+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without leaving
36+
* previous frames on the stack.
37+
*
38+
* @param source the source to decorate.
39+
* @param <T> The type of values in both source and output sequences.
40+
* @return decorated {@link Flux}.
41+
*/
42+
public static <T> Flux<T> discardOnCancel(Flux<? extends T> source) {
43+
return new FluxDiscardOnCancel<>(source, () -> {
44+
});
45+
}
46+
47+
/**
48+
* Replay signals from {@link Flux the source} until cancellation. Drains the source for data signals if the subscriber cancels the subscription.
49+
* <p>
50+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without leaving
51+
* previous frames on the stack.
52+
* <p>Propagates the {@link Subscription#cancel()} signal to a {@link Runnable consumer}.
53+
*
54+
* @param source the source to decorate.
55+
* @param cancelConsumer {@link Runnable} notified when the resulting {@link Flux} receives a {@link Subscription#cancel() cancel} signal.
56+
* @param <T> The type of values in both source and output sequences.
57+
* @return decorated {@link Flux}.
58+
*/
59+
public static <T> Flux<T> discardOnCancel(Flux<? extends T> source, Runnable cancelConsumer) {
60+
return new FluxDiscardOnCancel<>(source, cancelConsumer);
61+
}
62+
}

0 commit comments

Comments
 (0)