Skip to content

Commit e3ead54

Browse files
committed
Complete removal of buffers, miscounting BaseProcessor fix
Signed-off-by: Daniel Kec <[email protected]>
1 parent 53cca10 commit e3ead54

File tree

18 files changed

+658
-320
lines changed

18 files changed

+658
-320
lines changed

common/reactive/src/main/java/io/helidon/common/reactive/BaseProcessor.java

Lines changed: 71 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,47 @@
3131
* @param <T> subscribed type (input)
3232
* @param <U> published type (output)
3333
*/
34-
public abstract class BaseProcessor<T, U> implements Processor<T, U>, Subscription {
34+
public abstract class BaseProcessor<T, U> implements Processor<T, U>, Subscription, StrictProcessor {
3535

3636
private Subscription subscription;
3737
private final SingleSubscriberHolder<U> subscriber;
3838
private final RequestedCounter requested;
39+
private final RequestedCounter ableToSubmit;
40+
private final ReentrantLock subscriptionLock = new ReentrantLock();
3941
private final AtomicBoolean ready;
4042
private final AtomicBoolean subscribed;
41-
private ReentrantLock publisherSequentialLock = new ReentrantLock();
4243
private volatile boolean done;
4344
private Throwable error;
45+
private boolean strictMode = DEFAULT_STRICT_MODE;
46+
4447

4548
/**
4649
* Generic processor used for the implementation of {@link Multi} and {@link Single}.
4750
*/
4851
protected BaseProcessor() {
49-
requested = new RequestedCounter();
52+
requested = new RequestedCounter(strictMode);
53+
ableToSubmit = new RequestedCounter(strictMode);
5054
ready = new AtomicBoolean();
5155
subscribed = new AtomicBoolean();
5256
subscriber = new SingleSubscriberHolder<>();
5357
}
5458

59+
@Override
60+
public BaseProcessor<T, U> strictMode(boolean strictMode) {
61+
this.strictMode = strictMode;
62+
return this;
63+
}
64+
5565
@Override
5666
public void request(long n) {
57-
StreamValidationUtils.checkRequestParam(n, this::failAndCancel);
58-
requested.increment(n, this::failAndCancel);
59-
tryRequest(subscription);
67+
ableToSubmit.increment(n, this::failAndCancel);
68+
subscriptionLock(() -> {
69+
if (subscription != null && !subscriber.isClosed()) {
70+
subscription.request(n);
71+
} else {
72+
requested.increment(n, this::failAndCancel);
73+
}
74+
});
6075
if (done) {
6176
tryComplete();
6277
}
@@ -74,39 +89,27 @@ public void cancel() {
7489

7590
@Override
7691
public void onSubscribe(Subscription s) {
77-
try {
78-
// https://github.com/reactive-streams/reactive-streams-jvm#1.3
79-
publisherSequentialLock.lock();
80-
// https://github.com/reactive-streams/reactive-streams-jvm#2.13
81-
Objects.requireNonNull(s);
92+
Objects.requireNonNull(s);
93+
subscriptionLock(() -> {
8294
if (subscription == null) {
8395
this.subscription = s;
8496
tryRequest(s);
8597
} else {
86-
// https://github.com/reactive-streams/reactive-streams-jvm#2.5
8798
s.cancel();
8899
}
89-
} finally {
90-
publisherSequentialLock.unlock();
91-
}
100+
});
92101
}
93102

94103
@Override
95104
public void onNext(T item) {
105+
if (done) {
106+
throw new IllegalStateException("Subscriber is closed!");
107+
}
108+
Objects.requireNonNull(item);
96109
try {
97-
publisherSequentialLock.lock();
98-
if (done) {
99-
throw new IllegalStateException("Subscriber is closed!");
100-
}
101-
// https://github.com/reactive-streams/reactive-streams-jvm#2.13
102-
Objects.requireNonNull(item);
103-
try {
104-
hookOnNext(item);
105-
} catch (Throwable ex) {
106-
failAndCancel(ex);
107-
}
108-
} finally {
109-
publisherSequentialLock.unlock();
110+
hookOnNext(item);
111+
} catch (Throwable ex) {
112+
failAndCancel(ex);
110113
}
111114
}
112115

@@ -116,7 +119,6 @@ public void onNext(T item) {
116119
* @param ex Exception to be reported downstream
117120
*/
118121
protected void fail(Throwable ex) {
119-
// https://github.com/reactive-streams/reactive-streams-jvm#2.13
120122
Objects.requireNonNull(ex);
121123
done = true;
122124
if (error == null) {
@@ -147,19 +149,20 @@ public void onComplete() {
147149
}
148150

149151
@Override
150-
public void subscribe(Subscriber<? super U> s) {
151-
try {
152-
publisherSequentialLock.lock();
152+
public void subscribe(Subscriber<? super U> sub) {
153+
subscriptionLock(() -> {
154+
var s = sub;
155+
if (strictMode) {
156+
s = SequentialSubscriber.create(sub);
157+
}
153158
if (subscriber.register(s)) {
154159
ready.set(true);
155160
s.onSubscribe(this);
156161
if (done) {
157162
tryComplete();
158163
}
159164
}
160-
} finally {
161-
publisherSequentialLock.unlock();
162-
}
165+
});
163166
}
164167

165168
/**
@@ -172,31 +175,13 @@ protected Optional<Subscription> getSubscription() {
172175
return Optional.ofNullable(subscription);
173176
}
174177

175-
/**
176-
* Processor's {@link SingleSubscriberHolder}.
177-
*
178-
* @return {@link SingleSubscriberHolder}
179-
*/
180-
protected SingleSubscriberHolder<U> getSubscriber() {
181-
return subscriber;
182-
}
183-
184-
/**
185-
* Returns {@link RequestedCounter} with information about requested vs. submitted items.
186-
*
187-
* @return {@link RequestedCounter}
188-
*/
189-
protected RequestedCounter getRequestedCounter() {
190-
return requested;
191-
}
192-
193178
/**
194179
* Submit an item to the subscriber.
195180
*
196181
* @param item item to be submitted
197182
*/
198183
protected void submit(U item) {
199-
if (requested.tryDecrement()) {
184+
if (ableToSubmit.tryDecrement()) {
200185
try {
201186
subscriber.get().onNext(item);
202187
} catch (InterruptedException ex) {
@@ -257,10 +242,10 @@ protected void hookOnCancel(Flow.Subscription subscription) {
257242
*/
258243
protected final void doSubscribe(Publisher<U> delegate) {
259244
if (subscribed.compareAndSet(false, true)) {
260-
delegate.subscribe(new Subscriber<U>() {
245+
delegate.subscribe(new Subscriber<>() {
261246
@Override
262247
public void onSubscribe(Subscription subscription) {
263-
tryRequest(subscription);
248+
tryRequest(ableToSubmit, subscription);
264249
}
265250

266251
@Override
@@ -311,11 +296,37 @@ protected void tryComplete() {
311296
* @param subscription {@link Flow.Subscription} to make a request from
312297
*/
313298
protected void tryRequest(Subscription subscription) {
314-
if (subscription != null && !subscriber.isClosed()) {
315-
long n = requested.get();
316-
if (n > 0) {
317-
subscription.request(n);
299+
tryRequest(requested, subscription);
300+
}
301+
302+
private void tryRequest(RequestedCounter counter, Subscription subscription) {
303+
if (subscription == null || subscriber.isClosed()) {
304+
return;
305+
}
306+
307+
long n;
308+
try {
309+
counter.lock();
310+
n = counter.get();
311+
if (n < 1) {
312+
return;
318313
}
314+
subscription.request(n);
315+
} finally {
316+
counter.unlock();
317+
}
318+
}
319+
320+
private void subscriptionLock(Runnable guardedBlock) {
321+
if (!strictMode) {
322+
guardedBlock.run();
323+
return;
324+
}
325+
try {
326+
subscriptionLock.lock();
327+
guardedBlock.run();
328+
} finally {
329+
subscriptionLock.unlock();
319330
}
320331
}
321332
}

common/reactive/src/main/java/io/helidon/common/reactive/MultiFilterProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ protected void hookOnNext(T item) {
4848
if (predicate.test(item)) {
4949
submit(item);
5050
} else {
51-
getRequestedCounter().increment(1, this::fail);
51+
tryRequest(getSubscription().get());
5252
}
5353
}
5454
}

common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapProcessor.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,18 @@
3030
* @param <T> input item type
3131
* @param <X> output item type
3232
*/
33-
public class MultiFlatMapProcessor<T, X> implements Flow.Processor<T, X>, Multi<X> {
33+
public class MultiFlatMapProcessor<T, X> implements Flow.Processor<T, X>, Multi<X>, StrictProcessor {
3434

3535
private Function<T, Flow.Publisher<X>> mapper;
3636
private RequestedCounter requestCounter = new RequestedCounter();
37-
private SequentialSubscriber<? super X> subscriber;
37+
private Flow.Subscriber<? super X> subscriber;
3838
private Flow.Subscription subscription;
3939
private volatile Flow.Subscription innerSubscription;
4040
private volatile Flow.Publisher<X> innerPublisher;
41-
private volatile boolean upstreamsCompleted;
41+
private AtomicBoolean upstreamsCompleted = new AtomicBoolean(false);
4242
private Optional<Throwable> error = Optional.empty();
4343
private ReentrantLock stateLock = new ReentrantLock();
44+
private boolean strictMode = BaseProcessor.DEFAULT_STRICT_MODE;
4445

4546

4647
private MultiFlatMapProcessor() {
@@ -76,10 +77,16 @@ public static <T, U> MultiFlatMapProcessor<T, U> fromPublisherMapper(Function<T,
7677
return flatMapProcessor;
7778
}
7879

80+
@Override
81+
public MultiFlatMapProcessor<T, X> strictMode(boolean strictMode) {
82+
this.strictMode = strictMode;
83+
return this;
84+
}
85+
7986
private class FlatMapSubscription implements Flow.Subscription {
8087
@Override
8188
public void request(long n) {
82-
stateLock(() -> requestCounter.increment(n, MultiFlatMapProcessor.this::onError));
89+
requestCounter.increment(n, MultiFlatMapProcessor.this::onError);
8390
if (Objects.nonNull(innerSubscription)) {
8491
innerSubscription.request(n);
8592
} else {
@@ -96,8 +103,8 @@ public void cancel() {
96103

97104
@Override
98105
public void subscribe(Flow.Subscriber<? super X> subscriber) {
106+
this.subscriber = SequentialSubscriber.create(subscriber);
99107
stateLock(() -> {
100-
this.subscriber = SequentialSubscriber.create(subscriber);
101108
if (Objects.nonNull(this.subscription)) {
102109
this.subscriber.onSubscribe(new FlatMapSubscription());
103110
}
@@ -141,8 +148,8 @@ public void onError(Throwable t) {
141148

142149
@Override
143150
public void onComplete() {
151+
upstreamsCompleted.set(true);
144152
stateLock(() -> {
145-
upstreamsCompleted = true;
146153
if (requestCounter.get() == 0 || Objects.isNull(innerPublisher)) {
147154
//Have to wait for all Publishers to be finished
148155
subscriber.onComplete();
@@ -166,11 +173,9 @@ public void onSubscribe(Flow.Subscription subscription) {
166173
@Override
167174
public void onNext(X item) {
168175
Objects.requireNonNull(item);
169-
stateLock(() -> {
170-
if (requestCounter.tryDecrement()) {
171-
subscriber.onNext(item);
172-
}
173-
});
176+
if (requestCounter.tryDecrement()) {
177+
subscriber.onNext(item);
178+
}
174179
}
175180

176181
@Override
@@ -182,22 +187,31 @@ public void onError(Throwable throwable) {
182187

183188
@Override
184189
public void onComplete() {
190+
if (upstreamsCompleted.get()) {
191+
subscriber.onComplete();
192+
}
185193
stateLock(() -> {
186194
innerPublisher = null;
187-
if (upstreamsCompleted) {
188-
subscriber.onComplete();
189-
}
195+
});
196+
try {
197+
requestCounter.lock();
190198
if (requestCounter.get() > 0) {
191199
subscription.request(1);
192200
}
193-
});
201+
} finally {
202+
requestCounter.unlock();
203+
}
194204
}
195205
}
196206

197207
/**
198208
* Protect critical sections when working with states of innerPublisher.
199209
*/
200210
private void stateLock(Runnable runnable) {
211+
if (!strictMode) {
212+
runnable.run();
213+
return;
214+
}
201215
try {
202216
stateLock.lock();
203217
runnable.run();

0 commit comments

Comments
 (0)