Kotlin 1.3.72 - Race Condition when using FlowAsPublisher with a multi-threaded consumer #2109
Comments
Out of curiosity, is the issue also mitigated if, instead of using …
@mhernand40 using …
I wrote a stress-test that calls … It looks like something else is going on there. Is there any chance that you can create self-contained code that reproduces this problem?
I've created the following reproducer: … It contains 3 test cases. 2 of them do pass and show that, functionally speaking, the code seems to be correct. The third test case, however, shows the race condition.
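(The linked reproducer itself is not preserved in this extract. As a rough, hypothetical sketch of the kind of test that exposes the race, assuming `kotlinx-coroutines-reactive` is on the classpath: subscribe to `Flow.asPublisher()` with a consumer that issues `request(1)` from a separate thread and checks that items arrive one at a time and in order.)

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    val items = 10_000
    val publisher = flow { repeat(items) { emit(it) } }.asPublisher()
    // Requests are issued from a thread other than the one running onNext,
    // mimicking a multi-threaded consumer such as Netty's I/O threads.
    val requester = Executors.newSingleThreadExecutor()
    val expected = AtomicInteger(0)
    val done = CountDownLatch(1)

    publisher.subscribe(object : Subscriber<Int> {
        lateinit var subscription: Subscription

        override fun onSubscribe(s: Subscription) {
            subscription = s
            requester.execute { s.request(1) }
        }

        override fun onNext(value: Int) {
            // Items must arrive strictly in order, one per request.
            check(expected.getAndIncrement() == value) { "out-of-order item: $value" }
            requester.execute { subscription.request(1) }
        }

        override fun onError(t: Throwable) { t.printStackTrace(); done.countDown() }
        override fun onComplete() { done.countDown() }
    })

    done.await()
    requester.shutdown()
    println("received ${expected.get()} items in order")
}
```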
Update: I've found how to fix this problem. Now figuring out what exactly is wrong and what part of the …
The race was leading to emitting more items via onNext than requested; the corresponding stress-test was added, too. Fixes #2109
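(As a hedged illustration of what such a stress assertion can look like, not the actual test added to the library: a wrapping subscriber can track outstanding demand and flag any onNext delivered without demand.)

```kotlin
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.atomic.AtomicLong

// Illustrative only: wraps a downstream subscriber and verifies that the
// number of onNext calls never exceeds the total amount requested.
class DemandCheckingSubscriber<T>(private val downstream: Subscriber<T>) : Subscriber<T> {
    private val outstanding = AtomicLong(0)

    override fun onSubscribe(s: Subscription) {
        downstream.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                outstanding.addAndGet(n)
                s.request(n)
            }
            override fun cancel() = s.cancel()
        })
    }

    override fun onNext(value: T) {
        check(outstanding.getAndDecrement() > 0) { "onNext delivered without demand" }
        downstream.onNext(value)
    }

    override fun onError(t: Throwable) = downstream.onError(t)
    override fun onComplete() = downstream.onComplete()
}
```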
@elizarov I highly appreciate all the hard work you've done! However, it seems it did not cover the full issue. I've updated the reproducer to kotlinx.coroutines 1.3.8 to validate the fix. At first glance all tests passed, but after a couple of executions they failed again. It seems your fix has mitigated the issue so that the failure rate drops from ~17/20 to ~3/20, but it did not fully resolve it.
Sure. I'll take a look. I have your reproducer code around to check.
@marc-christian-schulze I cannot reproduce it in your test framework. I've added a loop so that it repeatedly uploads a file, and it works for thousands of iterations like a charm. How do you manage to still reproduce it? Are you sure that you are running version 1.3.8?
@elizarov now you made me curious whether I was really using 1.3.8. o.O I've therefore added a Dockerfile to the reproducer to run the tests in an isolated, reproducible environment. It again reports 2 out of 22 tests failing on my machine. Maybe it's harder to reproduce on machines with a different number of cores than mine? I was running the example on an AMD Phenom II X6 (with 6 physical cores).
Is this the same code or something else? I was trying it under macOS on a MacBook. I'll try it on a more powerful machine (albeit running on Windows). Maybe it is something that reproduces only under a Linux scheduler, though.
Same code, just built and run in the container to make sure Gradle is not pulling in any unwanted dependency from the host machine.
I run the …
I hope this helps you.
It has something to do with Linux. It reproduces inside Docker running on macOS for me. I'll check it out.
Update so far:
I still do not understand what is going on there, though. Somehow the fact that …
I think I discovered the spot in the S3 API code, cf. … The underlying Netty ctx is wrapped into the above-mentioned class. It seems they were trying to somehow order the incoming tasks (including the writeAndFlush invocation), but they do this based on whether the calling thread belongs to the event loop. Since the unconfined dispatcher runs the first coroutine on the calling thread (the Netty I/O thread), the following writeAndFlush will be executed immediately, although there might be items in the queue that should go first. This now looks to me like a bug in the S3 API.
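(A rough sketch, not from the thread, that makes this dispatch behavior visible: with the unconfined-style dispatch used by `asPublisher` at the time, a suspended emitter resumes on whichever thread calls `request`, so `onNext` ends up running on that caller, e.g. a Netty I/O thread. The thread name and sleep are purely illustrative.)

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

fun main() {
    val publisher = flow { repeat(3) { emit(it) } }.asPublisher()
    val done = CountDownLatch(1)

    publisher.subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            // Issue each request from a dedicated thread (standing in for a
            // Netty I/O thread); the emitter resumes and calls onNext there.
            thread(name = "fake-netty-io") {
                repeat(3) {
                    Thread.sleep(50) // let the emitter suspend waiting for demand
                    s.request(1)
                }
            }
        }

        override fun onNext(value: Int) {
            println("onNext($value) on thread ${Thread.currentThread().name}")
        }

        override fun onError(t: Throwable) { done.countDown() }
        override fun onComplete() { done.countDown() }
    })

    done.await()
}
```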
@marc-christian-schulze Thanks for finding the root cause. To confirm it, I've created a standalone reproducer that does not use Kotlin: … I've also created #2155, which will help to work around these kinds of problems.
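(For context, a sketch of the kind of workaround such a change enables; it assumes an `asPublisher` overload that accepts a `CoroutineContext`, which is not part of the 1.3.x API discussed above. With a single-threaded dispatcher, all onNext calls happen on one thread regardless of which thread issues `request`.)

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asPublisher
import java.nio.ByteBuffer
import java.util.concurrent.Executors

fun main() {
    // Single-threaded dispatcher so that emissions are confined to one thread.
    val emitterDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

    val publisher = flow {
        repeat(4) { emit(ByteBuffer.wrap(ByteArray(8192))) }
    }.asPublisher(emitterDispatcher) // context-accepting overload assumed; see note above

    // The publisher can then be handed to a multi-threaded consumer.
}
```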
I'm facing some weird race condition when wrapping a `Flow` into a `Publisher`. Since I was first suspecting the issue on the consumer side (using the S3 client of the AWS SDK 2), there is also a ticket in their repo: aws/aws-sdk-java-v2#953

My flow is created like this:
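(The original snippet is not preserved in this extract; the following is a minimal sketch of the kind of flow described, with `fileChunks`, the chunk size, and the file handling as hypothetical placeholders.)

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import java.io.File
import java.nio.ByteBuffer

// Hypothetical stand-in for the reporter's flow: reads a file in fixed-size
// chunks and emits each chunk as a ByteBuffer. In real code the blocking
// reads would typically be moved off the caller via flowOn(Dispatchers.IO).
fun fileChunks(file: File, chunkSize: Int = 8192): Flow<ByteBuffer> = flow {
    file.inputStream().use { input ->
        val buffer = ByteArray(chunkSize)
        while (true) {
            val read = input.read(buffer)
            if (read < 0) break
            emit(ByteBuffer.wrap(buffer.copyOf(read)))
        }
    }
}
```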
And consumed like this:
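(Likewise a sketch rather than the original code; it assumes the AWS SDK 2 `S3AsyncClient` and reuses the hypothetical `fileChunks` helper from above, with the bucket name as a placeholder.)

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.reactive.asPublisher
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import java.io.File

suspend fun upload(s3: S3AsyncClient, file: File) {
    val request = PutObjectRequest.builder()
        .bucket("my-bucket") // placeholder
        .key(file.name)
        .contentLength(file.length())
        .build()

    // The Flow is wrapped into a Reactive Streams Publisher and handed to the
    // SDK, whose Netty-based transport requests ByteBuffers from it.
    val body = AsyncRequestBody.fromPublisher(fileChunks(file).asPublisher())
    s3.putObject(request, body).await()
}
```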
However, on the consumer side I can observe that the items of the flow are emitted by multiple threads, even concurrently, which results in a transmission failure since the `ByteBuffer`s might be written in the wrong order. Delaying the producing flow seems to suppress the race condition and the `ByteBuffer`s are written in the correct order. Also, when first `collect`ing the flow like in the following, it is working:
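(The exact snippet is missing from this extract; as a hedged sketch of that collect-first variant, reusing the hypothetical `fileChunks` helper above and assuming the whole payload fits in memory.)

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Publisher
import java.io.File
import java.nio.ByteBuffer

// Collect all chunks eagerly, then publish the already-materialized list;
// nothing is produced on demand anymore, which sidesteps the race.
suspend fun collectedPublisher(file: File): Publisher<ByteBuffer> {
    val chunks: List<ByteBuffer> = fileChunks(file).toList()
    return chunks.asFlow().asPublisher()
}
```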
While debugging the S3 client implementation I saw that the underlying I/O threads (of Netty) might invoke `Subscription::request` while another thread is executing `Subscriber::onNext` at the same time. Might this be the cause of the misbehavior? Can it somehow happen that, under these circumstances, multiple threads run multiple `Continuation`s of the same coroutine in parallel?

I would highly appreciate any advice.