
#1044 follow up for CompletableDeferred #1092

Closed
Tolriq opened this issue Apr 13, 2019 · 21 comments

@Tolriq

Tolriq commented Apr 13, 2019

I've run into one more case with Closeable resources :(

Take the following example of a typical worker pool.
If the task returns a Closeable, it is possible that await() does not return (for example, because the caller was cancelled) while the task still completes, which leaks the resource.

It's the same issue as before (#1044), but I suppose it would also require something for the resumption of CompletableDeferred. Or maybe I'm missing a better way to achieve this?

cc @elizarov

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class TaskManager : CoroutineScope {
    internal class Task<R>(val request: suspend () -> R) {
        val response = CompletableDeferred<R>()
    }

    internal val tasks = Channel<Task<*>>(Channel.UNLIMITED)

    override val coroutineContext = Dispatchers.IO + SupervisorJob()

    private suspend fun <R> executeTask(task: Task<R>) {
        try {
            task.response.complete(task.request())
        } catch (e: Throwable) {
            task.response.completeExceptionally(e)
        }
    }

    init {
        // start a fixed number of workers that consume the task queue
        repeat(MAX_TASKS) {
            launch {
                for (task in tasks) executeTask(task)
            }
        }
    }

    suspend fun <R> execute(request: suspend () -> R): R {
        val task = Task(request)
        tasks.send(task)
        return task.response.await()
    }

    companion object {
        // const val is only allowed at top level or inside an object
        private const val MAX_TASKS = 4
    }
}
@qwwdfsad
Collaborator

qwwdfsad commented Apr 17, 2019

This one lives at a higher level of abstraction, so it can be fixed, for example, in the following way:

try {
    response.await()
} finally {
    // skip failed deferreds: getCompleted() would rethrow their exception
    if (response.isCompleted && !response.isCancelled) response.getCompleted().close()
}

I am afraid we can't propose a better solution, because Deferred (as opposed to CancellableCoroutine) is too generic: we don't know whether the deferred will be awaited once or several times, who owns the result, or whether it should be closed at all.

To make it less painful, you can extract an extension like suspend fun <T : Closeable> Deferred<T>.awaitOrClose() = ...
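One possible shape for such an extension, as a minimal sketch: awaitOrClose is a hypothetical helper, not a kotlinx.coroutines function. It closes the value only when the awaiting coroutine is cancelled after the deferred has already completed successfully. The isCancelled check skips deferreds that failed, because getCompleted() would rethrow their exception; getCompleted() is marked experimental, hence the opt-in.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.*

// Hypothetical helper (not a library API): await the value, and if the awaiting
// coroutine is cancelled after the deferred already completed successfully,
// close the value so it does not leak.
@OptIn(ExperimentalCoroutinesApi::class) // getCompleted() is experimental
suspend fun <T : Closeable> Deferred<T>.awaitOrClose(): T =
    try {
        await()
    } catch (e: CancellationException) {
        // isCancelled is also true for a deferred that failed, so getCompleted()
        // is only reached for a successful result
        if (isCompleted && !isCancelled) getCompleted().close()
        throw e
    }
```

Note that it cannot help when the value arrives after the caller was cancelled: by then nobody is awaiting, so nobody closes it.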

@Tolriq
Author

Tolriq commented Apr 17, 2019

@qwwdfsad Thanks for the hint, I didn't think I could catch the cancellation exception on the await.

Since I don't want to close in a finally (it always runs, and I actually need the resource when the call is not cancelled), I have to do it as below to avoid introducing nullability.

suspend fun <R : Closeable> execute(request: suspend () -> R): R {
    val task = Task(request)
    tasks.send(task)
    return try {
        task.response.await()
    } catch (e: CancellationException) {
        // close only a successful result; getCompleted() would rethrow a failure
        if (task.response.isCompleted && !task.response.isCancelled) {
            task.response.getCompleted().close()
        }
        throw e
    }
}

A little ugly, but it seems to work. Would a default awaitOrExecute extension have its place in the core coroutines library for this kind of thing?

@qwwdfsad
Collaborator

Yes, catching CancellationException for cancellation only is fine.

Don't know if an awaitOrExecute default extension have it's place in the default coroutine for that kind of things?

I'm afraid it is not the best fit for the core library (though I will give it some thought).
For example, it is hard to name properly: the long name awaitOrInvokeOnCancellation {} is really ugly, while a short name would have to reflect the unambiguous intent of the call, and it is not clear what that name would be.

Another concern is that it implies an implicit ownership that is prone to data races: what if two threads call awaitOrExecute { it.close() } on the same deferred?
Or, what happens if someone writes code like this:

val task = ...
yield() // <- cancelled here
task.awaitOrExecute { it.close() }

This is now a timing-dependent memory leak, as no one is responsible for closing this resource any longer.

If we introduce an ownership mechanism, it should be robust (otherwise it is not suitable for general use) and simple, and it should be possible to "own" an exceptional result as well, so that every exception is delivered.
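Of the two problems raised above, only the double close can be handled locally. A minimal sketch, using a hypothetical CloseOnce wrapper (not a kotlinx.coroutines API): an atomic flag ensures that whichever caller wins compareAndSet performs the close, and every other call is a no-op. It does nothing for the second case, where the resource is left with no owner at all.

```kotlin
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical wrapper, not part of kotlinx.coroutines: close() may be called
// from several places or threads, but the wrapped resource is closed only by
// the caller that wins the compareAndSet.
class CloseOnce<T : Closeable>(val value: T) : Closeable {
    private val closed = AtomicBoolean(false)

    override fun close() {
        if (closed.compareAndSet(false, true)) value.close()
    }
}
```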

@Tolriq
Author

Tolriq commented Apr 17, 2019

Thanks for the details, I hadn't thought about multiple awaits on the same deferred :(

Just to be sure, given the new version:

suspend fun <R : Closeable> execute(request: suspend () -> R): R {
    val task = Task(request)
    tasks.send(task)
    return try {
        task.response.await()
    } catch (e: CancellationException) {
        // close only a successful result; getCompleted() would rethrow a failure
        if (task.response.isCompleted && !task.response.isCancelled) {
            task.response.getCompleted().close()
        }
        throw e
    }
}

Is a call like val x = execute(...) now guaranteed to either receive the Closeable or have close() called on it? And will the calling function not be checked for cancellation until I either check manually or call another suspending function?

@qwwdfsad
Collaborator

qwwdfsad commented Apr 18, 2019

Yes, as long as there are no suspension points in tasks.send(task).
If your send looks like this:

suspend fun send(task: Task<*>) {
    submitTask(task)
    yield() // or any other suspending call made after the request was fired, but before 'await'
}

then you have to move send into the try block as well.
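A minimal sketch of moving the submission into the try block, written as a hypothetical helper submitAndAwait (not from the thread or the library). The submit lambda stands in for tasks.send(task) plus anything else that runs before await; because it executes inside the try, a cancellation that lands on a suspension point inside it, like the yield() in the send above, still reaches the cleanup.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.*

// Hypothetical helper: `submit` hands the deferred to whatever will complete it
// (e.g. a worker queue). Submission and await share one cancellation handler.
@OptIn(ExperimentalCoroutinesApi::class) // getCompleted() is experimental
suspend fun <R : Closeable> submitAndAwait(
    submit: suspend (CompletableDeferred<R>) -> Unit,
): R {
    val response = CompletableDeferred<R>()
    return try {
        submit(response)   // may suspend; cancellation here is handled below
        response.await()
    } catch (e: CancellationException) {
        // a result that already arrived has no other owner: release it
        if (response.isCompleted && !response.isCancelled) response.getCompleted().close()
        throw e
    }
}
```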

@Tolriq
Author

Tolriq commented Apr 18, 2019

Then I'm lost: I still get some rare leaks :(

Given https://gist.github.com/Tolriq/d3df040e878da0ca8aa048bdb2b19855

And a simple call:

launch {
    query.executeAsync().close()
}

StrictMode still reports leaks, meaning there's still one cancellation case I don't handle.

@qwwdfsad
Collaborator

I can try to help if you provide any kind of stable reproducer,
but most likely it is a bug in application-specific logic.

@Tolriq
Author

Tolriq commented Apr 24, 2019

@qwwdfsad It's basically all the code in the previous post.

While looking for the source of the leaks, I tried to build tests to reproduce them, but I couldn't find a proper way to test for leaks outside of StrictMode, and I can't make StrictMode work in unit tests :(
Maybe you know something that could help?

It could be application-specific logic, but it's tied to coroutines and some cancellation behavior that I'm missing :( I didn't have these issues with simpler threading solutions before, and I'm sure I close resources at the correct points whenever those points are reached.

While analyzing this leak I found one leak in OkHttp, #1044, and this one; all are now fixed, but I fear the remaining leak is the same kind of problem as this one.

@qwwdfsad
Collaborator

The gist is not enough because it is not self-contained, so I can't help you with that.

Is this problem reproducible with 1.2.1? We've fixed a non-trivial data race in #1127; it may be the cause.

@Tolriq
Author

Tolriq commented May 14, 2019

It still happens in 1.2.1, but it's a little harder to trigger than before.

The gist is complete except for the creation of the Closeable, which is application-specific. Anyway, as I said, I'm more than willing to build tests and a repro for this, but I need some help with how to write tests that hunt for leaks :(
I would need an example of how to track leaks in a way that helps find the issue; from there I can build the rest. My testing skills are limited to basic stuff :(
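One simple way to detect leaks in a plain JVM unit test, without StrictMode, is to count resources that are opened but never closed. A minimal sketch with a hypothetical LeakTracker class (not an existing library):

```kotlin
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical test helper: tracks resources created through it that have not
// been closed yet. Thread-safe, so workers and callers can share one instance.
class LeakTracker {
    private val open = AtomicInteger()

    // Resources created but not (yet) closed. A negative value means at least
    // one resource was closed more than once.
    fun openCount(): Int = open.get()

    fun newResource(): Closeable {
        open.incrementAndGet()
        return Closeable { open.decrementAndGet() }
    }
}
```

In a test, the request lambdas would return tracker.newResource() instead of a real connection; after running many executions with random cancellation and waiting for the workers to go idle, asserting openCount() == 0 flags any leak.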

@elizarov
Contributor

Any chance of a reproducer, even if it is not small? Could you maybe open-source an example Android app that leaks sometimes?

@Tolriq
Author

Tolriq commented May 14, 2019

The app is very large and closed source, so I thought the time would be better spent writing tests.

But if you can't give me a starting point for such a test, I can try to build a small app that reproduces the issue and detects it via Android StrictMode.

@qwwdfsad
Collaborator

Yes, it would definitely help us troubleshoot your issue.

@Tolriq
Author

Tolriq commented May 22, 2019

@elizarov @qwwdfsad

Sorry for the delay, real life is a b... and I tried to make a small repro.

Here it is: https://github.com/Tolriq/coroutines-closeable (based on OkHttp / MockWebServer).

Start the app and press the FAB; it might require quite a few tries or delay adjustments (on a physical Nokia 8 with Android 9 it takes fewer than 5 attempts).

The code is ugly, so don't spam the FAB: Android won't digest all the object creations :)

At some point the app will crash from StrictMode with this log:

2019-05-22 10:55:24.109 27919-28005/org.debug.closable D/StrictMode: StrictMode policy violation: android.os.strictmode.LeakedClosableViolation: A resource was acquired at attached stack trace but never released. See java.io.Closeable for information on avoiding resource leaks.
        at android.os.StrictMode$AndroidCloseGuardReporter.report(StrictMode.java:1786)
        at dalvik.system.CloseGuard.warnIfOpen(CloseGuard.java:264)
        at java.lang.reflect.Method.invoke(Native Method)
        at okhttp3.internal.platform.AndroidPlatform$CloseGuard.warnIfOpen(AndroidPlatform.java:326)
        at okhttp3.internal.platform.AndroidPlatform.logCloseableLeak(AndroidPlatform.java:161)
        at okhttp3.internal.connection.RealConnectionPool.pruneAndGetAllocationCount(RealConnectionPool.java:236)
        at okhttp3.internal.connection.RealConnectionPool.cleanup(RealConnectionPool.java:177)
        at okhttp3.internal.connection.RealConnectionPool.lambda$new$0$RealConnectionPool(RealConnectionPool.java:55)
        at okhttp3.internal.connection.-$$Lambda$RealConnectionPool$Cy61BJKpsrwSB_hQmB_R9MngVNU.run(Unknown Source:2)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:764)
     Caused by: java.lang.Throwable: Explicit termination method 'response.body().close()' not called
        at dalvik.system.CloseGuard.open(CloseGuard.java:221)
        at java.lang.reflect.Method.invoke(Native Method)
        at okhttp3.internal.platform.AndroidPlatform$CloseGuard.createAndOpen(AndroidPlatform.java:314)
        at okhttp3.internal.platform.AndroidPlatform.getStackTraceForCloseable(AndroidPlatform.java:157)
        at okhttp3.internal.connection.Transmitter.callStart(Transmitter.java:116)
        at okhttp3.RealCall.enqueue(RealCall.java:92)
        at org.debug.closable.OkHttpWorkerPool.await(OkHttpWorkerPool.kt:93)
        at org.debug.closable.OkHttpWorkerPool.executeTask(OkHttpWorkerPool.kt:81)
        at org.debug.closable.OkHttpWorkerPool$startWorkers$$inlined$repeat$lambda$1.invokeSuspend(OkHttpWorkerPool.kt:42)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:238)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:742)
2019-05-22 10:55:24.109 27919-28005/? W/System.err: StrictMode VmPolicy violation with POLICY_DEATH; shutting down.

@Tolriq
Author

Tolriq commented May 24, 2019

Are you able to reproduce it, or do you want me to try to find something else?

@Tolriq
Author

Tolriq commented Jun 14, 2019

Another small bump, in case it's 1.3 material, and to get the "waiting for clarification" tag removed :)

@qwwdfsad
Collaborator

qwwdfsad commented Aug 6, 2019

It's a semantic bug in your sample project.

Let's take a look at the execute function:

suspend fun execute(request: Request, timeout: Long = -1L): WorkerResult {
    val task = Task(request, timeout)
    tasks.send(task)
    return try {
        task.response.await()
    } catch (e: CancellationException) {
        if (task.response.isCompleted) {
            try {
                (task.response.getCompleted() as? WorkerResult.Success)?.response?.close()
            } catch (ignore: Throwable) {
                // Ignore
            }
        }
        throw e
    }
}

With send you trigger a computation (an OkHttp request), and in catch you try to release the response.

But consider the following scenario:

  1. execute sent a request and now awaits the result.
  2. executeTask has computed the result asynchronously and is about to invoke task.response.complete(response) (but has not done so yet!).
  3. Cancellation is triggered.
  4. At this moment, await throws CancellationException, but the deferred is not yet completed, so the execute function just rethrows and finishes.
  5. executeTask completes the deferred with the response and returns. Now there is no one left who can clean up the resource.
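The five steps can be replayed deterministically in a single-threaded test. A minimal sketch: replayRace is a hypothetical function, the CompletableDeferred stands in for task.response, and the catch block mirrors the caller-side cleanup in execute. It returns true when the resource ends up leaked.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.*

// Deterministic replay of steps 1-5 with caller-side-only cleanup.
@OptIn(ExperimentalCoroutinesApi::class) // getCompleted() is experimental
fun replayRace(): Boolean = runBlocking {
    var closed = false
    val resource = Closeable { closed = true }   // the worker's result
    val response = CompletableDeferred<Closeable>()

    // 1. the caller awaits (runs until it suspends in await)
    val caller = launch(start = CoroutineStart.UNDISPATCHED) {
        try {
            response.await()
        } catch (e: CancellationException) {
            if (response.isCompleted) response.getCompleted().close()
            throw e
        }
    }
    // 2.-3. the worker holds the result but has not completed yet; cancel
    caller.cancel()
    caller.join()                 // 4. await threw, deferred not completed
    response.complete(resource)   // 5. completed after the caller is gone
    !closed                       // true: nobody closed the resource
}
```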

The simplest fix is to bind the lifecycles of the deferred and its owner: https://gist.github.com/qwwdfsad/e92bef89b1a8311576f1c38a9f90cc61
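The linked gist is not reproduced here. As one possible way to close the window in step 5 (not necessarily what the gist does), both sides can race on the deferred's state instead of leaving cleanup to the caller alone: the caller cancels the deferred when it is itself cancelled, and the worker closes its result when complete() returns false. Because a CompletableDeferred can be completed only once, exactly one side ends up owning the result. ClosingPool and runAndComplete are hypothetical names; the opt-in is there because getCompleted() is experimental.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Sketch of a worker pool where exactly one side releases each result.
class ClosingPool(workers: Int = 4) : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()

    private class Task<R : Closeable>(val request: suspend () -> R) {
        val response = CompletableDeferred<R>()

        suspend fun runAndComplete() {
            val result = try {
                request()
            } catch (e: Throwable) {
                response.completeExceptionally(e)
                return
            }
            // complete() returns false if the caller already cancelled the
            // deferred: nobody will read this result, so the worker closes it
            if (!response.complete(result)) result.close()
        }
    }

    private val tasks = Channel<Task<*>>(Channel.UNLIMITED)

    init {
        repeat(workers) {
            launch { for (task in tasks) task.runAndComplete() }
        }
    }

    @OptIn(ExperimentalCoroutinesApi::class) // getCompleted() is experimental
    suspend fun <R : Closeable> execute(request: suspend () -> R): R {
        val task = Task(request)
        return try {
            tasks.send(task)
            task.response.await()
        } catch (e: CancellationException) {
            // Either cancel() wins (the worker will close the result), or the
            // worker's complete() already won and this caller must close it.
            task.response.cancel()
            if (!task.response.isCancelled) task.response.getCompleted().close()
            throw e
        }
    }
}
```

The sketch has no shutdown logic: cancelling the pool scope stops the workers, but callers whose tasks are still queued would keep waiting, so a real pool would also need to fail those tasks.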

@Tolriq
Author

Tolriq commented Aug 6, 2019

@qwwdfsad Thanks a lot for the insights and details. It all makes sense, even though most worker-pool samples don't take this into account.

Could you explain this change:

-            coroutineContext.cancel()
+            cancel()

I'm not sure I grasp what this change brings or its impact.

@qwwdfsad
Collaborator

qwwdfsad commented Aug 6, 2019

This change is purely cosmetic. CoroutineScope.cancel and CoroutineContext.cancel are basically aliases.
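Both are extension functions in kotlinx.coroutines that end up cancelling the Job found in the context. A small sketch (cancelBothWays is a hypothetical name) showing that either call cancels the scope's Job. One difference worth knowing: CoroutineScope.cancel() throws IllegalStateException when the context has no Job, while CoroutineContext.cancel() silently does nothing; both scopes here are created with a SupervisorJob, so that case does not arise.

```kotlin
import kotlinx.coroutines.*

// Cancels two scopes, one through each extension, and reports whether
// their Jobs ended up cancelled.
fun cancelBothWays(): Pair<Boolean, Boolean> {
    val viaScope = CoroutineScope(SupervisorJob())
    viaScope.cancel()                       // CoroutineScope.cancel()

    val viaContext = CoroutineScope(SupervisorJob())
    viaContext.coroutineContext.cancel()    // CoroutineContext.cancel()

    return viaScope.coroutineContext[Job]!!.isCancelled to
        viaContext.coroutineContext[Job]!!.isCancelled
}
```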

@Tolriq
Author

Tolriq commented Aug 7, 2019

Thanks, that's what I thought, but you made me doubt whether there were lower-level impacts.

I can confirm that this also fixes the issue in my production app, for the few resource worker-pool patterns I had. Thanks a lot.

A sample / doc about a Closeable worker pool would be nice, as I would never have found the cause without your help. It's really hard to grasp all the internal places where cancellation can occur and their impacts.

@Tolriq Tolriq closed this as completed Aug 7, 2019
@qwwdfsad
Collaborator

qwwdfsad commented Aug 7, 2019

Thanks for the feedback.
I think we will provide a robust primitive in #172 or #1097.
