-
Notifications
You must be signed in to change notification settings - Fork 534
=tck #427 TCK should stress subscriber rule 2.5 in concurrent setting #428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,8 +25,11 @@ | |
import org.testng.annotations.BeforeMethod; | ||
import org.testng.annotations.Test; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.*; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; | ||
import static org.testng.Assert.assertTrue; | ||
|
@@ -231,6 +234,77 @@ public String toString() { | |
}}; | ||
} | ||
|
||
@Override @Test | ||
public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignalConcurrently() throws Throwable { | ||
blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { | ||
@Override | ||
public void run(BlackboxTestStage stage) throws Throwable { | ||
final Subscriber<T> sub = createSubscriber(); | ||
|
||
final AtomicInteger cancellationCounter = new AtomicInteger(0); | ||
final AtomicReference<Subscription> firstAcceptedSubscription = new AtomicReference<Subscription>(null); | ||
final AtomicReference<Subscription> illegallyAcceptedSubscription = new AtomicReference<Subscription>(null); | ||
|
||
final int tasksToSubmit = 8 * Runtime.getRuntime().availableProcessors(); | ||
|
||
Callable<Void> invokeOnSubscribe = new Callable<Void>() { | ||
final Subscriber<? super T> targetSubscriber = sub; | ||
|
||
@Override | ||
public Void call() throws Exception { | ||
targetSubscriber.onSubscribe(new Subscription() { | ||
|
||
// we keep around the thread name to help identify which thread's subscription was accepted first, | ||
// and then illegally second | ||
final String nameWithThread = "Subscription<concurrently issued on thread " + Thread.currentThread().getName() + ">"; | ||
|
||
@Override public void request(long elements) { | ||
if (firstAcceptedSubscription.compareAndSet(null, this)) { | ||
env.debug(String.format("Accepted subscription %s (first)", firstAcceptedSubscription.get())); | ||
// first one accepted, this is legal | ||
// <empty on purpose, the effect we need has been achieved by the CAS above) | ||
} else { | ||
// seems two subscriptions were issued a request() signal concurrently; | ||
// this is not legal, since only ONE should have been accepted; and the other one cancelled. | ||
env.flop(String.format("Subscriber %s ilegally accepted and issued request() to two subscriptions, violating rule 2.5; " + | ||
"The accepted subscriptions are: %s and %s", sub, this, firstAcceptedSubscription.get())); | ||
} | ||
} | ||
|
||
@Override public void cancel() { | ||
int cancelledCounter = cancellationCounter.incrementAndGet(); | ||
env.debug(String.format("Subscription %s rejected; All but one subscriptions are expected to be rejected here; " + | ||
"Already rejected subscriptions: %d out of %d total subscriptions.", this, cancelledCounter, tasksToSubmit)); | ||
} | ||
|
||
@Override public String toString() { return nameWithThread; } | ||
}); | ||
return null; | ||
} | ||
}; | ||
|
||
List<Callable<Void>> invokeOnSubscribeConcurrentlyTasks = new ArrayList<Callable<Void>>(tasksToSubmit); | ||
for (int i = 0; i < tasksToSubmit; i++) { | ||
invokeOnSubscribeConcurrentlyTasks.add(invokeOnSubscribe); | ||
} | ||
|
||
List<Future<Void>> futures = Executors.newFixedThreadPool(16).invokeAll(invokeOnSubscribeConcurrentlyTasks); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this Executor shut down when no longer needed? Also having more threads than cores may actually reduce the chance of hitting the desired concurrency effect because of the increased context switching among threads. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it should; first let's agree if this PR makes sense or not -- as we started in the ticket, if not, no reason to spend time polishing it ;-) |
||
for (Future<Void> future : futures) { | ||
future.get(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
|
||
try { | ||
env.verifyNoAsyncErrors(); | ||
} finally { | ||
Subscription accepted = firstAcceptedSubscription.get(); | ||
if (accepted != null) accepted.cancel(); | ||
} | ||
|
||
// sendCompletion(); // we're done, complete the subscriber under test | ||
} | ||
}); | ||
} | ||
|
||
@Override @Test | ||
public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { | ||
notVerified(); // cannot be meaningfully tested, or can it? | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Travis may not have the proper concurrency to verify this. I'm constantly facing the problem testing race conditions in RxJava requiring two dedicated cores running practically at the same time to trigger small windows around CAS instructions.
This could be the reason the build failed: https://travis-ci.org/reactive-streams/reactive-streams-jvm/jobs/352388267#L1483
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, quite likely it'll be a pain to have travis execute this as intended. Let's discuss on the ticket first though if this PR makes sense or not, I'm fine with just dropping if if we agree this simply rides on the Publisher's guarnatees spec-wise