Skip to content

SharedFlow ClassCastException cannot be cast to SharedFlowImpl$Emitter #2356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
vehovsky opened this issue Nov 1, 2020 · 3 comments
Closed

Comments

@vehovsky
Copy link

vehovsky commented Nov 1, 2020

Tried migrating from BroadcastChannels to SharedFlow, but suffering this ClassCastException. I did not manage to create small reproducible example. Let me know how I can help you investigate further.

My emitter:

private val _recordAddedFlow: MutableSharedFlow<RecordAdded> = MutableSharedFlow(1)
val recordAddedFlow: SharedFlow<RecordAdded> = _recordAddedFlow.asSharedFlow()

GlobalScope.launch {
    ...
    _recordAddedFlow.emit(RecordAdded(...))
    ....
}

Collector side:

recordAddedFlow
            .dropWhile {
                ...
            }
            .take(1)
            .collectWithTimeout("error message", timeoutInMilliseconds)
suspend fun <T> Flow<T>.collectWithTimeout(message: String, timeoutInMilliseconds: Long) {
    try {
        withTimeout(timeout) {
            collect()
        }
    } catch (e: TimeoutCancellationException) {
        throw RuntimeException("Timeout of ${timeoutInMilliseconds / 1000}s elapsed. $message")
    }
}

Stack trace:

java.lang.ClassCastException: com.....PublishListener$RecordAdded cannot be cast to kotlinx.coroutines.flow.SharedFlowImpl$Emitter
	at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:490)
	at kotlinx.coroutines.flow.SharedFlowSlot.freeLocked(SharedFlow.kt:238)
	at kotlinx.coroutines.flow.SharedFlowSlot.freeLocked(SharedFlow.kt:220)
	at kotlinx.coroutines.flow.internal.AbstractSharedFlow.freeSlot(AbstractSharedFlow.kt:83)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect(SharedFlow.kt:318)
	at kotlinx.coroutines.flow.ReadonlySharedFlow.collect(Share.kt)
	at kotlinx.coroutines.flow.FlowKt__LimitKt$dropWhile$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:115)
	at kotlinx.coroutines.flow.FlowKt__LimitKt$take$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:116)
	at kotlinx.coroutines.flow.FlowKt__CollectKt.collect(Collect.kt:30)
	at kotlinx.coroutines.flow.FlowKt.collect(Unknown Source)
	at com.....FlowExtensionsKt$collectWithTimeout$2.invokeSuspend(FlowExtensions.kt:13)
	at com.....FlowExtensionsKt$collectWithTimeout$2.invoke(FlowExtensions.kt)
	at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturnIgnoreTimeout(Undispatched.kt:102)
	at kotlinx.coroutines.TimeoutKt.setupTimeout(Timeout.kt:148)
	at kotlinx.coroutines.TimeoutKt.withTimeout(Timeout.kt:44)
	at com.....FlowExtensionsKt.collectWithTimeout(FlowExtensions.kt:12)
	at com....Xyz.waitForRecordToBePublished(Xyz.kt:413)

Screenshot 2020-11-01 at 23 12 05

@elizarov
Copy link
Contributor

elizarov commented Nov 2, 2020

Thanks. I am able to reproduce some problem (albeit not this particular exception) with a similar setup. Btw, how many concurrent collectors do you have? I don't see this from the picture.

@vehovsky
Copy link
Author

vehovsky commented Nov 2, 2020

Somewhere between 500-1500 collectors.

@vehovsky
Copy link
Author

vehovsky commented Nov 2, 2020

Maybe not that many after all. Seem single one at the moment the exception was thrown. Before max was 22 (Using onSubscription to track the collectors)

Screenshot 2020-11-02 at 11 41 12

elizarov added a commit that referenced this issue Nov 2, 2020
… subscriber

* Added a specific test for a problematic scenario.
* Added stress test with concurrent emitters and subscribers that come and go.

Fixes #2356
elizarov added a commit that referenced this issue Nov 2, 2020
… subscriber

* Added a specific test for a problematic scenario.
* Added stress test with concurrent emitters and subscribers that come and go.

Fixes #2356
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants