
about the coroutine channels distincting in a window #306



Closed
SolomonSun2010 opened this issue Mar 28, 2018 · 2 comments

@SolomonSun2010

SolomonSun2010 commented Mar 28, 2018

We have an issue with de-duplicating (distinct) coroutine channel items within a window; please help.

  1. On Android, we usually de-duplicate at the sending side by removing all pending messages of the same kind from the MessageQueue before sending a new one:
if (mMainHandler.hasMessages(MSG_A)) {
    mMainHandler.removeMessages(MSG_A)
}
mMainHandler.sendEmptyMessage(MSG_A)

The equivalent we tried with kotlinx.coroutines is the following:

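msglist.forEach { a ->
    val rc = msgActor as ReceiveChannel<MsgActorMsg>
    // none {} receives (removes) the elements it inspects and, if nothing matches,
    // keeps suspending until the channel is closed, so this never returns here
    val hasNot = async { rc.none { it == a } }.await()
    println("msgActor has $a ?: ")
    if (hasNot)
        msgActor.offer(a)
}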

But it does not work; it seems to deadlock. I think a Channel is like a Stream of items that can be iterated only once, unlike a Queue/Collection that can be iterated many times, right?
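As far as we can tell, the hang comes from the search itself: ReceiveChannel.none { } is a terminal operation that iterates the channel, so each element it inspects is received and removed, and when no element matches it keeps suspending until the channel is closed. A channel's buffer cannot be inspected without consuming it. One alternative closer to the Android Handler pattern is to remember, on the sending side, which messages are still pending. The sketch below assumes kotlinx.coroutines 1.5 or newer (for trySend); DedupSender, offerDistinct and demoDedupSender are hypothetical names, not library API. Unlike removeMessages + sendEmptyMessage, it keeps the already-queued copy in its place instead of moving it to the tail.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): keeps at most one pending
// copy of each distinct message in an unlimited channel.
class DedupSender<T : Any> {
    private val pending: MutableSet<T> = ConcurrentHashMap.newKeySet<T>()
    private val channel = Channel<T>(Channel.UNLIMITED)

    /** Enqueues [msg] unless an equal message is still waiting; returns true if enqueued. */
    fun offerDistinct(msg: T): Boolean {
        if (!pending.add(msg)) return false          // an equal message is already queued
        if (channel.trySend(msg).isSuccess) return true
        pending.remove(msg)                          // channel was closed: roll back
        return false
    }

    /** Receives the next message and marks it as no longer pending. */
    suspend fun receive(): T = channel.receive().also { pending.remove(it) }

    fun close() {
        channel.close()
    }
}

// Sends A, A, B; receives one message; then sends A again.
fun demoDedupSender(): List<String> = runBlocking {
    val sender = DedupSender<String>()
    listOf("A", "A", "B").forEach { sender.offerDistinct(it) }  // the second "A" is dropped
    val received = mutableListOf(sender.receive())             // "A" is no longer pending
    sender.offerDistinct("A")                                   // so this "A" is queued again
    received += sender.receive()
    received += sender.receive()
    received
}
```

Here demoDedupSender() should return A, B, A: the duplicate "A" sent while the first one is still queued is dropped, but a new "A" is accepted once the earlier one has been received.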

  2. So we have to try de-duplicating at the receiving side.
    For instance, consider the items below:
val msgs1 = listOf("A", "B", "A", "B", "A", "B", "A", "B", "A", "B")
val msgs2 = listOf("B", "A", "B", "A", "B", "A", "B", "A", "B", "A")
val msgs3 = listOf("A", "A", "B", "B", "A", "A", "B", "B", "A", "A")

Then we want to de-duplicate them within each window (Channel.distinct() de-duplicates globally over the whole stream). With the window operators in Reactor or RxJava, it works as we expected:

fun testReactor() {
    println("Reactor test **********: ")
    Flux.range(1, 100).window(20).subscribe(System.out::println)
 
    println(" 2nd test **********: ")
    Flux.interval(Duration.ofMillis(100))
            .window(2)
            .flatMap { it }
            .take(5)
            .toStream()
            .forEach(System.out::println)
 
    println(" 3rd test **********: ")
    Flux.fromIterable(msgs1)
//            .delaySubscription(Duration.ofMillis(50))
//            .window(Duration.ofMillis(50))
            .window(5)
            .flatMap { it.distinct() }
            .subscribe(System.out::print)
    println(" = ABBA : msgs1 distincted ")
 
    Flux.interval(Duration.ofMillis(100))
            .zipWithIterable(msgs2) { _, u -> u }
            .window(Duration.ofMillis(600))
//            .window(5)
            .flatMap { it.distinct() }
            .toStream()
            .forEach(System.out::print)
//            .subscribe(System.out::print)
    println(" = BAAB : msgs2 distincted ")
}
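For in-memory lists, the same per-window de-duplication can be sketched with the Kotlin standard library alone: chunked(size) plays the role of window(size), and grouping by timestamp / windowMillis gives fixed time windows. This is a minimal sketch that assumes each item carries its own event timestamp (the Timed class is hypothetical), whereas Reactor's window(Duration) uses arrival time on the wall clock.

```kotlin
// Size-based windows: split into consecutive chunks of `size` items and
// de-duplicate inside each chunk only (like window(size).flatMap { it.distinct() }).
fun <T> distinctPerWindow(items: List<T>, size: Int): List<T> =
    items.chunked(size).flatMap { it.distinct() }

// Hypothetical timestamped item for the time-based variant.
data class Timed<T>(val timeMillis: Long, val value: T)

// Time-based tumbling windows [k * windowMillis, (k + 1) * windowMillis).
fun <T> distinctPerTimeWindow(items: List<Timed<T>>, windowMillis: Long): List<T> =
    items.groupBy { Math.floorDiv(it.timeMillis, windowMillis) } // window index; keeps first-seen order
        .values
        .flatMap { window -> window.map { it.value }.distinct() }

val msgs1 = listOf("A", "B", "A", "B", "A", "B", "A", "B", "A", "B")
val msgs2 = listOf("B", "A", "B", "A", "B", "A", "B", "A", "B", "A")
```

With these lists, distinctPerWindow(msgs1, 5) yields A, B, B, A, and msgs2 stamped at 100, 200, ..., 1000 ms with 600 ms windows yields B, A, A, B, matching the two Reactor results.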

We use an actor to receive the messages, so we would like to try something like this:

fun mActor() = actor<String>(capacity = 50) {
    val distincted = channel
        .window(5) /** No Such Method **/
        .flatMap { it.distinct() }

    for (msg in distincted) {
        handleMessageImpl(msg)
    }
}

Unfortunately, ReceiveChannel currently has none of the window/buffer family of functions found in RxJava/Reactor.
And we don’t want to add a third-party library such as RxJava/Reactor, since they are about 2 MB.
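Until such operators exist, a size-based variant can be written by hand with produce. This is a minimal sketch, assuming kotlinx.coroutines 1.x (where produce is still marked @ExperimentalCoroutinesApi); distinctInCountWindows and demoCountWindows are hypothetical names, not library API. Because each item is forwarded as soon as it is first seen in its window, no window buffering is needed.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: forwards an item only if it has not been seen in the current
// window of `size` input items, then starts a fresh window. Same output as
// window(size).flatMap { it.distinct() }.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.distinctInCountWindows(source: ReceiveChannel<T>, size: Int): ReceiveChannel<T> {
    require(size > 0) { "size must be positive" }
    return produce<T> {
        val seen = HashSet<T>()
        var count = 0
        for (item in source) {
            if (seen.add(item)) send(item)   // first occurrence in this window
            if (++count == size) {           // window full: forget what was seen
                seen.clear()
                count = 0
            }
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun demoCountWindows(msgs: List<String>, size: Int): List<String> = runBlocking {
    val source = produce<String> { msgs.forEach { send(it) } }
    distinctInCountWindows(source, size).toList()
}
```

Inside the actor from the question, distinctInCountWindows(channel, 5) could stand in for the missing channel.window(5).flatMap { it.distinct() }, since ActorScope is also a CoroutineScope.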

I found a windowed function on Sequence:

public fun <T> Sequence<T>.windowed(size: Int, step: Int = 1, partialWindows: Boolean = false): Sequence<List<T>>

So I tested it as below:

fun mActor() = actor<String>(capacity = 50) {
    println("Before distinct: $channel")
    // forEachIndexed returns Unit, so the result is not assigned to a variable
    channel.asSequence()
        .windowed(3, 3, true)
        .forEachIndexed { idx, window ->
            println("flat distinct window-$idx : is empty:${window.isEmpty()} =$window")
            window.forEachIndexed { index, item ->
                println("consume $index : $item")
            }
        }
}

And the main function is:

fun main(args: Array<String>) = runBlocking<Unit> {
    val mActor = mActor()
    println("actor test begin !!")
    msgs.forEach { a ->
//        val hasNot = async { rc.none { it == a } }
//        println("msgActor has $a ?: ")
//        if (hasNot.await())   // will deadlock when de-duplicating before sending
        mActor.offer(a)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<MutableList<String>>()
    mActor.send(GetInfos(response))
//    val resp = response.await()  // could wait forever if the actor never completes the CompletableDeferred
//    println("Actor state got, size ${resp.size}: $resp")
//    println("Counter distincted = ${response.await().distinct()}")
//    mActor.close()
    println("actor test end and assertEquals !: $mActor")
    Thread.sleep(3000L) // block main thread for 3 seconds to keep JVM alive
//    assertEquals(msgs.distinct(), resp)
}

For simplicity, using .asSequence().windowed(3, 3, true) with:

val msgs = listOf("A", "B", "A", "B")

This is the output:

actor test begin !!
Before distinct: ActorCoroutine{Active}@15eb36c8
actor test end and assertEquals !: ActorCoroutine{Active}@15eb36c8
flat distinct window-0 : is empty:false =[A, B,A]
consume 0 : A
consume 1 : B
consume 2 : A

It seems we only get ONE window, but we expect TWO windows, like below:

actor test begin !!
Before distinct: ActorCoroutine{Active}@15eb36c8
actor test end and assertEquals !: ActorCoroutine{Active}@15eb36c8
flat distinct window-0 : is empty:false =[A, B,A]
consume 0 : A
consume 1 : B
consume 2 : A
flat distinct window-1 : is empty:false =[B, GetInfos]
consume 0 : B
consume 1 : GetInfos

So why do we only get a single window from the sequence? Is there a third way to de-duplicate within a window, other than an Rx library or Sequence?
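The single window is consistent with how Sequence.windowed works: it is lazy, so each full window is emitted as soon as it holds size elements, but with partialWindows = true the shorter trailing window is emitted only after the underlying iterator reports that there are no more elements. Assuming asSequence() over the actor's channel only reports the end once the channel is closed, and given that mActor.close() is commented out in main, [B, GetInfos] would never be flushed. The standard-library part can be checked in isolation; tumblingWindows and firstWindowOfInfiniteSource are illustrative helpers:

```kotlin
// Tumbling windows of 3 with step 3. With partialWindows = true the short last window
// is included, but only once the source sequence has no more elements.
fun tumblingWindows(items: List<String>, partialWindows: Boolean): List<List<String>> =
    items.asSequence().windowed(size = 3, step = 3, partialWindows = partialWindows).toList()

// On a never-ending source, full windows are still produced lazily,
// while a trailing partial window can never be emitted.
fun firstWindowOfInfiniteSource(): List<Int> =
    generateSequence(0) { it + 1 }.windowed(3, 3, partialWindows = true).first()
```

On a finite list of A, B, A, B, GetInfos both windows appear, which suggests that closing the channel (for example by re-enabling mActor.close()) should let the last partial window through.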

PS: window operators are at the core of newer real-time frameworks such as Google Dataflow, Apache Beam, Apache Flink, etc.

@elizarov
Contributor

elizarov commented Apr 3, 2018

Can you please elaborate on your use case a little? I think I understand what you are trying to achieve, but I don't quite understand why you are trying to do it.

Anyway, you can write a time-based or size-based window function yourself, or just go ahead and write a time-based distinctInTimeWindow operator like the one shown in this gist: https://gist.github.com/elizarov/34150213209158d0dce0db62d9c0a20f
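The gist's code is not reproduced here. As an independent, minimal sketch of the same idea, a time-based operator can reset its set of seen items whenever the current time falls into a new tumbling window. Assumptions: kotlinx.coroutines 1.x, windows aligned to multiples of windowMillis on the supplied clock, and the window is checked only when an item arrives (no timer). distinctInTimeWindows and demoTimeWindows are hypothetical names; the clock parameter exists only so the example can be replayed deterministically.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: forwards an item only if it has not been seen in the
// current tumbling time window.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.distinctInTimeWindows(
    source: ReceiveChannel<T>,
    windowMillis: Long,
    nanoTime: () -> Long = System::nanoTime,   // injectable clock, e.g. for tests
): ReceiveChannel<T> {
    require(windowMillis > 0) { "windowMillis must be positive" }
    val windowNanos = windowMillis * 1_000_000
    return produce<T> {
        val seen = HashSet<T>()
        var currentWindow: Long? = null
        for (item in source) {
            val window = Math.floorDiv(nanoTime(), windowNanos)  // index of the window "now" falls in
            if (window != currentWindow) {                       // entered a new window: reset
                seen.clear()
                currentWindow = window
            }
            if (seen.add(item)) send(item)
        }
    }
}

// Replays msgs with fake timestamps (one clock reading per received item).
@OptIn(ExperimentalCoroutinesApi::class)
fun demoTimeWindows(msgs: List<String>, timesMillis: List<Long>, windowMillis: Long): List<String> =
    runBlocking {
        val clock = timesMillis.iterator()
        val source = produce<String> { msgs.forEach { send(it) } }
        distinctInTimeWindows(source, windowMillis) { clock.next() * 1_000_000 }.toList()
    }
```

Replaying msgs2 at 100, 200, ..., 1000 ms with 600 ms windows gives B, A, A, B.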

@SolomonSun2010
Author

SolomonSun2010 commented Apr 8, 2018

OK. The window/buffer family of functions is very general, as in RxJava/Reactor, Google Dataflow, Apache Beam, Apache Flink, etc. Within a window we can perform many kinds of data aggregation.
Our case is event processing, which requires de-duplicating massive, pushed, unbounded, out-of-order events of different kinds [1]. But the built-in distinct function is based on a global HashSet [2], so it de-duplicates over the entire stream.
For instance,

val msgs1 = listOf("A", "B", "A", "B", "A", "B", "A", "B", "A", "B")

the built-in distinct only gives the result:
msgs1.distinct() == listOf("A", "B")

With window functions, we get a different result:

msgs1.window(timespan = 100, unit = TimeUnit.MILLISECONDS, count = 5).flatMap { it.distinct() } == listOf("A", "B", "B", "A")

This prevents the upstream from overwhelming downstream processing, so we can process the different types of events/messages at a constant, stable rate, as intended.
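RxJava's window(timespan, unit, count) closes a window on whichever limit is reached first. A standard-library-only sketch of that combination over timestamped events follows; Event and distinctInCountOrTimeWindows are hypothetical, and unlike RxJava, the timespan here is measured from each window's first event timestamp rather than by a timer.

```kotlin
// Hypothetical timestamped event.
data class Event<T>(val timeMillis: Long, val value: T)

// Starts a new window after `maxCount` events, or when an event arrives at least
// `timespanMillis` after the current window's first event, whichever comes first,
// and de-duplicates only within the current window.
fun <T> distinctInCountOrTimeWindows(events: List<Event<T>>, timespanMillis: Long, maxCount: Int): List<T> {
    require(maxCount > 0 && timespanMillis > 0) { "limits must be positive" }
    val out = mutableListOf<T>()
    val seen = HashSet<T>()
    var windowStart = 0L
    var count = 0
    for (e in events) {
        if (count == 0 || count == maxCount || e.timeMillis - windowStart >= timespanMillis) {
            seen.clear()                 // open a new window at this event
            windowStart = e.timeMillis
            count = 0
        }
        count++
        if (seen.add(e.value)) out += e.value
    }
    return out
}
```

For msgs1 emitted all at once (every timestamp 0) with a timespan of 100 and a count of 5, the count limit decides and the result is A, B, B, A; with slower events the timespan limit takes over.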

[1] https://research.google.com/pubs/pub43864.html
[2] https://github.com/Kotlin/kotlinx.coroutines/blob/master/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt#L1177
