Skip to content

Add SubmissionPublisher TCK test #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 30, 2017

Conversation

akarnokd
Copy link
Contributor

This PR adds a SubmissionPublisher TCK test to the flow-tck subproject.

I've tested it with both Java 9.0.1 (via Gradle) and the custom patch from Doug Lea's comment (via IntelliJ's own test runner configured).

@akarnokd
Copy link
Contributor Author

Travis still runs with Java 9.0.0 :(

public void run() {
while (sp.getNumberOfSubscribers() == 0) {
try {
Thread.sleep(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.onSpinWait()? :)

}
}

for (int i = 0; i < elements; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add && sp.getNumberOfSubscribers() > 0 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if there are no subscribers, the loop will rush through without blocking.

}

for (int i = 0; i < elements; i++) {
sp.submit(i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this stop blocking if all subscribers cancel their subscriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess so. This test passes for me:

        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>(
                ForkJoinPool.commonPool(), 1);

        TestConsumer<Integer> tc1 = new TestConsumer<>(0L);
        TestConsumer<Integer> tc2 = new TestConsumer<>(0L);

        sp.subscribe(tc1);
        sp.subscribe(tc2);

        CountDownLatch cdl = new CountDownLatch(1);

        SchedulerServices.single().schedule(() -> {
            for (int i = 0; i < 512; i++) {
                sp.submit(i);
            }
            cdl.countDown();
        });

        Thread.sleep(1000);

        assertEquals(1, cdl.getCount());

        tc1.cancel();
        tc2.cancel();

        assertTrue(cdl.await(5, TimeUnit.SECONDS));

@viktorklang
Copy link
Contributor

@akarnokd Cool, thanks @akarnokd!

@reactive-streams/contributors @DougLea Have a look at this! :)

@viktorklang
Copy link
Contributor

@reactive-streams/contributors Merging this tomorrow unless someone requests more time to review.

Copy link
Contributor

@ktoso ktoso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thanks! Very cool to have the SubmissionPublisher tested here.

Perhaps we should eventually look into running against new releases of the JDK in case something changes without anyone noticing? Though I guess Doug would let us know then

@viktorklang
Copy link
Contributor

Great work, @akarnokd! Thank you

@viktorklang viktorklang merged commit 9f2c879 into reactive-streams:master Nov 30, 2017
@viktorklang viktorklang added this to the 1.0.2 milestone Nov 30, 2017
@DougLea
Copy link
Contributor

DougLea commented Nov 30, 2017

Yes, thanks very much @akarnokd !
We hopefully won't change this code very often, but it is nice to know that they will be readily checkable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants