Stream.consumeAsFlow() finally block not getting called #1825

Closed
josephlbarnett opened this issue Feb 22, 2020 · 6 comments
@josephlbarnett

We're using java.util.stream.Stream.consumeAsFlow() to process a stream as a flow.

Our usage looked like this:

val stream = jooqQuery.fetchLazy().stream().map { /* map result to data class */}
return stream.consumeAsFlow().flowOn(Dispatchers.IO)

Occasionally, the finally { stream.close() } block inside the flow returned by consumeAsFlow() appears not to run (possibly under certain cancellation or error conditions?), resulting in connection leaks. However, if we add an onCompletion {} to the returned flow, that block does run even when the finally is skipped:

val stream = jooqQuery.fetchLazy().stream().map { /* map result to data class */}
return stream.consumeAsFlow().flowOn(Dispatchers.IO)
       .onCompletion { stream.close() } // should not need this, as the finally block in the StreamFlow should close it

We verified this by duplicating the StreamFlow class and adding logging to its finally block, alongside logging in the .onCompletion block above. In a few instances the completion log fired without the finally log, but we can't easily reproduce this (usually both fire).
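The workaround described above can be packaged as a small extension function. This is only a minimal sketch: consumeAsClosingFlow is a hypothetical name (not part of kotlinx.coroutines), and a plain Stream.of(...) with an onClose handler stands in for the jOOQ cursor so the close call can be observed.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Stream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.stream.consumeAsFlow

// Hypothetical helper (not a kotlinx.coroutines API): consumes the stream on
// Dispatchers.IO and also closes it from an onCompletion placed AFTER flowOn,
// so the close does not depend only on the finally block upstream of flowOn.
fun <T> Stream<T>.consumeAsClosingFlow(): Flow<T> {
    val stream = this
    return stream.consumeAsFlow()
        .flowOn(Dispatchers.IO)
        .onCompletion { stream.close() }
}

fun main(): Unit = runBlocking {
    val closeCalls = AtomicInteger()
    // Stand-in for jooqQuery.fetchLazy().stream(); the handler counts close events.
    val stream = Stream.of(1, 2, 3).onClose { closeCalls.incrementAndGet() }
    val items = stream.consumeAsClosingFlow().toList()
    println("items=$items closeCalls=${closeCalls.get()}")
}
```

On a normal run both the upstream finally and the onCompletion block call close(). In OpenJDK a repeated close() does not re-run the close handlers, so the double call should be harmless for a plain Stream; whether that also holds for a particular jOOQ cursor is an assumption worth checking.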

@LouisCAD
Contributor

Do you have a self-contained reproducer, or a link to it from play.kotl.in?

@josephlbarnett
Author

The collecting code lives here: https://github.com/trib3/leakycauldron/blob/master/graphql/src/main/kotlin/com/trib3/graphql/websocket/GraphQLWebSocketConsumer.kt#L103-L126

Even in our real service it's hard to reproduce on demand, but a toy example that should demonstrate the same issue lives at https://github.com/trib3/example-cauldron-service/ (the flow is defined at https://github.com/trib3/example-cauldron-service/blob/master/server/src/main/kotlin/com/trib3/example/server/graphql/Subscription.kt). Using apollographql's websocket transport to connect and kick off a bunch of subscription queries, then closing those connections, might be the best way to try to trigger it.

@qwwdfsad qwwdfsad added the flow label Feb 25, 2020
@qwwdfsad qwwdfsad self-assigned this Feb 25, 2020
@qwwdfsad
Collaborator

It's not exactly a Stream.consumeAsFlow() issue; the problem is the flowOn(...).asPublisher().asFlow() chain.
As a workaround, getting rid of the asPublisher().asFlow() chain should help.

@qwwdfsad
Collaborator

qwwdfsad commented Feb 25, 2020

Self-contained repro (prints Done [2] and Finished, but never Done [1]):

@Test
fun testNonAtomicStart() = runBlocking<Unit> {
    try {
        coroutineScope {
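            // wrapperDispatcher() appears to be a helper from the kotlinx.coroutines test sources, not public API
            val dispatcher = wrapperDispatcher()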
            val job = coroutineContext[Job]!!
            val flow = flowOf(1, 2, 3)
                .onCompletion { println("Done [1]") }
                .flowOn(dispatcher)
                .onCompletion { println("Done [2]") }

            launch {
                flow.collect()
            }
            launch {
                job.cancel()
            }
        }
    } catch (e: Throwable) {
        println("Finished")
    }
}
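The repro above relies on a helper from the library's own test sources. For readers who want to try it outside that test suite, here is a rough standalone variant. It is a sketch under assumptions: Dispatchers.Default is swapped in for wrapperDispatcher(), runRepro is a hypothetical name, and the events are recorded in a list instead of printed. Whether the upstream onCompletion is skipped depends on the kotlinx.coroutines version and on timing, so no particular output is claimed.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical standalone variant of the repro; returns the recorded events.
fun runRepro(): List<String> = runBlocking {
    val events = CopyOnWriteArrayList<String>()
    try {
        coroutineScope {
            // Job of this coroutineScope; cancelling it cancels both children below.
            val scopeJob = coroutineContext[Job]!!
            val flow = flowOf(1, 2, 3)
                .onCompletion { events.add("Done [1]") } // upstream of flowOn
                .flowOn(Dispatchers.Default)             // assumption: replaces wrapperDispatcher()
                .onCompletion { events.add("Done [2]") } // downstream of flowOn

            launch { flow.collect() }
            launch { scopeJob.cancel() }
        }
    } catch (e: CancellationException) {
        events.add("Finished")
    }
    events.toList()
}

fun main() {
    println(runRepro())
}
```

If "Done [1]" is missing from the result, the upstream side of flowOn was cancelled before its completion handler ran, which is the symptom reported in this issue.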

qwwdfsad added a commit that referenced this issue Feb 25, 2020
…o ensure proper termination, including finally blocks and onCompletion operators

Fixes #1825
@josephlbarnett
Author

Interesting. I agree that .asPublisher().asFlow() is not ideal, so we may look into building Flow support for the graphql-java layer to avoid it. In the meantime, the onCompletion {} block placed after flowOn() seems to work as a workaround. Is it fully safe, or is there still potential for the issue to manifest?

@qwwdfsad
Collaborator

is it fully safe or is there still potential for the issue to manifest?

It is fully safe.

As for asPublisher().asFlow(): by its nature, not by its implementation, it breaks structured concurrency and the ordering of onCompletion blocks (see reactive-streams/reactive-streams-jvm#482), so I would recommend avoiding this combination altogether.
