
Kotlin 1.3.72 - Race Condition when using FlowAsPublisher with a multi-threaded consumer #2109


Closed
marc-christian-schulze opened this issue Jun 27, 2020 · 16 comments

@marc-christian-schulze

marc-christian-schulze commented Jun 27, 2020

I'm facing a weird race condition when wrapping a Flow into a Publisher. Since I first suspected the issue to be on the consumer side (the S3 client of the AWS SDK 2), there is also a ticket in their repo:
aws/aws-sdk-java-v2#953

My flow is created like this:

fun readFromFile(file: Path): Flow<ByteBuffer> = flow {
    val channel = AsynchronousFileChannel.open(file)
    channel.use {
        var filePosition = 0L
        while(true) {
            val buf = ByteBuffer.allocate(4096)
            val bytesRead = it.aRead(buf, filePosition)
            if(bytesRead <= 0)
                break
            filePosition += bytesRead
            buf.flip()
            // the following delay seems to suppress the race-condition
            // delay(10)
            emit(buf)
        }
    }
}

suspend fun AsynchronousFileChannel.aRead(buf: ByteBuffer, position: Long): Int =
        suspendCoroutine { cont ->
            read(buf, position, Unit, object : CompletionHandler<Int, Unit> {
                override fun completed(bytesRead: Int, attachment: Unit) {
                    cont.resume(bytesRead)
                }

                override fun failed(exception: Throwable, attachment: Unit) {
                    cont.resumeWithException(exception)
                }
            })
        }

And consumed like this:

s3client.putObject(PutObjectRequest.builder()
    .bucket(s3config.bucket.get())
    .key("test")
    .contentLength(inputFile.length())
    .contentType(MediaType.APPLICATION_OCTET_STREAM)
    .build(),
    AsyncRequestBody.fromPublisher(readFromFile(inputFile.toPath()).asPublisher()))

However, on the consumer side I can observe that the items of the flow are emitted by multiple threads, and even concurrently, which results in a transmission failure since the ByteBuffers may be written in the wrong order.

Delaying the producing flow seems to suppress the race condition, and the ByteBuffers are written in the correct order. It also works when I first collect the flow, as in the following:

val chunks = ArrayList<ByteBuffer>()
val c = readFromFile(inputFile.toPath())
c.collect {
    chunks.add(it)
}
// ... and then creating a Publisher out of the pre-collected chunks
AsyncRequestBody.fromPublisher(Flowable.fromIterable(chunks))

While debugging the S3 client implementation I saw that the underlying IO threads (of netty) might invoke Subscription::request while another thread is executing Subscriber::onNext at the same time. Might this be the cause of the misbehavior? Can it somehow happen that, under these circumstances, multiple threads run multiple continuations of the same coroutine in parallel?

I would highly appreciate any advice.
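
For reference, a small wrapper along the following lines (a hypothetical diagnostic helper, not code from this project) can log which thread delivers each Reactive Streams signal, making the interleaving of request and onNext visible:

import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical diagnostic helper: forwards all signals unchanged and prints the calling thread.
fun <T> Publisher<T>.logSignals(tag: String): Publisher<T> = Publisher<T> { downstream ->
    this@logSignals.subscribe(object : Subscriber<T> {
        override fun onSubscribe(s: Subscription) {
            downstream.onSubscribe(object : Subscription {
                override fun request(n: Long) {
                    println("[$tag] request($n) on ${Thread.currentThread().name}")
                    s.request(n)
                }
                override fun cancel() = s.cancel()
            })
        }
        override fun onNext(t: T) {
            println("[$tag] onNext on ${Thread.currentThread().name}")
            downstream.onNext(t)
        }
        override fun onError(t: Throwable) = downstream.onError(t)
        override fun onComplete() = downstream.onComplete()
    })
}

It could be applied as readFromFile(inputFile.toPath()).asPublisher().logSignals("s3") before passing the publisher to AsyncRequestBody.fromPublisher.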

@mhernand40

Out of curiosity, is the issue also mitigated if instead of using delay(...), you use yield()?

@marc-christian-schulze
Author

@mhernand40 Using either Thread.yield() or kotlinx.coroutines.yield() instead of delay() does not mitigate the issue; the same race condition is still observable.

@elizarov elizarov self-assigned this Jul 3, 2020
@elizarov elizarov added the flow label Jul 3, 2020
@elizarov
Contributor

elizarov commented Jul 3, 2020

I wrote a stress-test that calls Subscription.request concurrently with Subscriber.onNext and checks whether this can lead to any kind of reordering of the received elements, but it fails to reproduce this issue. I even tried to send multiple requests concurrently. For the record, here's the code: https://gist.github.com/elizarov/63f6d6eb8542e052cc74091d1faab367

It looks like something else is going on there. Is there any chance you could create a self-contained example that reproduces this problem?

@marc-christian-schulze
Author

I've created the following reproducer:
https://github.com/marc-christian-schulze/kotlinx-reproducer-2109

It contains 3 test cases. Two of them pass and show that, functionally speaking, the code seems to be correct. The third test case, however, shows the race condition.

@elizarov
Contributor

elizarov commented Jul 7, 2020

Update: I've found how to fix this problem. Now I'm figuring out what exactly is wrong, what part of the Publisher contract is violated (if any), and how to test for it.

elizarov added a commit that referenced this issue Jul 7, 2020
The race was leading to emitting more items via onNext than requested, the corresponding stress-test was added, too

Fixes #2109
@elizarov elizarov mentioned this issue Jul 16, 2020
marc-christian-schulze added a commit to marc-christian-schulze/kotlinx-reproducer-2109 that referenced this issue Jul 17, 2020
@marc-christian-schulze
Author

@elizarov I highly appreciate all the hard work you've done! However, it seems it did not cover the full issue. I've updated the reproducer to kotlinx.coroutines 1.3.8 to validate the fix. At first glance all tests passed, but after a couple of executions they failed again. It seems your fix has mitigated the issue, dropping the failure rate from ~17/20 to ~3/20, but it did not fully resolve it.
Could you please have a second look?

@elizarov
Contributor

Sure. I'll take a look. I have your reproducer code around to check.

@elizarov
Contributor

@marc-christian-schulze I cannot reproduce it with your test setup. I've added a loop so that it repeatedly uploads the file, and it works like a charm for thousands of iterations. How do you manage to still reproduce it? Are you sure you are running version 1.3.8?

@marc-christian-schulze
Author

@elizarov now you made me curious whether I was really using 1.3.8. o.O I've therefore added a Dockerfile to the reproducer to run the tests in an isolated, reproducible environment. It again reports 2 out of 22 tests failing on my machine. Maybe it's harder to reproduce on machines with a different number of cores than mine? I was running the example on an AMD Phenom II X6 (6 physical cores).

@elizarov
Contributor

elizarov commented Jul 17, 2020

Is this the same code or something else? I was trying it under macOS on a MacBook. I'll try it on a more powerful machine (albeit running Windows). Maybe it is something that only reproduces under the Linux scheduler, though.

@marc-christian-schulze
Author

Same code, just built and run in the container to make sure Gradle is not pulling in any unwanted dependencies from the host machine.

@fvasco
Contributor

fvasco commented Jul 17, 2020

I ran commit 2a3ccda90221677f3038018c75c91aadff51d241 of github.com:marc-christian-schulze/kotlinx-reproducer-2109.git; this is what I got on an AMD A8-3870:

Starting a Gradle Daemon (subsequent builds will be faster)
> Task :clean UP-TO-DATE
> Task :compileKotlin
> Task :compileJava NO-SOURCE
> Task :processResources
> Task :classes
> Task :inspectClassesForKotlinIC
> Task :jar
> Task :compileTestKotlin
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test

ExampleResourceTest > repetition 5 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 7 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 11 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

ExampleResourceTest > repetition 20 of 20 FAILED
    java.util.concurrent.CompletionException at CompletableFutureUtils.java:60
        Caused by: software.amazon.awssdk.core.exception.SdkClientException at SdkClientException.java:98

22 tests completed, 4 failed

> Task :test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':test'.
> There were failing tests. See the report at: file:///workspace/build/reports/tests/test/index.html

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 3m 12s

I hope this helps you.

@elizarov
Contributor

It has something to do with Linux. It reproduces inside Docker running on macOS for me. I'll check it out.

@elizarov elizarov reopened this Jul 20, 2020
@elizarov
Contributor

elizarov commented Jul 20, 2020

Update so far:

  1. I've found a way to consistently reproduce it under macOS by placing .flowOn(Dispatchers.Default) between readFromFile(...) and .asPublisher() (a sketch follows the list).
  2. I've found a workaround: using Dispatchers.Default in the implementation of asPublisher instead of Dispatchers.Unconfined.
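
A sketch of the reproducing pipeline from item 1, assuming the readFromFile function from the first comment (the function name below is only illustrative):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Publisher
import java.nio.ByteBuffer
import java.nio.file.Path

// Forcing the upstream flow onto Dispatchers.Default makes the failure reproducible on macOS, too.
fun reproducingPublisher(file: Path): Publisher<ByteBuffer> =
    readFromFile(file)
        .flowOn(Dispatchers.Default)
        .asPublisher()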

I still do not understand what is going on there, though. Somehow the fact that asPublisher internally uses an Unconfined dispatcher causes a problem somewhere in the bowels of the S3 API code. I see that onNext calls jump from being made from async file threads (named Thread-XX) to being made directly from netty threads (named aws-java-sdk-NettyEventLoop-0-X), yet I cannot identify any Publisher contract violation that could cause a problem for the S3 API (all onNext calls are sequential and are never made before the corresponding request arrives). On the other hand, so far I've failed to write a self-contained reproducer that shows a problem with the S3 API itself (it works fine when either all onNext calls are made directly from async file threads or they are all made from netty threads).
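
To illustrate the dispatcher behavior involved (a minimal self-contained sketch, not taken from the reproducer): with Dispatchers.Unconfined a coroutine starts on the caller's thread and, after a suspension, resumes on whatever thread completed that suspension -- which is how consecutive onNext calls can end up originating from different threads.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    launch(Dispatchers.Unconfined) {
        // Starts on the thread that launched it ("main" here).
        println("before suspension: ${Thread.currentThread().name}")
        delay(10)
        // Resumes on the thread that completed the delay, not on "main".
        println("after suspension:  ${Thread.currentThread().name}")
    }.join()
}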

@marc-christian-schulze
Author

I think I discovered the spot in the S3 API code, cf.
OrderedWriteChannelHandlerContext

The underlying netty ChannelHandlerContext is wrapped in the above-mentioned class.

It seems they are trying to order the incoming tasks (including the writeAndFlush invocations), but they do this based on whether the calling thread belongs to the event loop. Since the unconfined dispatcher runs the coroutine on the calling thread (the netty IO thread), the subsequent writeAndFlush is executed immediately, even though there might be items in the queue that should go first.
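
To illustrate the kind of ordering logic I mean (a hypothetical sketch of the described behavior, not the actual AWS SDK code): work submitted from an event-loop thread runs immediately, while work submitted from any other thread is queued onto the loop, so a call arriving directly on the netty thread can overtake previously queued items.

import io.netty.channel.EventLoop

// Hypothetical sketch, not the AWS SDK implementation.
class OrderedWrites(private val eventLoop: EventLoop) {
    fun submit(task: Runnable) {
        if (eventLoop.inEventLoop()) {
            // A caller already on the event loop runs right away -- ahead of any task
            // that other threads have queued but the loop has not picked up yet.
            task.run()
        } else {
            // Callers on other threads are serialized through the event loop's task queue.
            eventLoop.execute(task)
        }
    }
}

With asPublisher running the collecting coroutine on Dispatchers.Unconfined, the writeAndFlush triggered by onNext can end up on the "runs right away" branch even though earlier buffers are still waiting in the queue.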

To me, this now looks like a bug in the S3 API.

@elizarov
Contributor

@marc-christian-schulze Thanks for finding the root cause. To confirm it, I've created a standalone reproducer that does not use Kotlin Flow at all -- it is a slightly intricate implementation of a file-reading publisher. It is a fully compliant Publisher implementation that reads one more block than has been requested, but waits until request is called to hand it to the subscriber, thus mixing calls from netty and non-netty threads and causing the same problem: marc-christian-schulze/kotlinx-reproducer-2109#1

I've also created #2155, which will help to work around these kinds of problems.

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
The race was leading to emitting more items via onNext than requested, the corresponding stress-test was added, too

Fixes Kotlin#2109