Skip to content

Commit 64ab75f

Browse files
authored
Merge pull request #372 from ktoso/wip-complete-in-201
=tck #362 signal onComplete in 201 blackbox verification
2 parents 9b463ab + a0eb3f5 commit 64ab75f

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

Diff for: tck/src/main/java/org/reactivestreams/tck/SubscriberBlackboxVerification.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,25 @@ public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() t
9595
@Override
9696
public void run(BlackboxTestStage stage) throws InterruptedException {
9797
triggerRequest(stage.subProxy().sub());
98-
final long n = stage.expectRequest();// assuming subscriber wants to consume elements...
98+
final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
99+
final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
99100

100101
// should cope with up to requested number of elements
101-
for (int i = 0; i < n; i++)
102+
for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
102103
stage.signalNext();
104+
105+
// we complete after `signalsToEmit` (which can be less than `requested`),
106+
// which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
107+
stage.sendCompletion();
108+
}
109+
110+
/**
111+
* In order to allow some "skid" and not check state on each iteration,
112+
* only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
113+
*/
114+
private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
115+
if (i % checkInterval == 0) return stage.isCancelled();
116+
else return false;
103117
}
104118
});
105119
}

Diff for: tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,10 @@ public void expectCancelling() throws InterruptedException {
775775
public void expectCancelling(long timeoutMillis) throws InterruptedException {
776776
cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
777777
}
778+
779+
public boolean isCancelled() throws InterruptedException {
780+
return cancelled.isClosed();
781+
}
778782
}
779783

780784
/**

0 commit comments

Comments
 (0)