Skip to content

Commit 5a654b0

Browse files
mp911degregturn
authored andcommitted
Add DiscardOnCancel operator
FluxDiscardOnCancel replays source signals unless cancelling the subscription. On cancellation, the subscriber requests Long.MAX_VALUE to drain the source and discard elements that are emitted afterwards. [closes #222]
1 parent b565065 commit 5a654b0

9 files changed

+487
-12
lines changed

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.postgresql.api.PostgresqlStatement;
2022
import io.r2dbc.postgresql.client.Binding;
2123
import io.r2dbc.postgresql.client.Client;
@@ -28,6 +30,7 @@
2830
import io.r2dbc.postgresql.message.backend.NoData;
2931
import io.r2dbc.postgresql.util.Assert;
3032
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
33+
import io.r2dbc.postgresql.util.Operators;
3134
import reactor.core.publisher.Flux;
3235

3336
import java.util.ArrayList;
@@ -182,11 +185,16 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
182185

183186
ExceptionFactory factory = ExceptionFactory.withSql(sql);
184187
return this.statementCache.getName(this.bindings.first(), sql)
185-
.flatMapMany(name -> ExtendedQueryMessageFlow
186-
.execute(Flux.fromIterable(this.bindings.bindings), this.client, this.portalNameSupplier, name, sql, this.forceBinary))
188+
.flatMapMany(name -> {
189+
return ExtendedQueryMessageFlow
190+
.execute(Flux.fromIterable(this.bindings.bindings), this.client, this.portalNameSupplier, name, sql, this.forceBinary);
191+
})
187192
.filter(RESULT_FRAME_FILTER)
188193
.windowUntil(CloseComplete.class::isInstance)
189-
.map(messages -> PostgresqlResult.toResult(this.codecs, messages, factory));
194+
.map(messages -> PostgresqlResult.toResult(this.codecs, messages, factory))
195+
.cast(io.r2dbc.postgresql.api.PostgresqlResult.class)
196+
.as(Operators::discardOnCancel)
197+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
190198
}
191199

192200
private int getIndex(String identifier) {

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import io.r2dbc.postgresql.client.TransactionStatus;
2626
import io.r2dbc.postgresql.codec.Codecs;
2727
import io.r2dbc.postgresql.util.Assert;
28+
import io.r2dbc.postgresql.util.Operators;
2829
import io.r2dbc.spi.Connection;
2930
import io.r2dbc.spi.IsolationLevel;
3031
import io.r2dbc.spi.ValidationDepth;
@@ -334,6 +335,7 @@ private static Function<TransactionStatus, String> getTransactionIsolationLevelQ
334335

335336
private Mono<Void> useTransactionStatus(Function<TransactionStatus, Publisher<?>> f) {
336337
return Flux.defer(() -> f.apply(this.client.getTransactionStatus()))
338+
.as(Operators::discardOnCancel)
337339
.then();
338340
}
339341

@@ -342,9 +344,10 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
342344
}
343345

344346
private Publisher<?> exchange(String sql) {
345-
ExceptionFactory exceptionFactory = ExceptionFactory.withSql("BEGIN");
347+
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
346348
return SimpleQueryMessageFlow.exchange(this.client, sql)
347-
.handle(exceptionFactory::handleErrorResponse);
349+
.handle(exceptionFactory::handleErrorResponse)
350+
.as(Operators::discardOnCancel);
348351
}
349352

350353
/**

src/main/java/io/r2dbc/postgresql/PostgresqlResult.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,9 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.AbstractReferenceCounted;
1920
import io.netty.util.ReferenceCountUtil;
21+
import io.netty.util.ReferenceCounted;
2022
import io.r2dbc.postgresql.codec.Codecs;
2123
import io.r2dbc.postgresql.message.backend.BackendMessage;
2224
import io.r2dbc.postgresql.message.backend.CommandComplete;
@@ -39,7 +41,7 @@
3941
/**
4042
* An implementation of {@link Result} representing the results of a query against a PostgreSQL database.
4143
*/
42-
final class PostgresqlResult implements io.r2dbc.postgresql.api.PostgresqlResult {
44+
final class PostgresqlResult extends AbstractReferenceCounted implements io.r2dbc.postgresql.api.PostgresqlResult {
4345

4446
private static final Predicate<BackendMessage> TAKE_UNTIL = or(CommandComplete.class::isInstance, EmptyQueryResponse.class::isInstance, PortalSuspended.class::isInstance);
4547

@@ -105,6 +107,18 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
105107
});
106108
}
107109

110+
@Override
111+
protected void deallocate() {
112+
113+
// drain messages for cleanup
114+
this.getRowsUpdated().subscribe();
115+
}
116+
117+
@Override
118+
public ReferenceCounted touch(Object hint) {
119+
return this;
120+
}
121+
108122
@Override
109123
public String toString() {
110124
return "PostgresqlResult{" +

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.postgresql.api.PostgresqlStatement;
2022
import io.r2dbc.postgresql.client.Client;
2123
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
@@ -26,6 +28,7 @@
2628
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2729
import io.r2dbc.postgresql.util.Assert;
2830
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
31+
import io.r2dbc.postgresql.util.Operators;
2932
import reactor.core.publisher.Flux;
3033
import reactor.util.annotation.Nullable;
3134

@@ -126,7 +129,10 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
126129
return SimpleQueryMessageFlow
127130
.exchange(this.client, sql)
128131
.windowUntil(WINDOW_UNTIL)
129-
.map(dataRow -> PostgresqlResult.toResult(this.codecs, dataRow, factory));
132+
.map(dataRow -> PostgresqlResult.toResult(this.codecs, dataRow, factory))
133+
.cast(io.r2dbc.postgresql.api.PostgresqlResult.class)
134+
.as(Operators::discardOnCancel)
135+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
130136
}
131137

