Skip to content

Commit 940a51f

Browse files
authored
Merge pull request #374 from ktoso/wip-209-210-improve
+tck #362 wait for request signal in 209, and new additional tests
2 parents 8d4b33b + 9910b8b commit 940a51f

File tree

4 files changed

+72
-59
lines changed

4 files changed

+72
-59
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+27-57
Original file line numberDiff line numberDiff line change
@@ -249,79 +249,49 @@ public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterH
249249

250250
@Override @Test
251251
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
252-
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
253-
@Override
252+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
253+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
254254
public void run(BlackboxTestStage stage) throws Throwable {
255-
final Publisher<T> pub = new Publisher<T>() {
256-
@Override public void subscribe(final Subscriber<? super T> s) {
257-
s.onSubscribe(new Subscription() {
258-
private boolean completed = false;
259-
260-
@Override public void request(long n) {
261-
if (!completed) {
262-
completed = true;
263-
s.onComplete(); // Publisher now realises that it is in fact already completed
264-
}
265-
}
266-
267-
@Override public void cancel() {
268-
// noop, ignore
269-
}
270-
});
271-
}
272-
};
273-
274-
final Subscriber<T> sub = createSubscriber();
275-
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
276-
277-
pub.subscribe(probe);
278-
triggerRequest(sub);
279-
probe.expectCompletion();
280-
probe.expectNone();
281-
282-
env.verifyNoAsyncErrorsNoDelay();
255+
triggerRequest(stage.subProxy().sub());
256+
final long notUsed = stage.expectRequest(); // received request signal
257+
stage.sub().onComplete();
258+
stage.subProxy().expectCompletion();
283259
}
284260
});
285261
}
286262

287263
@Override @Test
288264
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
289-
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
290-
@Override
265+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
266+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
291267
public void run(BlackboxTestStage stage) throws Throwable {
292-
final Publisher<T> pub = new Publisher<T>() {
293-
@Override
294-
public void subscribe(final Subscriber<? super T> s) {
295-
s.onSubscribe(new Subscription() {
296-
@Override public void request(long n) {
297-
// do nothing...
298-
}
299-
@Override public void cancel() {
300-
// do nothing...
301-
}
302-
});
303-
// immediately complete
304-
s.onComplete();
305-
}
306-
};
307-
308-
final Subscriber<T> sub = createSubscriber();
309-
final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub);
310-
311-
pub.subscribe(probe);
312-
probe.expectCompletion();
313-
314-
env.verifyNoAsyncErrorsNoDelay();
268+
final Subscriber<? super T> sub = stage.sub();
269+
sub.onComplete();
270+
stage.subProxy().expectCompletion();
315271
}
316272
});
317273
}
318274

319275
@Override @Test
320276
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
321277
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
322-
@Override
323-
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
278+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
324279
public void run(BlackboxTestStage stage) throws Throwable {
280+
triggerRequest(stage.subProxy().sub());
281+
final long notUsed = stage.expectRequest(); // received request signal
282+
stage.sub().onError(new TestException()); // in response to that, we fail
283+
stage.subProxy().expectError(Throwable.class);
284+
}
285+
});
286+
}
287+
288+
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
289+
@Override @Test
290+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
291+
blackboxSubscriberTest(new BlackboxTestStageTestRun() {
292+
@Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
293+
public void run(BlackboxTestStage stage) throws Throwable {
294+
325295
stage.sub().onError(new TestException());
326296
stage.subProxy().expectError(Throwable.class);
327297
}

Diff for: tck/src/main/java/org/reactivestreams/tck/support/SubscriberBlackboxVerificationRules.java

+14
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,20 @@ public interface SubscriberBlackboxVerificationRules {
216216
* </ul>
217217
*/
218218
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable;
219+
220+
/**
221+
* Asks for a {@code Subscriber}, signals {@code onSubscribe} followed by an {@code onError} synchronously.
222+
* <p>
223+
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#2.10'>2.10</a>
224+
* <p>
225+
* If this test fails, the following could be checked within the {@code Subscriber} implementation:
226+
* <ul>
227+
* <li>if the {@code Subscriber} throws an unchecked exception from its {@code onSubscribe} or
228+
* {@code onError} methods.
229+
* </ul>
230+
*/
231+
void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable;
232+
219233
/**
220234
* Currently, this test is skipped because it would require analyzing what the {@code Subscriber} implementation
221235
* does.

Diff for: tck/src/test/java/org/reactivestreams/tck/SingleElementPublisherTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
package org.reactivestreams.tck;
1313

1414
import org.reactivestreams.Publisher;
15+
import org.reactivestreams.Subscriber;
16+
import org.reactivestreams.Subscription;
1517
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
1618
import org.testng.annotations.AfterClass;
1719
import org.testng.annotations.BeforeClass;

Diff for: tck/src/test/java/org/reactivestreams/tck/SubscriberBlackboxVerificationTest.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
149149
// don't even request()
150150
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
151151
}
152-
}, "did not call `registerOnComplete()`");
152+
}, "Did not receive expected `request` call within");
153153
}
154154

155155
@Test
@@ -160,11 +160,38 @@ public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalW
160160
}
161161

162162
@Test
163-
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
163+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldPass_withRequestingSubscriber() throws Throwable {
164+
customSubscriberVerification(new NoopSubscriber() {
165+
@Override
166+
public void onSubscribe(Subscription s) {
167+
s.request(1); // request anything
168+
}
169+
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
170+
}
171+
172+
@Test
173+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withNoopSubscriber() throws Throwable {
174+
requireTestFailure(new ThrowingRunnable() {
175+
@Override
176+
public void run() throws Throwable {
177+
customSubscriberVerification(new NoopSubscriber() {
178+
// not requesting, so we can't test the "request followed by failure" scenario
179+
}).required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall();
180+
}
181+
}, "Did not receive expected `request` call within");
182+
}
183+
184+
@Test
185+
public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall_shouldFail_withThrowingInsideOnError() throws Throwable {
164186
requireTestFailure(new ThrowingRunnable() {
165187
@Override public void run() throws Throwable {
166188

167189
customSubscriberVerification(new NoopSubscriber() {
190+
@Override
191+
public void onSubscribe(Subscription s) {
192+
s.request(1);
193+
}
194+
168195
@Override public void onError(Throwable t) {
169196
// this is wrong in many ways (incl. spec violation), but aims to simulate user code which "blows up" when handling the onError signal
170197
throw new RuntimeException("Wrong, don't do this!", t); // don't do this

0 commit comments

Comments
 (0)