Skip to content

Commit 8f38503

Browse files
committed
Merge pull request reactive-streams#27 from ktoso/tck-more-public-ktoso
+tck add missing `public` modifiers,
2 parents dad6b35 + d8e5420 commit 8f38503

File tree

5 files changed

+74
-68
lines changed

5 files changed

+74
-68
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+23-25
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ public IdentityProcessorVerification(final TestEnvironment env, long publisherSh
4545

4646
this.subscriberVerification = new SubscriberVerification<T>(env) {
4747
@Override
48-
Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
48+
public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
4949
return IdentityProcessorVerification.this.createSubscriber(probe);
5050
}
5151

5252
@Override
53-
Publisher<T> createHelperPublisher(int elements) {
53+
public Publisher<T> createHelperPublisher(int elements) {
5454
return IdentityProcessorVerification.this.createHelperPublisher(elements);
5555
}
5656
};
@@ -156,30 +156,28 @@ public void mustStartProducingWithTheOldestStillAvailableElementForASubscriber()
156156
// must call `onError` on all its subscribers if it encounters a non-recoverable error
157157
@Test
158158
public void mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
159-
new TestSetup(env, testBufferSize) {
160-
{
161-
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
162-
env.subscribe(processor.getPublisher(), sub1);
163-
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
164-
env.subscribe(processor.getPublisher(), sub2);
165-
166-
sub1.requestMore(1);
167-
expectRequestMore();
168-
final T x = sendNextTFromUpstream();
169-
expectNextElement(sub1, x);
170-
sub1.requestMore(1);
159+
new TestSetup(env, testBufferSize) {{
160+
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
161+
env.subscribe(processor.getPublisher(), sub1);
162+
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
163+
env.subscribe(processor.getPublisher(), sub2);
171164

172-
// sub1 now has received and element and has 1 pending
173-
// sub2 has not yet requested anything
165+
sub1.requestMore(1);
166+
expectRequestMore();
167+
final T x = sendNextTFromUpstream();
168+
expectNextElement(sub1, x);
169+
sub1.requestMore(1);
174170

175-
Exception ex = new RuntimeException("Test exception");
176-
sendError(ex);
177-
sub1.expectError(ex);
178-
sub2.expectError(ex);
171+
// sub1 now has received and element and has 1 pending
172+
// sub2 has not yet requested anything
179173

180-
env.verifyNoAsyncErrors();
181-
}
182-
};
174+
Exception ex = new RuntimeException("Test exception");
175+
sendError(ex);
176+
sub1.expectError(ex);
177+
sub2.expectError(ex);
178+
179+
env.verifyNoAsyncErrors();
180+
}};
183181
}
184182

185183
@Test
@@ -498,7 +496,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws
498496

499497
/////////////////////// TEST INFRASTRUCTURE //////////////////////
500498

