1
+ package org .reactivestreams .tck ;
2
+
3
+ import org .reactivestreams .Publisher ;
4
+ import org .reactivestreams .Subscriber ;
5
+ import org .reactivestreams .Subscription ;
6
+ import org .reactivestreams .tck .TestEnvironment .Latch ;
7
+ import org .reactivestreams .tck .TestEnvironment .ManualPublisher ;
8
+ import org .reactivestreams .tck .TestEnvironment .ManualSubscriber ;
9
+ import org .reactivestreams .tck .TestEnvironment .Promise ;
10
+ import org .reactivestreams .tck .TestEnvironment .Receptacle ;
11
+ import org .testng .annotations .Test ;
12
+
13
+ public abstract class SubscriberVerification <T > {
14
+
15
+ private final TestEnvironment env ;
16
+
17
+ protected SubscriberVerification (TestEnvironment env ) {
18
+ this .env = env ;
19
+ }
20
+
21
+ /**
22
+ * This is the main method you must implement in your test incarnation.
23
+ * It must create a new Subscriber instance to be subjected to the testing logic.
24
+ * <p/>
25
+ * In order to be meaningfully testable your Subscriber must inform the given
26
+ * `SubscriberProbe` of the respective events having been received.
27
+ */
28
+ public abstract Subscriber <T > createSubscriber (SubscriberProbe <T > probe );
29
+
30
+ /**
31
+ * Helper method required for generating test elements.
32
+ * It must create a Publisher for a stream with exactly the given number of elements.
33
+ * If `elements` is zero the produced stream must be infinite.
34
+ */
35
+ public abstract Publisher <T > createHelperPublisher (int elements );
36
+
37
+ ////////////////////// TEST SETUP VERIFICATION ///////////////////////////
38
+
39
+ @ Test
40
+ public void exerciseHappyPath () throws InterruptedException {
41
+ new TestSetup (env ) {{
42
+ puppet ().triggerRequestMore (1 );
43
+
44
+ puppet ().triggerRequestMore (1 );
45
+ int receivedRequests = expectRequestMore ();
46
+ sendNextTFromUpstream ();
47
+ probe .expectNext (lastT );
48
+
49
+ puppet ().triggerRequestMore (1 );
50
+ if (receivedRequests == 1 ) {
51
+ expectRequestMore ();
52
+ }
53
+ sendNextTFromUpstream ();
54
+ probe .expectNext (lastT );
55
+
56
+ puppet ().triggerCancel ();
57
+ expectCancelling ();
58
+
59
+ env .verifyNoAsyncErrors ();
60
+ }};
61
+ }
62
+
63
+ ////////////////////// SPEC RULE VERIFICATION ///////////////////////////
64
+
65
+ // Subscriber::onSubscribe(Subscription), Subscriber::onNext(T)
66
+ // must asynchronously schedule a respective event to the subscriber
67
+ // must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
68
+ @ Test
69
+ public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent () {
70
+ // cannot be meaningfully tested, or can it?
71
+ }
72
+
73
+ // Subscriber::onComplete, Subscriber::onError(Throwable)
74
+ // must asynchronously schedule a respective event to the Subscriber
75
+ // must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
76
+ // must consider the Subscription cancelled after having received the event
77
+ @ Test
78
+ public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent () {
79
+ // cannot be meaningfully tested, or can it?
80
+ }
81
+
82
+ // A Subscriber
83
+ // must not accept an `onSubscribe` event if it already has an active Subscription
84
+ @ Test
85
+ public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription () throws InterruptedException {
86
+ new TestSetup (env ) {{
87
+ // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
88
+ sub ().onSubscribe (
89
+ new Subscription () {
90
+ public void request (int elements ) {
91
+ env .flop (String .format ("Subscriber %s illegally called `subscription.requestMore(%s)`" , sub (), elements ));
92
+ }
93
+
94
+ public void cancel () {
95
+ env .flop (String .format ("Subscriber %s illegally called `subscription.cancel()`" , sub ()));
96
+ }
97
+ });
98
+
99
+ env .verifyNoAsyncErrors ();
100
+ }};
101
+ }
102
+
103
+ // A Subscriber
104
+ // must call Subscription::cancel during shutdown if it still has an active Subscription
105
+ @ Test
106
+ public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription () throws InterruptedException {
107
+ new TestSetup (env ) {{
108
+ puppet ().triggerShutdown ();
109
+ expectCancelling ();
110
+
111
+ env .verifyNoAsyncErrors ();
112
+ }};
113
+ }
114
+
115
+ // A Subscriber
116
+ // must ensure that all calls on a Subscription take place from the same thread or provide for respective external synchronization
117
+ @ Test
118
+ public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync () {
119
+ // cannot be meaningfully tested, or can it?
120
+ }
121
+
122
+ // A Subscriber
123
+ // must be prepared to receive one or more `onNext` events after having called Subscription::cancel
124
+ @ Test
125
+ public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel () throws InterruptedException {
126
+ new TestSetup (env ) {{
127
+ puppet ().triggerRequestMore (1 );
128
+ puppet ().triggerCancel ();
129
+ expectCancelling ();
130
+ sendNextTFromUpstream ();
131
+
132
+ env .verifyNoAsyncErrors ();
133
+ }};
134
+ }
135
+
136
+ // A Subscriber
137
+ // must be prepared to receive an `onComplete` event with a preceding Subscription::requestMore call
138
+ @ Test
139
+ public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore () throws InterruptedException {
140
+ new TestSetup (env ) {{
141
+ puppet ().triggerRequestMore (1 );
142
+ sendCompletion ();
143
+ probe .expectCompletion ();
144
+
145
+ env .verifyNoAsyncErrors ();
146
+ }};
147
+ }
148
+
149
+ // A Subscriber
150
+ // must be prepared to receive an `onComplete` event without a preceding Subscription::requestMore call
151
+ @ Test
152
+ public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore () throws InterruptedException {
153
+ new TestSetup (env ) {{
154
+ sendCompletion ();
155
+ probe .expectCompletion ();
156
+
157
+ env .verifyNoAsyncErrors ();
158
+ }};
159
+ }
160
+
161
+ // A Subscriber
162
+ // must be prepared to receive an `onError` event with a preceding Subscription::requestMore call
163
+ @ Test
164
+ public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore () throws InterruptedException {
165
+ new TestSetup (env ) {{
166
+ puppet ().triggerRequestMore (1 );
167
+ Exception ex = new RuntimeException ("Test exception" );
168
+ sendError (ex );
169
+ probe .expectError (ex );
170
+
171
+ env .verifyNoAsyncErrors ();
172
+ }};
173
+ }
174
+
175
+ // A Subscriber
176
+ // must be prepared to receive an `onError` event without a preceding Subscription::requestMore call
177
+ @ Test
178
+ public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore () throws InterruptedException {
179
+ new TestSetup (env ) {{
180
+ Exception ex = new RuntimeException ("Test exception" );
181
+ sendError (ex );
182
+ probe .expectError (ex );
183
+ env .verifyNoAsyncErrors ();
184
+ }};
185
+ }
186
+
187
+ // A Subscriber
188
+ // must make sure that all calls on its `onXXX` methods happen-before the processing of the respective events
189
+ @ Test
190
+ public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents () {
191
+ // cannot be meaningfully tested, or can it?
192
+ }
193
+
194
+ /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
195
+
196
+ /////////////////////// TEST INFRASTRUCTURE //////////////////////
197
+
198
+ public class TestSetup extends ManualPublisher <T > {
199
+ ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
200
+ Probe probe ;
201
+ T lastT = null ;
202
+
203
+ public TestSetup (TestEnvironment env ) throws InterruptedException {
204
+ super (env );
205
+ tees = env .newManualSubscriber (createHelperPublisher (0 ));
206
+ probe = new Probe ();
207
+ subscribe (createSubscriber (probe ));
208
+ probe .puppet .expectCompletion (env .defaultTimeoutMillis (), String .format ("Subscriber %s did not `registerOnSubscribe`" , sub ()));
209
+ }
210
+
211
+ public Subscriber <T > sub () {
212
+ return subscriber .get ();
213
+ }
214
+
215
+ public SubscriberPuppet puppet () {
216
+ return probe .puppet .value ();
217
+ }
218
+
219
+ public void sendNextTFromUpstream () throws InterruptedException {
220
+ sendNext (nextT ());
221
+ }
222
+
223
+ public T nextT () throws InterruptedException {
224
+ lastT = tees .requestNextElement ();
225
+ return lastT ;
226
+ }
227
+
228
+ public class Probe implements SubscriberProbe <T > {
229
+ Promise <SubscriberPuppet > puppet = new Promise <SubscriberPuppet >(env );
230
+ Receptacle <T > elements = new Receptacle <T >(env );
231
+ Latch completed = new Latch (env );
232
+ Promise <Throwable > error = new Promise <Throwable >(env );
233
+
234
+ public void registerOnSubscribe (SubscriberPuppet p ) {
235
+ if (!puppet .isCompleted ()) {
236
+ puppet .complete (p );
237
+ } else {
238
+ env .flop (String .format ("Subscriber %s illegally accepted a second Subscription" , sub ()));
239
+ }
240
+ }
241
+
242
+ public void registerOnNext (T element ) {
243
+ elements .add (element );
244
+ }
245
+
246
+ public void registerOnComplete () {
247
+ completed .close ();
248
+ }
249
+
250
+ public void registerOnError (Throwable cause ) {
251
+ error .complete (cause );
252
+ }
253
+
254
+ public void expectNext (T expected ) throws InterruptedException {
255
+ expectNext (expected , env .defaultTimeoutMillis ());
256
+ }
257
+
258
+ public void expectNext (T expected , long timeoutMillis ) throws InterruptedException {
259
+ T received = elements .next (timeoutMillis , String .format ("Subscriber %s did not call `registerOnNext(%s)`" , sub (), expected ));
260
+ if (!received .equals (expected )) {
261
+ env .flop (String .format ("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`" , sub (), received , expected ));
262
+ }
263
+ }
264
+
265
+ public void expectCompletion () throws InterruptedException {
266
+ expectCompletion (env .defaultTimeoutMillis ());
267
+ }
268
+
269
+ public void expectCompletion (long timeoutMillis ) throws InterruptedException {
270
+ completed .expectClose (timeoutMillis , String .format ("Subscriber %s did not call `registerOnComplete()`" , sub ()));
271
+ }
272
+
273
+ public void expectError (Throwable expected ) throws InterruptedException {
274
+ expectError (expected , env .defaultTimeoutMillis ());
275
+ }
276
+
277
+ public void expectError (Throwable expected , long timeoutMillis ) throws InterruptedException {
278
+ error .expectCompletion (timeoutMillis , String .format ("Subscriber %s did not call `registerOnError(%s)`" , sub (), expected ));
279
+ if (error .value () != expected ) {
280
+ env .flop (String .format ("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`" , sub (), error .value (), expected ));
281
+ }
282
+ }
283
+
284
+ public void verifyNoAsyncErrors () {
285
+ env .verifyNoAsyncErrors ();
286
+ }
287
+ }
288
+ }
289
+
290
+ public interface SubscriberProbe <T > {
291
+ /**
292
+ * Must be called by the test subscriber when it has received the `onSubscribe` event.
293
+ */
294
+ void registerOnSubscribe (SubscriberPuppet puppet );
295
+
296
+ /**
297
+ * Must be called by the test subscriber when it has received an`onNext` event.
298
+ */
299
+ void registerOnNext (T element );
300
+
301
+ /**
302
+ * Must be called by the test subscriber when it has received an `onComplete` event.
303
+ */
304
+ void registerOnComplete ();
305
+
306
+ /**
307
+ * Must be called by the test subscriber when it has received an `onError` event.
308
+ */
309
+ void registerOnError (Throwable cause );
310
+ }
311
+
312
+ public interface SubscriberPuppet {
313
+ void triggerShutdown ();
314
+
315
+ void triggerRequestMore (int elements );
316
+
317
+ void triggerCancel ();
318
+ }
319
+ }
0 commit comments