Skip to content

Commit 0899761

Browse files
jroperviktorklang
authored andcommitted
Wait for demand before signaling onNext (#441)
* Wait for demand before signaling onNext Fixes #277. Fixes two whitebox subscriber tests where they weren't waiting for demand before signaling onNext. * Added unit tests ensuring that requests have been expected
1 parent ffd1ed7 commit 0899761

File tree

2 files changed

+165
-3
lines changed

2 files changed

+165
-3
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java

+6
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCall
267267
public void run(WhiteboxTestStage stage) throws InterruptedException {
268268
stage.puppet().triggerRequest(1);
269269
stage.puppet().signalCancel();
270+
stage.expectRequest();
270271
stage.signalNext();
271272

272273
stage.puppet().triggerRequest(1);
@@ -437,7 +438,12 @@ public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(
437438
@Override
438439
public void run(WhiteboxTestStage stage) throws InterruptedException {
439440
stage.puppet().triggerRequest(2);
441+
long requestedElements = stage.expectRequest();
440442
stage.probe.expectNext(stage.signalNext());
443+
// Some subscribers may only request one element at a time.
444+
if (requestedElements < 2) {
445+
stage.expectRequest();
446+
}
441447
stage.probe.expectNext(stage.signalNext());
442448

443449
stage.probe.expectNone();

Diff for: tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java

+159-3
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@
2323

2424
import java.util.concurrent.ExecutorService;
2525
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicLong;
2730

2831
/**
2932
* Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors.
3033
* <b>Important: Please note that all Subscribers implemented in this file are *wrong*!</b>
3134
*/
3235
public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {
3336

34-
private ExecutorService ex;
35-
@BeforeClass void before() { ex = Executors.newFixedThreadPool(4); }
37+
private ScheduledExecutorService ex;
38+
@BeforeClass void before() { ex = Executors.newScheduledThreadPool(4); }
3639
@AfterClass void after() { if (ex != null) ex.shutdown(); }
3740

3841
@Test
@@ -217,6 +220,51 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
217220
}, "But I thought it's cancelled!");
218221
}
219222

223+
@Test
224+
public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel_shouldWaitForDemandBeforeSignalling() throws Throwable {
225+
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
226+
@Override
227+
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
228+
229+
final AtomicBoolean demandRequested = new AtomicBoolean(false);
230+
231+
return new SimpleSubscriberWithProbe(probe) {
232+
@Override public void onSubscribe(final Subscription s) {
233+
this.subscription = s;
234+
probe.registerOnSubscribe(new SubscriberPuppet() {
235+
@Override public void triggerRequest(final long elements) {
236+
ex.schedule(new Runnable() {
237+
@Override
238+
public void run() {
239+
demandRequested.set(true);
240+
subscription.request(elements);
241+
}
242+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
243+
}
244+
245+
@Override public void signalCancel() {
246+
// Delay this too to ensure that cancel isn't invoked before request.
247+
ex.schedule(new Runnable() {
248+
@Override
249+
public void run() {
250+
subscription.cancel();
251+
}
252+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
253+
}
254+
});
255+
}
256+
257+
@Override public void onNext(Integer element) {
258+
if (!demandRequested.get()) {
259+
throw new RuntimeException("onNext signalled without demand!");
260+
}
261+
probe.registerOnNext(element);
262+
}
263+
};
264+
}
265+
}).required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
266+
}
267+
220268
@Test
221269
public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
222270
requireTestFailure(new ThrowingRunnable() {
@@ -299,7 +347,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
299347
}
300348

301349
@Test
302-
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldFail() throws Throwable {
350+
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldPass() throws Throwable {
303351
// sanity checks the "happy path", that triggerRequest() propagates the right demand
304352
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
305353
@Override
@@ -309,6 +357,114 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
309357
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
310358
}
311359

360+
@Test
361+
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandBeforeSignalling() throws Throwable {
362+
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
363+
@Override
364+
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
365+
366+
final AtomicBoolean demandRequested = new AtomicBoolean(false);
367+
return new SimpleSubscriberWithProbe(probe) {
368+
@Override
369+
public void onSubscribe(Subscription s) {
370+
this.subscription = s;
371+
probe.registerOnSubscribe(new SubscriberPuppet() {
372+
@Override
373+
public void triggerRequest(final long elements) {
374+
ex.schedule(new Runnable() {
375+
@Override
376+
public void run() {
377+
demandRequested.set(true);
378+
subscription.request(elements);
379+
}
380+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
381+
}
382+
383+
@Override
384+
public void signalCancel() {
385+
// Delay this too to ensure that cancel isn't invoked before request.
386+
ex.schedule(new Runnable() {
387+
@Override
388+
public void run() {
389+
subscription.cancel();
390+
}
391+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
392+
}
393+
});
394+
}
395+
396+
@Override
397+
public void onNext(Integer element) {
398+
if (!demandRequested.get()) {
399+
throw new RuntimeException("onNext signalled without demand!");
400+
}
401+
probe.registerOnNext(element);
402+
}
403+
};
404+
}
405+
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
406+
}
407+
408+
@Test
409+
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandTwiceForOneAtATimeSubscribers() throws Throwable {
410+
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
411+
@Override
412+
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {
413+
414+
final AtomicLong outstandingRequest = new AtomicLong(0);
415+
final AtomicBoolean demandRequested = new AtomicBoolean();
416+
return new SimpleSubscriberWithProbe(probe) {
417+
@Override
418+
public void onSubscribe(Subscription s) {
419+
this.subscription = s;
420+
probe.registerOnSubscribe(new SubscriberPuppet() {
421+
@Override
422+
public void triggerRequest(final long elements) {
423+
outstandingRequest.getAndAdd(elements);
424+
ex.schedule(new Runnable() {
425+
@Override
426+
public void run() {
427+
demandRequested.set(true);
428+
subscription.request(1);
429+
}
430+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
431+
}
432+
433+
@Override
434+
public void signalCancel() {
435+
// Delay this too to ensure that cancel isn't invoked before request.
436+
ex.schedule(new Runnable() {
437+
@Override
438+
public void run() {
439+
subscription.cancel();
440+
}
441+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
442+
}
443+
});
444+
}
445+
446+
@Override
447+
public void onNext(Integer element) {
448+
if (!demandRequested.getAndSet(false)) {
449+
throw new RuntimeException("onNext signalled without demand!");
450+
}
451+
if (outstandingRequest.decrementAndGet() > 0) {
452+
ex.schedule(new Runnable() {
453+
@Override
454+
public void run() {
455+
demandRequested.set(true);
456+
subscription.request(1);
457+
}
458+
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
459+
}
460+
probe.registerOnNext(element);
461+
}
462+
};
463+
}
464+
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
465+
}
466+
467+
312468
// FAILING IMPLEMENTATIONS //
313469

314470
/**

0 commit comments

Comments
 (0)