4
4
import org .reactivestreams .Publisher ;
5
5
import org .reactivestreams .Subscriber ;
6
6
import org .reactivestreams .Subscription ;
7
- import org .reactivestreams .tck .Annotations .NotVerified ;
8
- import org .reactivestreams .tck .Annotations .Required ;
9
7
import org .reactivestreams .tck .TestEnvironment .ManualPublisher ;
10
8
import org .reactivestreams .tck .TestEnvironment .ManualSubscriber ;
11
9
import org .reactivestreams .tck .TestEnvironment .ManualSubscriberWithSubscriptionSupport ;
@@ -30,21 +28,25 @@ public abstract class IdentityProcessorVerification<T> {
30
28
31
29
////////////////// END OF DELEGATED TO SPECS //////////////////
32
30
33
-
34
- private final int testBufferSize ;
31
+ //
32
+ private final int processorBufferSize ;
35
33
36
34
/**
37
35
* Test class must specify the expected time it takes for the publisher to
38
- * shut itself down when the the last downstream Subscription is cancelled.
39
- * Used by `publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe`.
36
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
40
37
*/
38
+ @ SuppressWarnings ("unused" )
41
39
public IdentityProcessorVerification (TestEnvironment env , long publisherShutdownTimeoutMillis ) {
42
40
this (env , publisherShutdownTimeoutMillis , TestEnvironment .TEST_BUFFER_SIZE );
43
41
}
44
42
45
- public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int testBufferSize ) {
43
+ /**
44
+ * Test class must specify the expected time it takes for the publisher to
45
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
46
+ */
47
+ public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int processorBufferSize ) {
46
48
this .env = env ;
47
- this .testBufferSize = testBufferSize ;
49
+ this .processorBufferSize = processorBufferSize ;
48
50
49
51
this .subscriberVerification = new SubscriberVerification <T >(env ) {
50
52
@ Override
@@ -53,14 +55,14 @@ public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
53
55
}
54
56
55
57
@ Override
56
- public Publisher <T > createHelperPublisher (int elements ) {
58
+ public Publisher <T > createHelperPublisher (long elements ) {
57
59
return IdentityProcessorVerification .this .createHelperPublisher (elements );
58
60
}
59
61
};
60
62
61
63
publisherVerification = new PublisherVerification <T >(env , publisherShutdownTimeoutMillis ) {
62
64
@ Override
63
- public Publisher <T > createPublisher (int elements ) {
65
+ public Publisher <T > createPublisher (long elements ) {
64
66
return IdentityProcessorVerification .this .createPublisher (elements );
65
67
}
66
68
@@ -73,24 +75,31 @@ public Publisher<T> createCompletedStatePublisher() {
73
75
public Publisher <T > createErrorStatePublisher () {
74
76
return IdentityProcessorVerification .this .createErrorStatePublisher ();
75
77
}
78
+
79
+ @ Override
80
+ public long maxElementsFromPublisher () {
81
+ return IdentityProcessorVerification .this .maxElementsFromPublisher ();
82
+ }
76
83
};
77
84
}
78
85
79
86
/**
80
87
* This is the main method you must implement in your test incarnation.
81
88
* It must create a Publisher, which simply forwards all stream elements from its upstream
82
89
* to its downstream. It must be able to internally buffer the given number of elements.
90
+ *
91
+ * @param bufferSize number of elements the processor is required to be able to buffer
83
92
*/
84
93
public abstract Processor <T , T > createIdentityProcessor (int bufferSize );
85
94
86
95
/**
87
96
* Helper method required for running the Publisher rules against a Publisher.
88
97
* It must create a Publisher for a stream with exactly the given number of elements.
89
- * If ` elements` is zero the produced stream must be infinite.
98
+ * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
90
99
* The stream must not produce the same element twice (in case of an infinite stream this requirement
91
100
* is relaxed to only apply to the elements that are actually requested during all tests).
92
101
*/
93
- public abstract Publisher <T > createHelperPublisher (int elements );
102
+ public abstract Publisher <T > createHelperPublisher (long elements );
94
103
95
104
/**
96
105
* Return a Publisher in {@code completed} state in order to run additional tests on it,
@@ -104,30 +113,38 @@ public Publisher<T> createErrorStatePublisher() {
104
113
*/
105
114
public abstract Publisher <T > createErrorStatePublisher ();
106
115
116
+ /**
117
+ * Override and return lower value if your Publisher is only able to produce a set number of elements.
118
+ * For example, if it is designed to return at-most-one element, return {@code 1} from this method.
119
+ */
120
+ public long maxElementsFromPublisher () {
121
+ // general idea is to skip tests that we are unable to run on a given publisher (if it can signal less than we need for a test)
122
+ // see: https://github.com/reactive-streams/reactive-streams/issues/87 for details
123
+ return Long .MAX_VALUE ;
124
+ }
125
+
107
126
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
108
127
109
128
@ BeforeMethod
110
129
public void setUp () throws Exception {
111
- env .clearAsyncErrors ();
130
+ publisherVerification .setUp ();
131
+ subscriberVerification .setUp ();
112
132
}
113
133
114
134
////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
115
- // 4.1
116
- // A Processor represents a processing stage—which is both a Subscriber and a Publisher
117
- // It MUST obey the contracts of both [1]
118
-
119
135
120
- // A Publisher
121
- // must obey all Publisher rules on its producing side
122
- public Publisher <T > createPublisher (int elements ) {
123
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
136
+ // A Processor
137
+ // must obey all Publisher rules on its publishing side
138
+ public Publisher <T > createPublisher (long elements ) {
139
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
124
140
Publisher <T > pub = createHelperPublisher (elements );
125
141
pub .subscribe (processor );
126
142
return processor ; // we run the PublisherVerification against this
127
143
}
128
144
129
145
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
130
-
146
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
147
+
131
148
@ Test
132
149
public void createPublisher3MustProduceAStreamOfExactly3Elements () throws Throwable {
133
150
publisherVerification .createPublisher3MustProduceAStreamOfExactly3Elements ();
@@ -144,7 +161,7 @@ public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws
144
161
}
145
162
146
163
@ Test
147
- public void spec103_mustSignalOnMethodsSequentially () throws Exception {
164
+ public void spec103_mustSignalOnMethodsSequentially () throws Throwable {
148
165
publisherVerification .spec103_mustSignalOnMethodsSequentially ();
149
166
}
150
167
@@ -213,11 +230,11 @@ public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSu
213
230
publisherVerification .spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber ();
214
231
}
215
232
216
- // A Processor
217
- // must call `onError` on all its subscribers if it encounters a non-recoverable error
233
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
234
+ // for multiple subscribers
218
235
@ Test
219
236
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError () throws Exception {
220
- new TestSetup (env , testBufferSize ) {{
237
+ new TestSetup (env , processorBufferSize ) {{
221
238
ManualSubscriberWithErrorCollection <T > sub1 = new ManualSubscriberWithErrorCollection <T >(env );
222
239
env .subscribe (processor , sub1 );
223
240
ManualSubscriberWithErrorCollection <T > sub2 = new ManualSubscriberWithErrorCollection <T >(env );
@@ -242,11 +259,12 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera
242
259
}
243
260
244
261
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
262
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
245
263
246
264
// A Processor
247
265
// must obey all Subscriber rules on its consuming side
248
266
public Subscriber <T > createSubscriber (final SubscriberVerification .SubscriberProbe <T > probe ) {
249
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
267
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
250
268
processor .subscribe (
251
269
new Subscriber <T >() {
252
270
public void onSubscribe (final Subscription subscription ) {
@@ -288,7 +306,7 @@ public void onError(Throwable cause) {
288
306
// must cancel its upstream Subscription if its last downstream Subscription has been cancelled
289
307
@ Test
290
308
public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled () throws Exception {
291
- new TestSetup (env , testBufferSize ) {{
309
+ new TestSetup (env , processorBufferSize ) {{
292
310
ManualSubscriber <T > sub = newSubscriber ();
293
311
sub .cancel ();
294
312
expectCancelling ();
@@ -301,7 +319,7 @@ public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasB
301
319
// must immediately pass on `onError` events received from its upstream to its downstream
302
320
@ Test
303
321
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream () throws Exception {
304
- new TestSetup (env , testBufferSize ) {{
322
+ new TestSetup (env , processorBufferSize ) {{
305
323
ManualSubscriberWithErrorCollection <T > sub = new ManualSubscriberWithErrorCollection <T >(env );
306
324
env .subscribe (processor , sub );
307
325
@@ -317,12 +335,12 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
317
335
// must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet
318
336
@ Test
319
337
public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet () throws Exception {
320
- new TestSetup (env , testBufferSize ) {{
338
+ new TestSetup (env , processorBufferSize ) {{
321
339
ManualSubscriber <T > sub = newSubscriber ();
322
340
final T x = sendNextTFromUpstream ();
323
341
sub .expectNone (50 );
324
342
final T y = sendNextTFromUpstream ();
325
- sub .expectNone (50 );
343
+ sub .expectNone (50 );
326
344
327
345
sub .request (2 );
328
346
sub .expectNext (x );
@@ -337,6 +355,7 @@ public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstr
337
355
}
338
356
339
357
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
358
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
340
359
341
360
@ Test
342
361
public void exerciseHappyPath () throws Throwable {
@@ -520,10 +539,11 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
520
539
521
540
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
522
541
523
- // trigger `requestFromUpstream` for elements that have been requested 'long ago'
542
+ // A Processor
543
+ // must trigger `requestFromUpstream` for elements that have been requested 'long ago'
524
544
@ Test
525
545
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo () throws Exception {
526
- new TestSetup (env , testBufferSize ) {{
546
+ new TestSetup (env , processorBufferSize ) {{
527
547
ManualSubscriber <T > sub1 = newSubscriber ();
528
548
sub1 .request (20 );
529
549
@@ -566,19 +586,20 @@ public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() thr
566
586
}};
567
587
}
568
588
569
- // unblock the stream if a 'blocking' subscription has been cancelled
589
+ // A Processor
590
+ // must unblock the stream if a 'blocking' subscription has been cancelled
570
591
@ Test
571
592
@ SuppressWarnings ("unchecked" )
572
593
public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled () throws InterruptedException {
573
- new TestSetup (env , testBufferSize ) {{
594
+ new TestSetup (env , processorBufferSize ) {{
574
595
ManualSubscriber <T > sub1 = newSubscriber ();
575
596
ManualSubscriber <T > sub2 = newSubscriber ();
576
597
577
- sub1 .request (testBufferSize + 1 );
598
+ sub1 .request (processorBufferSize + 1 );
578
599
long pending = 0 ;
579
600
int sent = 0 ;
580
- final T [] tees = (T []) new Object [testBufferSize ];
581
- while (sent < testBufferSize ) {
601
+ final T [] tees = (T []) new Object [processorBufferSize ];
602
+ while (sent < processorBufferSize ) {
582
603
if (pending == 0 ) {
583
604
pending = expectRequest ();
584
605
}
@@ -588,7 +609,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
588
609
pending -= 1 ;
589
610
}
590
611
591
- expectNoRequest (); // because we only have buffer size testBufferSize and sub2 hasn't seen the first value yet
612
+ expectNoRequest (); // because we only have buffer size processorBufferSize and sub2 hasn't seen the first value yet
592
613
sub2 .cancel (); // must "unblock"
593
614
594
615
expectRequest ();
@@ -606,16 +627,14 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
606
627
/////////////////////// TEST INFRASTRUCTURE //////////////////////
607
628
608
629
public abstract class TestSetup extends ManualPublisher <T > {
609
- private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
630
+ final private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
610
631
private Set <T > seenTees = new HashSet <T >();
611
632
612
633
final Processor <T , T > processor ;
613
- final int testBufferSize ;
614
634
615
635
public TestSetup (TestEnvironment env , int testBufferSize ) throws InterruptedException {
616
636
super (env );
617
- this .testBufferSize = testBufferSize ;
618
- tees = env .newManualSubscriber (createHelperPublisher (0 ));
637
+ tees = env .newManualSubscriber (createHelperPublisher (Long .MAX_VALUE ));
619
638
processor = createIdentityProcessor (testBufferSize );
620
639
subscribe (processor );
621
640
}
0 commit comments