Skip to content

Commit 1836379

Browse files
committed
+tck reactive-streams#362 wait for request signal in 209, and new additional tests
1 parent 2879ce0 commit 1836379

File tree

4 files changed

+62
-48
lines changed

4 files changed

+62
-48
lines changed

tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+30-46
Original file line numberDiff line numberDiff line change
@@ -244,61 +244,29 @@ public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterH
244244
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
245245
@Override @Test
246246
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
247-
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
248-
@Override
247+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
248+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
249249
public void run(BlackboxTestStage stage) throws Throwable {
250-
final Publisher<T> pub = new Publisher<T>() {
251-
@Override public void subscribe(final Subscriber<? super T> s) {
252-
s.onSubscribe(new Subscription() {
253-
private boolean completed = false;
254-
255-
@Override public void request(long n) {
256-
if (!completed) {
257-
completed = true;
258-
s.onComplete(); // Publisher now realises that it is in fact already completed
259-
}
260-
}
261-
262-
@Override public void cancel() {
263-
// noop, ignore
264-
}
265-
});
266-
}
267-
};
268-
269-
final Subscriber<T> sub = createSubscriber();
270-
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
271-
272-
pub.subscribe(probe);
250+
final Subscriber<? super T> sub = stage.sub();
251+
273252
triggerRequest(sub);
274-
probe.expectCompletion();
275-
probe.expectNone();
276-
277-
env.verifyNoAsyncErrorsNoDelay();
253+
final long notUsed = stage.expectRequest(); // received request signal
254+
sub.onComplete();
255+
stage.subProxy().expectCompletion();
278256
}
279257
});
280258
}
281259

282260
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9
283261
@Override @Test
284262
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
285-
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
286-
@Override
263+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
264+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
287265
public void run(BlackboxTestStage stage) throws Throwable {
288-
final Publisher<T> pub = new Publisher<T>() {
289-
@Override
290-
public void subscribe(Subscriber<? super T> s) {
291-
s.onComplete();
292-
}
293-
};
294-
295-
final Subscriber<T> sub = createSubscriber();
296-
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
297-
298-
pub.subscribe(probe);
299-
probe.expectCompletion();
266+
final Subscriber<? super T> sub = stage.sub();
300267

301-
env.verifyNoAsyncErrorsNoDelay();
268+
sub.onComplete();
269+
stage.subProxy().expectCompletion();
302270
}
303271
});
304272
}
@@ -307,9 +275,25 @@ public void subscribe(Subscriber<? super T> s) {
307275
@Override @Test
308276
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
309277
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
310-
@Override
311-
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
278+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
279+
public void run(BlackboxTestStage stage) throws Throwable {
280+
final Subscriber<? super T> sub = stage.sub();
281+
282+
triggerRequest(sub);
283+
final long notUsed = stage.expectRequest(); // received request signal
284+
sub.onError(new TestException()); // in response to that, we fail
285+
stage.subProxy().expectError(Throwable.class);
286+
}
287+
});
288+
}
289+
290+
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
291+
@Override @Test
292+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
293+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
294+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
312295
public void run(BlackboxTestStage stage) throws Throwable {
296+
313297
stage.sub().onError(new TestException());
314298
stage.subProxy().expectError(Throwable.class);
315299
}

tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public interface SubscriberBlackboxVerificationRules {
2828
void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable;
2929
void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable;
3030
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable;
31+
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable;
3132
void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception;
3233
void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable;
3334
void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception;

tck/src/test/java/org/reactivestreams/tck/SingleElementPublisherTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
package org.reactivestreams.tck;
1313

1414
import org.reactivestreams.Publisher;
15+
import org.reactivestreams.Subscriber;
16+
import org.reactivestreams.Subscription;
1517
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
1618
import org.testng.annotations.AfterClass;
1719
import org.testng.annotations.BeforeClass;

tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
119119
// don't even request()
120120
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
121121
}
122-
}, "did not call `registerOnComplete()`");
122+
}, "Did not receive expected `request` call within");
123123
}
124124

125125
@Test
@@ -130,11 +130,38 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
130130
}
131131

132132
@Test
133-
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
133+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldPass_withRequestingSubscriber() throws Throwable {
134+
customSubscriberVerification(new NoopSubscriber() {
135+
@Override
136+
public void onSubscribe(Subscription s) {
137+
s.request(1); // request anything
138+
}
139+
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
140+
}
141+
142+
@Test
143+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withNoopSubscriber() throws Throwable {
144+
requireTestFailure(new ThrowingRunnable() {
145+
@Override
146+
public void run() throws Throwable {
147+
customSubscriberVerification(new NoopSubscriber() {
148+
// not requesting, so we can't test the "request followed by failure" scenario
149+
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
150+
}
151+
}, "Did not receive expected `request` call within");
152+
}
153+
154+
@Test
155+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withThrowingInsideOnError() throws Throwable {
134156
requireTestFailure(new ThrowingRunnable() {
135157
@Override public void run() throws Throwable {
136158

137159
customSubscriberVerification(new NoopSubscriber() {
160+
@Override
161+
public void onSubscribe(Subscription s) {
162+
s.request(1);
163+
}
164+
138165
@Override public void onError(Throwable t) {
139166
// this is wrong in many ways (incl. spec violation), but aims to simulate user code which "blows up" when handling the onError signal
140167
throw new RuntimeException("Wrong, don't do this!", t); // don't do this

0 commit comments

Comments
 (0)