Skip to content

Commit fb3a7d6

Browse files
committed
cancelled being true here is impossible because the only way to get that true is by submitting a Subscription to the Subscriber that happens once and under this condition.
1 parent b3eff1d commit fb3a7d6

File tree

1 file changed

+26
-28
lines changed

1 file changed

+26
-28
lines changed

examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java

+26-28
Original file line numberDiff line numberDiff line change
@@ -126,38 +126,36 @@ private void doCancel() {
126126
}
127127

128128
// Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe`
129-
// we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread.
129+
// we execute it asynchronously, this is to avoid executing the user code (`supplier`) on the calling thread.
130130
// It also makes it easier to follow rule 1.9
131131
private void doSubscribe() {
132-
if (!cancelled) {
133-
// Deal with setting up the subscription with the subscriber
134-
try {
135-
subscriber.onSubscribe(this);
136-
} catch (final Throwable t) { // Due diligence to obey 2.13
137-
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
138-
}
132+
// Deal with setting up the subscription with the subscriber
133+
try {
134+
subscriber.onSubscribe(this);
135+
} catch (final Throwable t) { // Due diligence to obey 2.13
136+
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
137+
}
139138

140-
// Deal with already complete iterators promptly
141-
boolean hasElements = false;
142-
try {
143-
// Try to fetch an element from a stream to ensure the stream is not empty,
144-
// this will be sent by the first calling of doSend
145-
nextElementToBeSent = supplier.get();
146-
hasElements = nextElementToBeSent != null;
147-
} catch (final Throwable t) {
148-
terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4
149-
return;
150-
}
139+
// Deal with already complete iterators promptly
140+
boolean hasElements = false;
141+
try {
142+
// Try to fetch an element from a stream to ensure the stream is not empty,
143+
// this will be sent by the first calling of doSend
144+
nextElementToBeSent = supplier.get();
145+
hasElements = nextElementToBeSent != null;
146+
} catch (final Throwable t) {
147+
terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4
148+
return;
149+
}
151150

152-
// If we don't have anything to deliver, we're already done, so lets do the right thing and
153-
// not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3
154-
if (!hasElements) {
155-
try {
156-
doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled
157-
subscriber.onComplete();
158-
} catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this.
159-
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err);
160-
}
151+
// If we don't have anything to deliver, we're already done, so lets do the right thing and
152+
// not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3
153+
if (!hasElements) {
154+
try {
155+
doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled
156+
subscriber.onComplete();
157+
} catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this.
158+
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err);
161159
}
162160
}
163161
}

0 commit comments

Comments
 (0)