Skip to content

=tck #427 TCK should stress subscriber rule 2.5 in concurrent setting #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -231,6 +234,77 @@ public String toString() {
}};
}

@Override @Test
public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently() throws Throwable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Travis may not have the proper concurrency to verify this. I'm constantly facing the problem testing race conditions in RxJava requiring two dedicated cores running practically at the same time to trigger small windows around CAS instructions.

This could be the reason the build failed: https://travis-ci.org/reactive-streams/reactive-streams-jvm/jobs/352388267#L1483

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, quite likely it'll be a pain to have travis execute this as intended. Let's discuss on the ticket first though if this PR makes sense or not, I'm fine with just dropping if if we agree this simply rides on the Publisher's guarnatees spec-wise

blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
@Override
public void run(BlackboxTestStage stage) throws Throwable {
final Subscriber<T> sub = createSubscriber();

final AtomicInteger cancellationCounter = new AtomicInteger(0);
final AtomicReference<Subscription> firstAcceptedSubscription = new AtomicReference<Subscription>(null);
final AtomicReference<Subscription> illegallyAcceptedSubscription = new AtomicReference<Subscription>(null);

final int tasksToSubmit = 8 * Runtime.getRuntime().availableProcessors();

Callable<Void> invokeOnSubscribe = new Callable<Void>() {
final Subscriber<? super T> targetSubscriber = sub;

@Override
public Void call() throws Exception {
targetSubscriber.onSubscribe(new Subscription() {

// we keep around the thread name to help identify which thread's subscription was accepted first,
// and then illegally second
final String nameWithThread = "Subscription<concurrently issued on thread " + Thread.currentThread().getName() + ">";

@Override public void request(long elements) {
if (firstAcceptedSubscription.compareAndSet(null, this)) {
env.debug(String.format("Accepted subscription %s (first)", firstAcceptedSubscription.get()));
// first one accepted, this is legal
// <empty on purpose, the effect we need has been achieved by the CAS above)
} else {
// seems two subscriptions were issued a request() signal concurrently;
// this is not legal, since only ONE should have been accepted; and the other one cancelled.
env.flop(String.format("Subscriber %s ilegally accepted and issued request() to two subscriptions, violating rule 2.5; " +
"The accepted subscriptions are: %s and %s", sub, this, firstAcceptedSubscription.get()));
}
}

@Override public void cancel() {
int cancelledCounter = cancellationCounter.incrementAndGet();
env.debug(String.format("Subscription %s rejected; All but one subscriptions are expected to be rejected here; " +
"Already rejected subscriptions: %d out of %d total subscriptions.", this, cancelledCounter, tasksToSubmit));
}

@Override public String toString() { return nameWithThread; }
});
return null;
}
};

List<Callable<Void>> invokeOnSubscribeConcurrentlyTasks = new ArrayList<Callable<Void>>(tasksToSubmit);
for (int i = 0; i < tasksToSubmit; i++) {
invokeOnSubscribeConcurrentlyTasks.add(invokeOnSubscribe);
}

List<Future<Void>> futures = Executors.newFixedThreadPool(16).invokeAll(invokeOnSubscribeConcurrentlyTasks);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this Executor shut down when no longer needed? Also having more threads than cores may actually reduce the chance of hitting the desired concurrency effect because of the increased context switching among threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should; first let's agree if this PR makes sense or not -- as we started in the ticket, if not, no reason to spend time polishing it ;-)

for (Future<Void> future : futures) {
future.get(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
}

try {
env.verifyNoAsyncErrors();
} finally {
Subscription accepted = firstAcceptedSubscription.get();
if (accepted != null) accepted.cancel();
}

// sendCompletion(); // we're done, complete the subscriber under test
}
});
}

@Override @Test
public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
notVerified(); // cannot be meaningfully tested, or can it?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.testng.Assert.assertTrue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,25 @@ public interface SubscriberBlackboxVerificationRules {
* </ul>
*/
void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception;

/**
* Asks for a {@code Subscriber}, signals {@code onSubscribe} from different threads ...
* <p>
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#2.5'>2.5</a>
* <p>
* Notes:
* <ul>
* <li>The test doesn't signal any other events than {@code onSubscribe} and may cause resource leak in
* {@code Subscriber}s that expect a finite {@code Publisher}.
* </ul>
* <p>
* If this test fails, the following could be checked within the {@code Subscriber} implementation:
* <ul>
* <li>if the {@code Subscribe.onSubscribe} implementation actually tries to detect multiple calls to it,</li>
* <li>if the second {@code Subscription} is cancelled asynchronously and that takes longer time than
* the {@code TestEnvironment}'s timeout permits.</li>
* </ul>
*/
void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently() throws Throwable;
/**
* Currently, this test is skipped because it requires more control over the {@code Subscriber} implementation
* to make it cancel the {@code Subscription} for some external condition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Validates that the TCK's {@link org.reactivestreams.tck.SubscriberBlackboxVerification} fails with nice human readable errors.
Expand Down Expand Up @@ -137,10 +138,68 @@ public void onComplete() {
completion.countDown();
}
}).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal();

completion.await(1, TimeUnit.SECONDS);
}

@Test
public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently_shouldFail() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customSubscriberVerification(
// THIS IS AN INCORRECT IMPLEMENTATION SINCE THE subscription ACCESS IS NOT SYNCHRONIZED:
new NoopSubscriber() {

Subscription sub = null; // on purpose missing volatile or any external synchronization

@Override
public void onSubscribe(Subscription s) {
if (this.sub == null) { // since sub is not volatile, this is racy, and we may accept (illegally) multiple subscriptions

// This sleep is only here to make the "test testing the TCK" with this illegal implementation
// more shockinly expose the potential race condition here. In general implementations which naively
// check for sub == null, and then assign / assume this accept was valid, are prone to the race
// condition of multiple publishers sending them a subscription concurrently, thus one may accidentally
// accept two subscriptions -- which is illegal
//
// The sleep is not necessary to expose this race, however it makes this test testing the TCK more reliable on CI.
try { Thread.sleep(10); }
catch (InterruptedException ignored) {}

this.sub = s;
s.request(1);
} else {
s.cancel();
}
}
}
).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently();
}
}, "ilegally accepted and issued request() to two subscriptions, violating rule 2.5; The accepted subscriptions are");
}

@Test
public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently_shouldPass() throws Throwable {
customSubscriberVerification(
new NoopSubscriber() {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();

@Override
public void onSubscribe(Subscription s) {
if (sub.compareAndSet(null, s)) {
// successfully accepted subscription, may issue requests to it
s.request(1);
// s.cancel(); // this is fine
} else {
// all others must be cancelled
s.cancel();
}
}
}
).required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently();
}


@Test
public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
Expand Down