4
4
import org .reactivestreams .Publisher ;
5
5
import org .reactivestreams .Subscriber ;
6
6
import org .reactivestreams .Subscription ;
7
- import org .reactivestreams .tck .Annotations .NotVerified ;
8
- import org .reactivestreams .tck .Annotations .Required ;
9
7
import org .reactivestreams .tck .TestEnvironment .ManualPublisher ;
10
8
import org .reactivestreams .tck .TestEnvironment .ManualSubscriber ;
11
9
import org .reactivestreams .tck .TestEnvironment .ManualSubscriberWithSubscriptionSupport ;
@@ -30,21 +28,25 @@ public abstract class IdentityProcessorVerification<T> {
30
28
31
29
////////////////// END OF DELEGATED TO SPECS //////////////////
32
30
33
-
34
- private final int testBufferSize ;
31
+ //
32
+ private final int processorBufferSize ;
35
33
36
34
/**
37
35
* Test class must specify the expected time it takes for the publisher to
38
- * shut itself down when the the last downstream Subscription is cancelled.
39
- * Used by `publisherSubscribeWhenInShutDownStateMustTriggerOnErrorAndNotOnSubscribe`.
36
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
40
37
*/
38
+ @ SuppressWarnings ("unused" )
41
39
public IdentityProcessorVerification (TestEnvironment env , long publisherShutdownTimeoutMillis ) {
42
40
this (env , publisherShutdownTimeoutMillis , TestEnvironment .TEST_BUFFER_SIZE );
43
41
}
44
42
45
- public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int testBufferSize ) {
43
+ /**
44
+ * Test class must specify the expected time it takes for the publisher to
45
+ * shut itself down when the the last downstream {@code Subscription} is cancelled.
46
+ */
47
+ public IdentityProcessorVerification (final TestEnvironment env , long publisherShutdownTimeoutMillis , int processorBufferSize ) {
46
48
this .env = env ;
47
- this .testBufferSize = testBufferSize ;
49
+ this .processorBufferSize = processorBufferSize ;
48
50
49
51
this .subscriberVerification = new SubscriberVerification <T >(env ) {
50
52
@ Override
@@ -53,14 +55,14 @@ public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
53
55
}
54
56
55
57
@ Override
56
- public Publisher <T > createHelperPublisher (int elements ) {
58
+ public Publisher <T > createHelperPublisher (long elements ) {
57
59
return IdentityProcessorVerification .this .createHelperPublisher (elements );
58
60
}
59
61
};
60
62
61
63
publisherVerification = new PublisherVerification <T >(env , publisherShutdownTimeoutMillis ) {
62
64
@ Override
63
- public Publisher <T > createPublisher (int elements ) {
65
+ public Publisher <T > createPublisher (long elements ) {
64
66
return IdentityProcessorVerification .this .createPublisher (elements );
65
67
}
66
68
@@ -80,17 +82,19 @@ public Publisher<T> createErrorStatePublisher() {
80
82
* This is the main method you must implement in your test incarnation.
81
83
* It must create a Publisher, which simply forwards all stream elements from its upstream
82
84
* to its downstream. It must be able to internally buffer the given number of elements.
85
+ *
86
+ * @param bufferSize number of elements the processor is required to be able to buffer
83
87
*/
84
88
public abstract Processor <T , T > createIdentityProcessor (int bufferSize );
85
89
86
90
/**
87
91
* Helper method required for running the Publisher rules against a Publisher.
88
92
* It must create a Publisher for a stream with exactly the given number of elements.
89
- * If ` elements` is zero the produced stream must be infinite.
93
+ * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
90
94
* The stream must not produce the same element twice (in case of an infinite stream this requirement
91
95
* is relaxed to only apply to the elements that are actually requested during all tests).
92
96
*/
93
- public abstract Publisher <T > createHelperPublisher (int elements );
97
+ public abstract Publisher <T > createHelperPublisher (long elements );
94
98
95
99
/**
96
100
* Return a Publisher in {@code completed} state in order to run additional tests on it,
@@ -112,22 +116,19 @@ public void setUp() throws Exception {
112
116
}
113
117
114
118
////////////////////// PUBLISHER RULES VERIFICATION ///////////////////////////
115
- // 4.1
116
- // A Processor represents a processing stage—which is both a Subscriber and a Publisher
117
- // It MUST obey the contracts of both [1]
118
-
119
119
120
- // A Publisher
121
- // must obey all Publisher rules on its producing side
122
- public Publisher <T > createPublisher (int elements ) {
123
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
120
+ // A Processor
121
+ // must obey all Publisher rules on its publishing side
122
+ public Publisher <T > createPublisher (long elements ) {
123
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
124
124
Publisher <T > pub = createHelperPublisher (elements );
125
125
pub .subscribe (processor );
126
126
return processor ; // we run the PublisherVerification against this
127
127
}
128
128
129
129
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER //////////////////////
130
-
130
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
131
+
131
132
@ Test
132
133
public void createPublisher3MustProduceAStreamOfExactly3Elements () throws Throwable {
133
134
publisherVerification .createPublisher3MustProduceAStreamOfExactly3Elements ();
@@ -144,7 +145,7 @@ public void spec102_maySignalLessThanRequestedAndTerminateSubscription() throws
144
145
}
145
146
146
147
@ Test
147
- public void spec103_mustSignalOnMethodsSequentially () throws Exception {
148
+ public void spec103_mustSignalOnMethodsSequentially () throws Throwable {
148
149
publisherVerification .spec103_mustSignalOnMethodsSequentially ();
149
150
}
150
151
@@ -213,11 +214,11 @@ public void spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSu
213
214
publisherVerification .spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber ();
214
215
}
215
216
216
- // A Processor
217
- // must call `onError` on all its subscribers if it encounters a non-recoverable error
217
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
218
+ // for multiple subscribers
218
219
@ Test
219
220
public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError () throws Exception {
220
- new TestSetup (env , testBufferSize ) {{
221
+ new TestSetup (env , processorBufferSize ) {{
221
222
ManualSubscriberWithErrorCollection <T > sub1 = new ManualSubscriberWithErrorCollection <T >(env );
222
223
env .subscribe (processor , sub1 );
223
224
ManualSubscriberWithErrorCollection <T > sub2 = new ManualSubscriberWithErrorCollection <T >(env );
@@ -242,11 +243,12 @@ public void spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecovera
242
243
}
243
244
244
245
////////////////////// SUBSCRIBER RULES VERIFICATION ///////////////////////////
246
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
245
247
246
248
// A Processor
247
249
// must obey all Subscriber rules on its consuming side
248
250
public Subscriber <T > createSubscriber (final SubscriberVerification .SubscriberProbe <T > probe ) {
249
- Processor <T , T > processor = createIdentityProcessor (testBufferSize );
251
+ Processor <T , T > processor = createIdentityProcessor (processorBufferSize );
250
252
processor .subscribe (
251
253
new Subscriber <T >() {
252
254
public void onSubscribe (final Subscription subscription ) {
@@ -288,7 +290,7 @@ public void onError(Throwable cause) {
288
290
// must cancel its upstream Subscription if its last downstream Subscription has been cancelled
289
291
@ Test
290
292
public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasBeenCancelled () throws Exception {
291
- new TestSetup (env , testBufferSize ) {{
293
+ new TestSetup (env , processorBufferSize ) {{
292
294
ManualSubscriber <T > sub = newSubscriber ();
293
295
sub .cancel ();
294
296
expectCancelling ();
@@ -301,7 +303,7 @@ public void mustCancelItsUpstreamSubscriptionIfItsLastDownstreamSubscriptionHasB
301
303
// must immediately pass on `onError` events received from its upstream to its downstream
302
304
@ Test
303
305
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream () throws Exception {
304
- new TestSetup (env , testBufferSize ) {{
306
+ new TestSetup (env , processorBufferSize ) {{
305
307
ManualSubscriberWithErrorCollection <T > sub = new ManualSubscriberWithErrorCollection <T >(env );
306
308
env .subscribe (processor , sub );
307
309
@@ -317,7 +319,7 @@ public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownst
317
319
// must be prepared to receive incoming elements from its upstream even if a downstream subscriber has not requested anything yet
318
320
@ Test
319
321
public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstreamSubscriberHasNotRequestedYet () throws Exception {
320
- new TestSetup (env , testBufferSize ) {{
322
+ new TestSetup (env , processorBufferSize ) {{
321
323
ManualSubscriber <T > sub = newSubscriber ();
322
324
final T x = sendNextTFromUpstream ();
323
325
sub .expectNone (50 );
@@ -337,6 +339,7 @@ public void mustBePreparedToReceiveIncomingElementsFromItsUpstreamEvenIfADownstr
337
339
}
338
340
339
341
/////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER //////////////////////
342
+ // Verifies rule: https://github.com/reactive-streams/reactive-streams#4.1
340
343
341
344
@ Test
342
345
public void exerciseHappyPath () throws Throwable {
@@ -520,10 +523,11 @@ public void spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Thr
520
523
521
524
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
522
525
523
- // trigger `requestFromUpstream` for elements that have been requested 'long ago'
526
+ // A Processor
527
+ // must trigger `requestFromUpstream` for elements that have been requested 'long ago'
524
528
@ Test
525
529
public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo () throws Exception {
526
- new TestSetup (env , testBufferSize ) {{
530
+ new TestSetup (env , processorBufferSize ) {{
527
531
ManualSubscriber <T > sub1 = newSubscriber ();
528
532
sub1 .request (20 );
529
533
@@ -566,19 +570,20 @@ public void mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() thr
566
570
}};
567
571
}
568
572
569
- // unblock the stream if a 'blocking' subscription has been cancelled
573
+ // A Processor
574
+ // must unblock the stream if a 'blocking' subscription has been cancelled
570
575
@ Test
571
576
@ SuppressWarnings ("unchecked" )
572
577
public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled () throws InterruptedException {
573
- new TestSetup (env , testBufferSize ) {{
578
+ new TestSetup (env , processorBufferSize ) {{
574
579
ManualSubscriber <T > sub1 = newSubscriber ();
575
580
ManualSubscriber <T > sub2 = newSubscriber ();
576
581
577
- sub1 .request (testBufferSize + 1 );
582
+ sub1 .request (processorBufferSize + 1 );
578
583
long pending = 0 ;
579
584
int sent = 0 ;
580
- final T [] tees = (T []) new Object [testBufferSize ];
581
- while (sent < testBufferSize ) {
585
+ final T [] tees = (T []) new Object [processorBufferSize ];
586
+ while (sent < processorBufferSize ) {
582
587
if (pending == 0 ) {
583
588
pending = expectRequest ();
584
589
}
@@ -588,7 +593,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
588
593
pending -= 1 ;
589
594
}
590
595
591
- expectNoRequest (); // because we only have buffer size testBufferSize and sub2 hasn't seen the first value yet
596
+ expectNoRequest (); // because we only have buffer size processorBufferSize and sub2 hasn't seen the first value yet
592
597
sub2 .cancel (); // must "unblock"
593
598
594
599
expectRequest ();
@@ -606,16 +611,14 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
606
611
/////////////////////// TEST INFRASTRUCTURE //////////////////////
607
612
608
613
public abstract class TestSetup extends ManualPublisher <T > {
609
- private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
614
+ final private ManualSubscriber <T > tees ; // gives us access to an infinite stream of T values
610
615
private Set <T > seenTees = new HashSet <T >();
611
616
612
617
final Processor <T , T > processor ;
613
- final int testBufferSize ;
614
618
615
619
public TestSetup (TestEnvironment env , int testBufferSize ) throws InterruptedException {
616
620
super (env );
617
- this .testBufferSize = testBufferSize ;
618
- tees = env .newManualSubscriber (createHelperPublisher (0 ));
621
+ tees = env .newManualSubscriber (createHelperPublisher (Long .MAX_VALUE ));
619
622
processor = createIdentityProcessor (testBufferSize );
620
623
subscribe (processor );
621
624
}
0 commit comments