Skip to content

Commit ccc0832

Browse files
committed
Tune completeImmediatly to allow 3.2 in concurrent settings
1 parent e6fb2ec commit ccc0832

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ public void run(Publisher<T> pub) throws Throwable {
489489
ManualSubscriber<T> sub = new ManualSubscriber<T>(env) {
490490
@Override
491491
public void onSubscribe(Subscription subs) {
492-
this.subscription.complete(subs);
492+
this.subscription.completeImmediatly(subs);
493493

494494
subs.request(1);
495495
subs.request(1);
@@ -505,7 +505,8 @@ public void onNext(T element) {
505505

506506
env.subscribe(pub, sub);
507507

508-
env.verifyNoAsyncErrors();
508+
long delay = env.defaultTimeoutMillis();
509+
env.verifyNoAsyncErrors(delay);
509510
}
510511
});
511512
}
@@ -514,7 +515,7 @@ public void onNext(T element) {
514515
@Required @Test
515516
@Additional(implement = "boundedDepthOfOnNextAndRequestRecursion")
516517
public void spec303_mustNotAllowUnboundedRecursion() throws Throwable {
517-
long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
518+
final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1;
518519

519520
activePublisherTest(oneMoreThanBoundedLimit, new PublisherTestRun<T>() {
520521
@Override

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ public Promise(TestEnvironment env) {
605605
}
606606

607607
private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
608-
volatile private T _value = null;
608+
private volatile T _value = null;
609609

610610
public T value() {
611611
if (isCompleted()) {
@@ -632,7 +632,8 @@ public void complete(T value) {
632632
* Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
633633
*/
634634
public void completeImmediatly(T value) {
635-
_value = value;
635+
complete(value); // complete!
636+
_value = value; // immediatly!
636637
}
637638

638639
public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {

0 commit comments

Comments
 (0)