501-
abstract class TestSetup extends ManualPublisher<T> {
499+
public abstract class TestSetup extends ManualPublisher<T> {
502500
private TestEnvironment.ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
503501
private Set<T> seenTees = new HashSet<T>();
504502

@@ -540,7 +538,7 @@ public T sendNextTFromUpstream() throws InterruptedException {
540538
}
541539
}
542540

543-
private class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
541+
public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
544542
TestEnvironment.Promise<Throwable> error;
545543

546544
public ManualSubscriberWithErrorCollection(TestEnvironment env) {

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() {
487487

488488
/////////////////////// TEST INFRASTRUCTURE //////////////////////
489489

490-
interface PublisherTestRun<T> {
490+
public interface PublisherTestRun<T> {
491491
public void run(Publisher<T> pub) throws Throwable;
492492
}
493493

tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java

+28-28
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ protected SubscriberVerification(TestEnvironment env) {
2222
* In order to be meaningfully testable your Subscriber must inform the given
2323
* `SubscriberProbe` of the respective events having been received.
2424
*/
25-
abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);
25+
public abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);
2626

2727
/**
2828
* Helper method required for generating test elements.
2929
* It must create a Publisher for a stream with exactly the given number of elements.
3030
* If `elements` is zero the produced stream must be infinite.
3131
*/
32-
abstract Publisher<T> createHelperPublisher(int elements);
32+
public abstract Publisher<T> createHelperPublisher(int elements);
3333

3434
////////////////////// TEST SETUP VERIFICATION ///////////////////////////
3535

3636
@Test
37-
void exerciseHappyPath() throws InterruptedException {
37+
public void exerciseHappyPath() throws InterruptedException {
3838
new TestSetup(env) {{
3939
puppet().triggerRequestMore(1);
4040

@@ -63,7 +63,7 @@ void exerciseHappyPath() throws InterruptedException {
6363
// must asynchronously schedule a respective event to the subscriber
6464
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
6565
@Test
66-
void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
66+
public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
6767
// cannot be meaningfully tested, or can it?
6868
}
6969

@@ -72,14 +72,14 @@ void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
7272
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
7373
// must consider the Subscription cancelled after having received the event
7474
@Test
75-
void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
75+
public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
7676
// cannot be meaningfully tested, or can it?
7777
}
7878

7979
// A Subscriber
8080
// must not accept an `onSubscribe` event if it already has an active Subscription
8181
@Test
82-
void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
82+
public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
8383
new TestSetup(env) {{
8484
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
8585
sub().onSubscribe(
@@ -100,7 +100,7 @@ public void cancel() {
100100
// A Subscriber
101101
// must call Subscription::cancel during shutdown if it still has an active Subscription
102102
@Test
103-
void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
103+
public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
104104
new TestSetup(env) {{
105105
puppet().triggerShutdown();
106106
expectCancelling();
@@ -112,14 +112,14 @@ void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription()
112112
// A Subscriber
113113
// must ensure that all calls on a Subscription take place from the same thread or provide for respective external synchronization
114114
@Test
115-
void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
115+
public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
116116
// cannot be meaningfully tested, or can it?
117117
}
118118

119119
// A Subscriber
120120
// must be prepared to receive one or more `onNext` events after having called Subscription::cancel
121121
@Test
122-
void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
122+
public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
123123
new TestSetup(env) {{
124124
puppet().triggerRequestMore(1);
125125
puppet().triggerCancel();
@@ -133,7 +133,7 @@ void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCa
133133
// A Subscriber
134134
// must be prepared to receive an `onComplete` event with a preceding Subscription::requestMore call
135135
@Test
136-
void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
136+
public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
137137
new TestSetup(env) {{
138138
puppet().triggerRequestMore(1);
139139
sendCompletion();
@@ -146,7 +146,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMo
146146
// A Subscriber
147147
// must be prepared to receive an `onComplete` event without a preceding Subscription::requestMore call
148148
@Test
149-
void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
149+
public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
150150
new TestSetup(env) {{
151151
sendCompletion();
152152
probe.expectCompletion();
@@ -158,7 +158,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionReques
158158
// A Subscriber
159159
// must be prepared to receive an `onError` event with a preceding Subscription::requestMore call
160160
@Test
161-
void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
161+
public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
162162
new TestSetup(env) {{
163163
puppet().triggerRequestMore(1);
164164
Exception ex = new RuntimeException("Test exception");
@@ -172,7 +172,7 @@ void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore(
172172
// A Subscriber
173173
// must be prepared to receive an `onError` event without a preceding Subscription::requestMore call
174174
@Test
175-
void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
175+
public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
176176
new TestSetup(env) {{
177177
Exception ex = new RuntimeException("Test exception");
178178
sendError(ex);
@@ -184,15 +184,15 @@ void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMo
184184
// A Subscriber
185185
// must make sure that all calls on its `onXXX` methods happen-before the processing of the respective events
186186
@Test
187-
void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
187+
public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
188188
// cannot be meaningfully tested, or can it?
189189
}
190190

191191
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////
192192

193193
/////////////////////// TEST INFRASTRUCTURE //////////////////////
194194

195-
class TestSetup extends ManualPublisher<T> {
195+
public class TestSetup extends ManualPublisher<T> {
196196
ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
197197
Probe probe;
198198
T lastT = null;
@@ -205,24 +205,24 @@ public TestSetup(TestEnvironment env) throws InterruptedException {
205205
probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
206206
}
207207

208-
Subscriber<T> sub() {
208+
public Subscriber<T> sub() {
209209
return subscriber.get();
210210
}
211211

212-
SubscriberPuppet puppet() {
212+
public SubscriberPuppet puppet() {
213213
return probe.puppet.value();
214214
}
215215

216-
void sendNextTFromUpstream() throws InterruptedException {
216+
public void sendNextTFromUpstream() throws InterruptedException {
217217
sendNext(nextT());
218218
}
219219

220-
T nextT() throws InterruptedException {
220+
public T nextT() throws InterruptedException {
221221
lastT = tees.requestNextElement();
222222
return lastT;
223223
}
224224

225-
class Probe implements SubscriberProbe<T> {
225+
public class Probe implements SubscriberProbe<T> {
226226
Promise<SubscriberPuppet> puppet = new Promise<SubscriberPuppet>(env);
227227
Receptacle<T> elements = new Receptacle<T>(env);
228228
Latch completed = new Latch(env);
@@ -248,30 +248,30 @@ public void registerOnError(Throwable cause) {
248248
error.complete(cause);
249249
}
250250

251-
void expectNext(T expected) throws InterruptedException {
251+
public void expectNext(T expected) throws InterruptedException {
252252
expectNext(expected, env.defaultTimeoutMillis());
253253
}
254254

255-
void expectNext(T expected, long timeoutMillis) throws InterruptedException {
255+
public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
256256
T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
257257
if (!received.equals(expected)) {
258258
env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
259259
}
260260
}
261261

262-
void expectCompletion() throws InterruptedException {
262+
public void expectCompletion() throws InterruptedException {
263263
expectCompletion(env.defaultTimeoutMillis());
264264
}
265265

266-
void expectCompletion(long timeoutMillis) throws InterruptedException {
266+
public void expectCompletion(long timeoutMillis) throws InterruptedException {
267267
completed.expectClose(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
268268
}
269269

270-
void expectError(Throwable expected) throws InterruptedException {
270+
public void expectError(Throwable expected) throws InterruptedException {
271271
expectError(expected, env.defaultTimeoutMillis());
272272
}
273273

274-
void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
274+
public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
275275
error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
276276
if (error.value() != expected) {
277277
env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
@@ -284,7 +284,7 @@ public void verifyNoAsyncErrors() {
284284
}
285285
}
286286

287-
interface SubscriberProbe<T> {
287+
public interface SubscriberProbe<T> {
288288
/**
289289
* Must be called by the test subscriber when it has received the `onSubscribe` event.
290290
*/
@@ -306,7 +306,7 @@ interface SubscriberProbe<T> {
306306
void registerOnError(Throwable cause);
307307
}
308308

309-
interface SubscriberPuppet {
309+
public interface SubscriberPuppet {
310310
void triggerShutdown();
311311

312312
void triggerRequestMore(int elements);

0 commit comments

Comments
 (0)