4
4
import org .reactivestreams .Publisher ;
5
5
import org .reactivestreams .Subscriber ;
6
6
import org .reactivestreams .Subscription ;
7
- import org .reactivestreams .tck .Annotations .NotVerified ;
8
- import org .reactivestreams .tck .Annotations .Required ;
9
7
import org .reactivestreams .tck .TestEnvironment .ManualPublisher ;
10
8
import org .reactivestreams .tck .TestEnvironment .ManualSubscriber ;
11
9
import org .reactivestreams .tck .TestEnvironment .ManualSubscriberWithSubscriptionSupport ;
@@ -30,21 +28,25 @@ public abstract class IdentityProcessorVerification<T> {
30
28
31
29
////////////////// END OF DELEGATED TO SPECS //////////////////
32
30
33
-
34
- private final int testBufferSize ;
31
+ //
32
+ private final int processorBufferSize ;
35
33
36
34
/**
37
35
* Test class must specify the expected time it takes for the publisher to
38
- * shut itself down when the the last downstream Subscription is cancelled.
39
- * Used by `publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe`.
36
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
40
37
*/
38
+ @ SuppressWarnings ("unused" )
41
39
public IdentityProcessorVerification (TestEnvironment env , long publisherShutdownTimeoutMillis ) {
42
40
this (env , publisherShutdownTimeoutMillis , TestEnvironment .TEST_BUFFER_SIZE );
43
41
}
44
42
45
- public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int testBufferSize ) {
43
+ /**
44
+ * Test class must specify the expected time it takes for the publisher to
45
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
46
+ */
47
+ public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int processorBufferSize ) {
46
48
this .env = env ;
47
- this .testBufferSize = testBufferSize ;
49
+ this .processorBufferSize = processorBufferSize ;
48
50
49
51
this .subscriberVerification = new SubscriberVerification <T >(env ) {
50
52
@ Override
@@ -53,14 +55,14 @@ public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
53
55
}
54
56
55
57
@ Override
56
- public Publisher <T > createHelperPublisher (int elements ) {
58
+ public Publisher <T > createHelperPublisher (long elements ) {
57
59
return IdentityProcessorVerification .this .createHelperPublisher (elements );
58
60
}
59
61
};
60
62
61
63
publisherVerification = new PublisherVerification <T >(env , publisherShutdownTimeoutMillis ) {
62
64
@ Override
63
- public Publisher <T > createPublisher (int elements ) {
65
+ public Publisher <T > createPublisher (long elements ) {
64
66
return IdentityProcessorVerification .this .createPublisher (elements );
65
67
}
66
68
@@ -73,24 +75,31 @@ public Publisher<T> createCompletedStatePublisher() {
73
75
public Publisher <T > createErrorStatePublisher () {
74
76
return IdentityProcessorVerification .this .createErrorStatePublisher ();
75
77
}
78
+
79
+ @ Override
80
+ public long maxElementsFromPublisher () {
81
+ return IdentityProcessorVerification .this .maxElementsFromPublisher ();
82
+ }
76
83
};
77
84
}
78
85
79
86
/**
80
87
* This is the main method you must implement in your test incarnation.
81
88
* It must create a Publisher, which simply forwards all stream elements from its upstream
82
89
* to its downstream. It must be able to internally buffer the given number of elements.
90
+ *
91
+ * @param bufferSize number of elements the processor is required to be able to buffer
83
92
*/
84
93
public abstract Processor <T , T > createIdentityProcessor (int bufferSize );
85
94
86
95
/**
87
96
* Helper method required for running the Publisher rules against a Publisher.
88
97
* It must create a Publisher for a stream with exactly the given number of elements.
89
- * If ` elements` is zero the produced stream must be infinite.
98
+ * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
90
99
* The stream must not produce the same element twice (in case of an infinite stream this requirement
91
100
* is relaxed to only apply to the elements that are actually requested during all tests).
92
101
*/
93
- public abstract Publisher <T > createHelperPublisher (int elements );
102
+ public abstract Publisher <T > createHelperPublisher (long elements );
94
103
95
104
/**
96
105
* Return a Publisher in {@code completed} state in order to run additional tests on it,
@@ -104,30 +113,43 @@ public Publisher<T> createErrorStatePublisher() {
104
113
*/
105
114
public abstract Publisher <T > createErrorStatePublisher ();
106
115
116
+ /**
117
+ * Override and return lower value if your Publisher is only able to produce a set number of elements.
118
+ * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
119
+ */
120
+ public long maxElementsFromPublisher () {
121
+ // general idea is to skip tests that we are unable to run on a given publisher (if it can signal less than we need for a test)
122
+ // see: https://github.com/reactive-streams/reactive-streams/issues/87 for details
123
+ return Long .MAX_VALUE ;
124
+ }
125
+
107
126
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
108
127
109
128
@ BeforeMethod
110
129
public void setUp () throws Exception {
111
- env .clearAsyncErrors ();
130
+ publisherVerification .setUp ();
131
+ subscriberVerification .setUp ();
112
132
}
113
133
114
134
////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
115
- // 4.1
116
- // A Processor represents a processing stage—which is both a Subscriber and a Publisher
117
- // It MUST obey the contracts of both [1]
118
135
119
-
120
- // A Publisher
121
- // must obey all Publisher rules on its producing side
122
- public Publisher <T > createPublisher (int elements ) {
123
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
136
+ // A Processor
137
+ // must obey all Publisher rules on its publishing side
138
+ public Publisher <T > createPublisher (long elements ) {
139
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
124
140
Publisher <T > pub = createHelperPublisher (elements );
125
141
pub .subscribe (processor );
126
142
return processor ; // we run the PublisherVerification against this
127
143
}
128
144
129
145
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
130
-
146
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
147
+
148
+ @ Test
149
+ public void createPublisher1MustProduceAStreamOfExactly1Element () throws Throwable {
150
+ publisherVerification .createPublisher1MustProduceAStreamOfExactly1Element ();
151
+ }
152
+
131
153
@ Test
132
154
public void createPublisher3MustProduceAStreamOfExactly3Elements () throws Throwable {
133
155
publisherVerification .createPublisher3MustProduceAStreamOfExactly3Elements ();
@@ -144,7 +166,7 @@ public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws
144
166
}
145
167
146
168
@ Test
147
- public void spec103_mustSignalOnMethodsSequentially () throws Exception {
169
+ public void spec103_mustSignalOnMethodsSequentially () throws Throwable {
148
170
publisherVerification .spec103_mustSignalOnMethodsSequentially ();
149
171
}
150
172
@@ -213,11 +235,11 @@ public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSu
213
235
publisherVerification .spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber ();
214
236
}
215
237
216
- // A Processor
217
- // must call `onError` on all its subscribers if it encounters a non-recoverable error
238
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
239
+ // for multiple subscribers
218
240
@ Test
219
241
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError () throws Exception {
220
- new TestSetup (env , testBufferSize ) {{
242
+ new TestSetup (env , processorBufferSize ) {{
221
243
ManualSubscriberWithErrorCollection <T > sub1 = new ManualSubscriberWithErrorCollection <T >(env );
222
244
env .subscribe (processor , sub1 );
223
245
ManualSubscriberWithErrorCollection <T > sub2 = new ManualSubscriberWithErrorCollection <T >(env );
@@ -242,11 +264,12 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera
242
264
}
243
265
244
266
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
267
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
245
268
246
269
// A Processor
247
270
// must obey all Subscriber rules on its consuming side
248
271
public Subscriber <T > createSubscriber (final SubscriberVerification .SubscriberProbe <T > probe ) {
249
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
272
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
250
273
processor .subscribe (
251
274
new Subscriber <T >() {
252
275
public void onSubscribe (final Subscription subscription ) {
@@ -288,7 +311,7 @@ public void onError(Throwable cause) {
288
311
// must cancel its upstream Subscription if its last downstream Subscription has been cancelled
289
312
@ Test
290
313
public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled () throws Exception {
291
- new TestSetup (env , testBufferSize ) {{
314
+ new TestSetup (env , processorBufferSize ) {{
292
315
ManualSubscriber <T > sub = newSubscriber ();
293
316
sub .cancel ();
294
317
expectCancelling ();
@@ -301,7 +324,7 @@ public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasB
301
324
// must immediately pass on `onError` events received from its upstream to its downstream
302
325
@ Test
303
326
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream () throws Exception {
304
- new TestSetup (env , testBufferSize ) {{
327
+ new TestSetup (env , processorBufferSize ) {{
305
328
ManualSubscriberWithErrorCollection <T > sub = new ManualSubscriberWithErrorCollection <T >(env );
306
329
env .subscribe (processor , sub );
307
330
@@ -317,12 +340,12 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
317
340
// must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet
318
341
@ Test
319
342
public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet () throws Exception {
320
- new TestSetup (env , testBufferSize ) {{
343
+ new TestSetup (env , processorBufferSize ) {{
321
344
ManualSubscriber <T > sub = newSubscriber ();
322
345
final T x = sendNextTFromUpstream ();
323
346
sub .expectNone (50 );
324
347
final T y = sendNextTFromUpstream ();
325
- sub .expectNone (50 );
348
+ sub .expectNone (50 );
326
349
327
350
sub .request (2 );
328
351
sub .expectNext (x );
@@ -337,6 +360,7 @@ public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstr
337
360
}
338
361
339
362
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
363
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
340
364
341
365
@ Test
342
366
public void exerciseHappyPath () throws Throwable {
@@ -520,10 +544,11 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
520
544
521
545
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
522
546
523
- // trigger `requestFromUpstream` for elements that have been requested 'long ago'
547
+ // A Processor
548
+ // must trigger `requestFromUpstream` for elements that have been requested 'long ago'
524
549
@ Test
525
550
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo () throws Exception {
526
- new TestSetup (env , testBufferSize ) {{
551
+ new TestSetup (env , processorBufferSize ) {{
527
552
ManualSubscriber <T > sub1 = newSubscriber ();
528
553
sub1 .request (20 );
529
554
@@ -566,19 +591,20 @@ public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() thr
566
591
}};
567
592
}
568
593
569
- // unblock the stream if a 'blocking' subscription has been cancelled
594
+ // A Processor
595
+ // must unblock the stream if a 'blocking' subscription has been cancelled
570
596
@ Test
571
597
@ SuppressWarnings ("unchecked" )
572
598
public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled () throws InterruptedException {
573
- new TestSetup (env , testBufferSize ) {{
599
+ new TestSetup (env , processorBufferSize ) {{
574
600
ManualSubscriber <T > sub1 = newSubscriber ();
575
601
ManualSubscriber <T > sub2 = newSubscriber ();
576
602
577
- sub1 .request (testBufferSize + 1 );
603
+ sub1 .request (processorBufferSize + 1 );
578
604
long pending = 0 ;
579
605
int sent = 0 ;
580
- final T [] tees = (T []) new Object [testBufferSize ];
581
- while (sent < testBufferSize ) {
606
+ final T [] tees = (T []) new Object [processorBufferSize ];
607
+ while (sent < processorBufferSize ) {
582
608
if (pending == 0 ) {
583
609
pending = expectRequest ();
584
610
}
@@ -588,7 +614,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
588
614
pending -= 1 ;
589
615
}
590
616
591
- expectNoRequest (); // because we only have buffer size testBufferSize and sub2 hasn't seen the first value yet
617
+ expectNoRequest (); // because we only have buffer size processorBufferSize and sub2 hasn't seen the first value yet
592
618
sub2 .cancel (); // must "unblock"
593
619
594
620
expectRequest ();
@@ -606,16 +632,14 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
606
632
/////////////////////// TEST INFRASTRUCTURE //////////////////////
607
633
608
634
public abstract class TestSetup extends ManualPublisher <T > {
609
- private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
635
+ final private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
610
636
private Set <T > seenTees = new HashSet <T >();
611
637
612
638
final Processor <T , T > processor ;
613
- final int testBufferSize ;
614
639
615
640
public TestSetup (TestEnvironment env , int testBufferSize ) throws InterruptedException {
616
641
super (env );
617
- this .testBufferSize = testBufferSize ;
618
- tees = env .newManualSubscriber (createHelperPublisher (0 ));
642
+ tees = env .newManualSubscriber (createHelperPublisher (Long .MAX_VALUE ));
619
643
processor = createIdentityProcessor (testBufferSize );
620
644
subscribe (processor );
621
645
}
0 commit comments