Skip to content

Commit 53cca10

Browse files
committed
Buffer-less FlatMapCompletionStageProcessor
Signed-off-by: Daniel Kec <[email protected]>
1 parent 52b274b commit 53cca10

File tree

7 files changed

+244
-237
lines changed

7 files changed

+244
-237
lines changed

common/reactive/src/main/java/io/helidon/common/reactive/BaseProcessor.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public abstract class BaseProcessor<T, U> implements Processor<T, U>, Subscripti
3838
private final RequestedCounter requested;
3939
private final AtomicBoolean ready;
4040
private final AtomicBoolean subscribed;
41-
private SubscriberReference<? super U> referencedSubscriber;
4241
private ReentrantLock publisherSequentialLock = new ReentrantLock();
4342
private volatile boolean done;
4443
private Throwable error;
@@ -56,13 +55,11 @@ protected BaseProcessor() {
5655
@Override
5756
public void request(long n) {
5857
StreamValidationUtils.checkRequestParam(n, this::failAndCancel);
59-
StreamValidationUtils.checkRecursionDepth("BaseProcessor.request", 10, () -> {
60-
requested.increment(n, this::failAndCancel);
61-
tryRequest(subscription);
62-
if (done) {
63-
tryComplete();
64-
}
65-
}, (actDepth, t) -> failAndCancel(t));
58+
requested.increment(n, this::failAndCancel);
59+
tryRequest(subscription);
60+
if (done) {
61+
tryComplete();
62+
}
6663
}
6764

6865
@Override
@@ -151,8 +148,6 @@ public void onComplete() {
151148

152149
@Override
153150
public void subscribe(Subscriber<? super U> s) {
154-
// https://github.com/reactive-streams/reactive-streams-jvm#3.13
155-
referencedSubscriber = SubscriberReference.create(s);
156151
try {
157152
publisherSequentialLock.lock();
158153
if (subscriber.register(s)) {
@@ -253,8 +248,6 @@ protected void hookOnComplete() {
253248
*/
254249
protected void hookOnCancel(Flow.Subscription subscription) {
255250
Optional.ofNullable(subscription).ifPresent(Flow.Subscription::cancel);
256-
// https://github.com/reactive-streams/reactive-streams-jvm#3.13
257-
referencedSubscriber.releaseReference();
258251
}
259252

260253
/**

common/reactive/src/main/java/io/helidon/common/reactive/MultiCoupledProcessor.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,16 @@
4343
*/
4444
public class MultiCoupledProcessor<T, R> implements Flow.Processor<T, R>, Multi<R> {
4545

46-
private SubscriberReference<T> passedInSubscriber;
47-
private SubscriberReference<? super R> outletSubscriber;
46+
private SequentialSubscriber<T> passedInSubscriber;
47+
private SequentialSubscriber<? super R> outletSubscriber;
4848
private Flow.Publisher<R> passedInPublisher;
4949
private Flow.Subscriber<? super T> inletSubscriber;
5050
private Flow.Subscription inletSubscription;
5151
private Flow.Subscription passedInPublisherSubscription;
5252
private AtomicBoolean cancelled = new AtomicBoolean(false);
5353

5454
private MultiCoupledProcessor(Flow.Subscriber<T> passedInSubscriber, Flow.Publisher<R> passedInPublisher) {
55-
this.passedInSubscriber = SubscriberReference.create(passedInSubscriber);
55+
this.passedInSubscriber = SequentialSubscriber.create(passedInSubscriber);
5656
this.passedInPublisher = passedInPublisher;
5757
this.inletSubscriber = this;
5858
}
@@ -73,7 +73,7 @@ public static <T, R> MultiCoupledProcessor<T, R> create(Flow.Subscriber<T> passe
7373

7474
@Override
7575
public void subscribe(Flow.Subscriber<? super R> outletSubscriber) {
76-
this.outletSubscriber = SubscriberReference.create(outletSubscriber);
76+
this.outletSubscriber = SequentialSubscriber.create(outletSubscriber);
7777
passedInPublisher.subscribe(new Flow.Subscriber<R>() {
7878

7979
@Override
@@ -137,8 +137,6 @@ public void cancel() {
137137
passedInSubscriber.onComplete();
138138
Optional.ofNullable(inletSubscription).ifPresent(Flow.Subscription::cancel);
139139
passedInPublisherSubscription.cancel();
140-
MultiCoupledProcessor.this.passedInSubscriber.releaseReference();
141-
MultiCoupledProcessor.this.outletSubscriber.releaseReference();
142140
}
143141
});
144142
}
@@ -171,8 +169,6 @@ public void cancel() {
171169
inletSubscription.cancel();
172170
outletSubscriber.onComplete();
173171
passedInPublisherSubscription.cancel();
174-
passedInSubscriber.releaseReference();
175-
outletSubscriber.releaseReference();
176172
}
177173
});
178174
}

common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapProcessor.java

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ public class MultiFlatMapProcessor<T, X> implements Flow.Processor<T, X>, Multi<
3434

3535
private Function<T, Flow.Publisher<X>> mapper;
3636
private RequestedCounter requestCounter = new RequestedCounter();
37-
private SubscriberReference<? super X> subscriber;
37+
private SequentialSubscriber<? super X> subscriber;
3838
private Flow.Subscription subscription;
3939
private volatile Flow.Subscription innerSubscription;
4040
private volatile Flow.Publisher<X> innerPublisher;
4141
private volatile boolean upstreamsCompleted;
4242
private Optional<Throwable> error = Optional.empty();
43-
private ReentrantLock seqLock = new ReentrantLock();
4443
private ReentrantLock stateLock = new ReentrantLock();
4544

4645

@@ -92,19 +91,17 @@ public void request(long n) {
9291
public void cancel() {
9392
subscription.cancel();
9493
Optional.ofNullable(innerSubscription).ifPresent(Flow.Subscription::cancel);
95-
// https://github.com/reactive-streams/reactive-streams-jvm#3.13
96-
subscriber.releaseReference();
9794
}
9895
}
9996

10097
@Override
10198
public void subscribe(Flow.Subscriber<? super X> subscriber) {
10299
stateLock(() -> {
103-
this.subscriber = SubscriberReference.create(subscriber);
100+
this.subscriber = SequentialSubscriber.create(subscriber);
104101
if (Objects.nonNull(this.subscription)) {
105-
seqLock(() -> subscriber.onSubscribe(new FlatMapSubscription()));
102+
this.subscriber.onSubscribe(new FlatMapSubscription());
106103
}
107-
error.ifPresent(subscriber::onError);
104+
error.ifPresent(this.subscriber::onError);
108105
});
109106
}
110107

@@ -129,18 +126,16 @@ public void onNext(T o) {
129126
innerPublisher = mapper.apply(o);
130127
innerPublisher.subscribe(new InnerSubscriber());
131128
} catch (Throwable t) {
132-
seqLock(() -> {
133-
subscription.cancel();
134-
subscriber.onError(t);
135-
});
129+
subscription.cancel();
130+
subscriber.onError(t);
136131
}
137132
}
138133

139134
@Override
140135
public void onError(Throwable t) {
141136
this.error = Optional.of(t);
142137
if (Objects.nonNull(subscriber)) {
143-
seqLock(() -> subscriber.onError(t));
138+
subscriber.onError(t);
144139
}
145140
}
146141

@@ -160,41 +155,37 @@ private class InnerSubscriber implements Flow.Subscriber<X> {
160155

161156
@Override
162157
public void onSubscribe(Flow.Subscription subscription) {
163-
seqLock(() -> {
164-
if (alreadySubscribed.getAndSet(true)) {
165-
subscription.cancel();
166-
return;
167-
}
168-
innerSubscription = subscription;
169-
innerSubscription.request(requestCounter.get());
170-
});
158+
if (alreadySubscribed.getAndSet(true)) {
159+
subscription.cancel();
160+
return;
161+
}
162+
innerSubscription = subscription;
163+
innerSubscription.request(requestCounter.get());
171164
}
172165

173166
@Override
174167
public void onNext(X item) {
175168
Objects.requireNonNull(item);
176169
stateLock(() -> {
177170
if (requestCounter.tryDecrement()) {
178-
seqLock(() -> subscriber.onNext(item));
171+
subscriber.onNext(item);
179172
}
180173
});
181174
}
182175

183176
@Override
184177
public void onError(Throwable throwable) {
185178
Objects.requireNonNull(throwable);
186-
seqLock(() -> {
187-
subscription.cancel();
188-
subscriber.onError(throwable);
189-
});
179+
subscription.cancel();
180+
subscriber.onError(throwable);
190181
}
191182

192183
@Override
193184
public void onComplete() {
194185
stateLock(() -> {
195186
innerPublisher = null;
196187
if (upstreamsCompleted) {
197-
seqLock(() -> subscriber.onComplete());
188+
subscriber.onComplete();
198189
}
199190
if (requestCounter.get() > 0) {
200191
subscription.request(1);
@@ -203,19 +194,6 @@ public void onComplete() {
203194
}
204195
}
205196

206-
/**
207-
* OnSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
208-
* https://github.com/reactive-streams/reactive-streams-jvm#1.3
209-
*/
210-
private void seqLock(Runnable runnable) {
211-
try {
212-
seqLock.lock();
213-
runnable.run();
214-
} finally {
215-
seqLock.unlock();
216-
}
217-
}
218-
219197
/**
220198
* Protect critical sections when working with states of innerPublisher.
221199
*/
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.concurrent.Flow;
21+
import java.util.concurrent.locks.ReentrantLock;
22+
23+
/**
24+
* Wrapper {@link Flow.Subscriber} ensuring {@code OnSubscribe}, {@code onNext}, {@code onError}
25+
* and {@code onComplete} to be signaled serially.
26+
*
27+
* @param <T> Type of the item
28+
* @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#1.3">
29+
* https://github.com/reactive-streams/reactive-streams-jvm#1.3</a>
30+
*/
31+
public class SequentialSubscriber<T> implements Flow.Subscriber<T> {
32+
private Flow.Subscriber<T> subscriber;
33+
private ReentrantLock seqLock = new ReentrantLock();
34+
35+
private SequentialSubscriber(Flow.Subscriber<T> subscriber) {
36+
this.subscriber = subscriber;
37+
}
38+
39+
/**
40+
* Wrapper {@link Flow.Subscriber} ensuring {@code OnSubscribe}, {@code onNext}, {@code onError}
41+
* and {@code onComplete} to be signaled serially.
42+
*
43+
* @param subscriber {@link Flow.Subscriber} to be wrapped.
44+
* @param <T> item type
45+
* @return new {@link SequentialSubscriber}
46+
*/
47+
public static <T> SequentialSubscriber<T> create(Flow.Subscriber<T> subscriber) {
48+
return new SequentialSubscriber<>(subscriber);
49+
}
50+
51+
@Override
52+
public void onSubscribe(Flow.Subscription subscription) {
53+
seqLock(() -> subscriber.onSubscribe(subscription));
54+
}
55+
56+
@Override
57+
public void onNext(T item) {
58+
seqLock(() -> subscriber.onNext(item));
59+
}
60+
61+
@Override
62+
public void onError(Throwable throwable) {
63+
seqLock(() -> subscriber.onError(throwable));
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
seqLock(() -> subscriber.onComplete());
69+
}
70+
71+
/**
72+
* OnSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
73+
* https://github.com/reactive-streams/reactive-streams-jvm#1.3
74+
*/
75+
private void seqLock(Runnable runnable) {
76+
try {
77+
seqLock.lock();
78+
runnable.run();
79+
} finally {
80+
seqLock.unlock();
81+
}
82+
}
83+
}

common/reactive/src/main/java/io/helidon/common/reactive/SubscriberReference.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)