[Critical] All Channel implementations leak every passed object #326
What type of objects are you seeing in memory?

```kotlin
for (i in 0 until 50_000_000) {
    async(DefaultDispatcher) {
```

[Critical] Is there a valid use case for creating 50_000_000 coroutines?

[Critical] `Thread.sleep` is a blocking call on a non-blocking dispatcher.
Channels don't leak all passed objects. In your example you create tens of millions of producer coroutines that all send into the channel, and then you consume from this channel using one consumer. Moreover, this consumer invokes relatively slow operations (writing to stdout). As a consequence, your consumer just can't keep up with the producers, and all sent elements end up buffered in the unlimited channel. The solution is to apply some form of backpressure so that the buffer cannot grow without bound (see the later comments).

In general, it's not a channel issue: if you replace the channel with, for example, a plain unbounded LinkedBlockingQueue, you will see the same behavior.
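To make the point above concrete, here is a minimal sketch (not part of the original thread; the thread count, list size, and consumer delay are arbitrary assumptions) showing that any unbounded buffer grows without limit once producers outpace a single slow consumer. Run it with a small heap such as `-Xmx256m`:

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

fun main(args: Array<String>) {
    // Unbounded queue, analogous to an unlimited channel: put() never blocks.
    val queue = LinkedBlockingQueue<ArrayList<Double>>()

    // A few fast producers...
    repeat(8) {
        thread {
            while (true) {
                val numbers = ArrayList<Double>(5_000)
                for (k in 1..5_000) numbers.add(k * 1.0)
                queue.put(numbers)
            }
        }
    }

    // ...and one slow consumer: the backlog (and the heap) grows until OOM.
    while (true) {
        println(queue.take().size)
        Thread.sleep(1)
    }
}
```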
Channels do leak all passed objects in memory in some way. And no, there is no problem with a LinkedBlockingQueue or raw RxJava. My real use case has a much slower data producer: I'm processing about 56_000_000 collections of doubles, all of them results of slow calculations, so the speed of consuming is not the problem. You can pass small objects instead of big collections and still have the memory leak; it just takes longer to get the OOM. Try this (you'll get an OOM exception):
```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) {
    // Rendezvous channel: no buffer, send() suspends until a receiver takes the element.
    val channel: Channel<ArrayList<Double>> = RendezvousChannel()

    launch {
        val I1 = 50_000_000
        val I2 = 5_000
        for (i in 0 until I1) {
            // One coroutine per element, all of them started eagerly.
            async(DefaultDispatcher) {
                Thread.sleep(50)
                val numbers = ArrayList<Double>()
                for (k in 1..I2)
                    numbers.add(k * 1.0)
                channel.send(numbers)
            }
        }
    }

    runBlocking {
        //val counter = AtomicInteger()
        channel.consume {
            for (element in this) {
                if (element.isEmpty())
                    throw IllegalStateException()
            }
        }
    }
}
```
Can you share the alternative implementations?
Please provide the rx sample; you're probably using operators with a backpressure mechanism. I've checked, and with an unlimited channel you do get an OOM. What you need is a backpressure mechanism, e.g. if you rewrite your sample this way:
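The snippet that followed here did not survive the extraction of this page. As a rough sketch only (the channel capacity, batch size, and delay are assumptions; the task count echoes the "~2kk of tasks" mentioned below), a backpressure-style rewrite with the experimental API could look like this:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val total = 2_000_000      // "~2kk of tasks"
    val batchSize = 64         // how many computations run in parallel at once

    // produce() gives a *bounded* channel: send() suspends when the buffer is full,
    // so results are never accumulated without limit.
    val results = produce<ArrayList<Double>>(capacity = 128) {
        var i = 0
        while (i < total) {
            // Launch only a bounded batch of computations, then forward their results.
            val batch = (i until minOf(i + batchSize, total)).map {
                async(DefaultDispatcher) {
                    delay(50)  // non-blocking stand-in for the slow calculation
                    val numbers = ArrayList<Double>(5_000)
                    for (k in 1..5_000) numbers.add(k * 1.0)
                    numbers
                }
            }
            for (deferred in batch) send(deferred.await())
            i += batchSize
        }
        // The channel is closed automatically when this block completes.
    }

    // Single consumer: it applies backpressure to the producer through the bounded channel.
    for (element in results) {
        if (element.isEmpty()) throw IllegalStateException()
    }
}
```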
On my machine it never exceeds 150 MB to process ~2kk of tasks. But in general this approach is not intuitive; what would be really helpful is the worker pool pattern (#172), so your example could hypothetically be rewritten as something like:
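The hypothetical snippet is likewise missing from the page. A sketch of the worker-pool idea using only the primitives that already exist (this is not the API proposed in #172; the pool size and channel capacities are arbitrary assumptions):

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val work = Channel<Int>(256)                       // bounded queue of task indices
    val results = Channel<ArrayList<Double>>(128)      // bounded queue of results

    // A fixed pool of workers: memory use is bounded by the pool size plus the channel capacities.
    val workers = List(16) {
        launch(DefaultDispatcher) {
            for (i in work) {                          // iterates until `work` is closed
                delay(50)                              // non-blocking stand-in for the slow calculation
                val numbers = ArrayList<Double>(5_000)
                for (k in 1..5_000) numbers.add(k * 1.0)
                results.send(numbers)                  // suspends when the result buffer is full
            }
        }
    }

    // Feed the task indices; send() suspends when the work queue is full.
    launch(coroutineContext) {
        for (i in 0 until 2_000_000) work.send(i)
        work.close()
    }

    // Close `results` once every worker has finished.
    launch(coroutineContext) {
        workers.forEach { it.join() }
        results.close()
    }

    // Single consumer, as in the original example.
    for (element in results) {
        if (element.isEmpty()) throw IllegalStateException()
    }
}
```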
@qwwdfsad Yeah, it seems that better backpressure control can reduce memory usage tremendously. I also left too many simultaneous tasks in this repro example, so the OOM was definitely caused by the number of created coroutines, not by the sent data. My bad, sorry for bothering you with a false alarm. With the worker pool pattern it would be much more usable and intuitive.
For the last two days I have tried to avoid a huge memory leak in `kotlinx.coroutines.experimental.channels.Channel`. All I found is that an `OutOfMemoryError` is unavoidable at the moment (correct me if I'm wrong, please). Every `Channel` implementation leaks every object passed through it, which makes them impossible to use for real work.

Example to quickly reproduce the memory leak (run it with `-Xmx256m`, for example):
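The reproducer itself is missing from this copy of the page. Judging from the fragments quoted in the first comment and the description above (an unlimited channel drained by a single consumer writing to stdout), it presumably looked roughly like the following reconstruction, which is not the author's exact code:

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) {
    // Unlimited channel: send() never suspends, every element is buffered.
    val channel: Channel<ArrayList<Double>> = LinkedListChannel()

    launch {
        for (i in 0 until 50_000_000) {
            async(DefaultDispatcher) {
                val numbers = ArrayList<Double>()
                for (k in 1..5_000)
                    numbers.add(k * 1.0)
                channel.send(numbers)
            }
        }
    }

    runBlocking {
        for (element in channel) {
            println(element.size) // slow consumer writing to stdout
        }
    }
}
```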