You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
privateSubscriptionsubscription; // Obeying rule 3.1, we make this private!
41
+
privatebooleandone; // It's useful to keep track of whether this Subscriber is done or not
42
+
privatefinalExecutorexecutor; // This is the Executor we'll use to be asynchronous, obeying rule 2.2
43
+
44
+
// Only one constructor, and it's only accessible for the subclasses
45
+
protectedAsyncSubscriber(Executorexecutor) {
46
+
if (executor == null) thrownull;
47
+
this.executor = executor;
48
+
}
49
+
50
+
// Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
51
+
// herefor we also need to cancel our `Subscription`.
52
+
privatefinalvoiddone() {
53
+
//On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
54
+
done = true; // If we `foreach` throws an exception, let's consider ourselves done (not accepting more elements)
55
+
if (subscription != null) { // If we are bailing out before we got a `Subscription` there's little need for cancelling it.
56
+
try {
57
+
subscription.cancel(); // Cancel the subscription
58
+
} catch(finalThrowablet) {
59
+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
60
+
(newIllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
61
+
}
62
+
}
63
+
}
64
+
65
+
// This method is invoked when the OnNext signals arrive
66
+
// Returns whether more elements are desired or not, and if no more elements are desired,
67
+
// for convenience.
68
+
protectedabstractbooleanwhenNext(finalTelement);
69
+
70
+
// This method is invoked when the OnComplete signal arrives
71
+
// override this method to implement your own custom onComplete logic.
72
+
protectedvoidwhenComplete() { }
73
+
74
+
// This method is invoked if the OnError signal arrives
75
+
// override this method to implement your own custom onError logic.
// Getting a null `Subscription` here is not valid so lets just ignore it.
81
+
} elseif (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
82
+
try {
83
+
s.cancel(); // Cancel the additional subscription to follow rule 2.5
84
+
} catch(finalThrowablet) {
85
+
//Subscription.cancel is not allowed to throw an exception, according to rule 3.15
86
+
(newIllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
87
+
}
88
+
} else {
89
+
// We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
90
+
// Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
91
+
subscription = s;
92
+
try {
93
+
// If we want elements, according to rule 2.1 we need to call `request`
94
+
// And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method
95
+
s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
96
+
} catch(finalThrowablet) {
97
+
// Subscription.request is not allowed to throw according to rule 3.16
98
+
(newIllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
99
+
}
100
+
}
101
+
}
102
+
103
+
privatefinalvoidhandleOnNext(finalTelement) {
104
+
if (!done) { // If we aren't already done
105
+
if(subscription == null) { // Check for spec violation of 2.1
106
+
(newIllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
107
+
} else {
108
+
try {
109
+
if (whenNext(element)) {
110
+
try {
111
+
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
112
+
} catch(finalThrowablet) {
113
+
// Subscription.request is not allowed to throw according to rule 3.16
114
+
(newIllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
115
+
}
116
+
} else {
117
+
done(); // This is legal according to rule 2.6
118
+
}
119
+
} catch(finalThrowablet) {
120
+
done();
121
+
try {
122
+
onError(t);
123
+
} catch(finalThrowablet2) {
124
+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
125
+
(newIllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
126
+
}
127
+
}
128
+
}
129
+
}
130
+
}
131
+
132
+
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
133
+
privatevoidhandleOnComplete() {
134
+
done = true; // Obey rule 2.4
135
+
whenComplete();
136
+
}
137
+
138
+
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
139
+
privatevoidhandleOnError(finalThrowableerror) {
140
+
done = true; // Obey rule 2.4
141
+
whenError(error);
142
+
}
143
+
144
+
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
0 commit comments