132138
}

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2019-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import reactor.core.CoreSubscriber;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.FluxOperator;
24+
import reactor.core.publisher.Operators;
25+
import reactor.util.Logger;
26+
import reactor.util.Loggers;
27+
import reactor.util.context.Context;
28+
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
/**
32+
* A decorating operator that replays signals from its source to a {@link Subscriber} and drains the source upon {@link Subscription#cancel() cancel} and drops data signals until termination.
33+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without any previous
34+
* response state.
35+
*/
36+
class FluxDiscardOnCancel<T> extends FluxOperator<T, T> {
37+
38+
private static final Logger logger = Loggers.getLogger(FluxDiscardOnCancel.class);
39+
40+
private final Runnable cancelConsumer;
41+
42+
FluxDiscardOnCancel(Flux<? extends T> source, Runnable cancelConsumer) {
43+
super(source);
44+
this.cancelConsumer = cancelConsumer;
45+
}
46+
47+
@Override
48+
public void subscribe(CoreSubscriber<? super T> actual) {
49+
this.source.subscribe(new FluxDiscardOnCancelSubscriber<>(actual, this.cancelConsumer));
50+
}
51+
52+
static class FluxDiscardOnCancelSubscriber<T> extends AtomicBoolean implements CoreSubscriber<T>, Subscription {
53+
54+
final CoreSubscriber<T> actual;
55+
56+
final Context ctx;
57+
58+
final Runnable cancelConsumer;
59+
60+
Subscription s;
61+
62+
FluxDiscardOnCancelSubscriber(CoreSubscriber<T> actual, Runnable cancelConsumer) {
63+
64+
this.actual = actual;
65+
this.ctx = actual.currentContext();
66+
this.cancelConsumer = cancelConsumer;
67+
}
68+
69+
@Override
70+
public void onSubscribe(Subscription s) {
71+
72+
if (Operators.validate(this.s, s)) {
73+
this.s = s;
74+
this.actual.onSubscribe(this);
75+
}
76+
}
77+
78+
@Override
79+
public void onNext(T t) {
80+
81+
if (this.get()) {
82+
Operators.onDiscard(t, this.ctx);
83+
return;
84+
}
85+
86+
this.actual.onNext(t);
87+
}
88+
89+
@Override
90+
public void onError(Throwable t) {
91+
if (this.get()) {
92+
Operators.onErrorDropped(t, this.ctx);
93+
} else {
94+
this.actual.onError(t);
95+
}
96+
}
97+
98+
@Override
99+
public void onComplete() {
100+
if (!this.get()) {
101+
this.actual.onComplete();
102+
}
103+
}
104+
105+
@Override
106+
public void request(long n) {
107+
this.s.request(n);
108+
}
109+
110+
@Override
111+
public void cancel() {
112+
113+
if (compareAndSet(false, true)) {
114+
if (logger.isDebugEnabled()) {
115+
logger.debug("received cancel signal");
116+
}
117+
try {
118+
this.cancelConsumer.run();
119+
} catch (Exception e) {
120+
Operators.onErrorDropped(e, this.ctx);
121+
}
122+
this.s.request(Long.MAX_VALUE);
123+
}
124+
}
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2019-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.util;
18+
19+
import org.reactivestreams.Subscription;
20+
import reactor.core.publisher.Flux;
21+
22+
/**
23+
* Operator utility.
24+
*
25+
* @since 0.8.1
26+
*/
27+
public final class Operators {
28+
29+
private Operators() {
30+
}
31+
32+
/**
33+
* Replay signals from {@link Flux the source} until cancellation. Drains the source for data signals if the subscriber cancels the subscription.
34+
* <p>
35+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without leaving
36+
* previous frames on the stack.
37+
*
38+
* @param source the source to decorate.
39+
* @param <T> The type of values in both source and output sequences.
40+
* @return decorated {@link Flux}.
41+
*/
42+
public static <T> Flux<T> discardOnCancel(Flux<? extends T> source) {
43+
return new FluxDiscardOnCancel<>(source, () -> {
44+
});
45+
}
46+
47+
/**
48+
* Replay signals from {@link Flux the source} until cancellation. Drains the source for data signals if the subscriber cancels the subscription.
49+
* <p>
50+
* Draining data is required to complete a particular request/response window and clear the protocol state as client code expects to start a request/response conversation without leaving
51+
* previous frames on the stack.
52+
* <p>Propagates the {@link Subscription#cancel()} signal to a {@link Runnable consumer}.
53+
*
54+
* @param source the source to decorate.
55+
* @param cancelConsumer {@link Runnable} notified when the resulting {@link Flux} receives a {@link Subscription#cancel() cancel} signal.
56+
* @param <T> The type of values in both source and output sequences.
57+
* @return decorated {@link Flux}.
58+
*/
59+
public static <T> Flux<T> discardOnCancel(Flux<? extends T> source, Runnable cancelConsumer) {
60+
return new FluxDiscardOnCancel<>(source, cancelConsumer);
61+
}
62+
}

0 commit comments

Comments
 (0)