We have an issue with making coroutine channel messages distinct within a window; please help.
In Android, we usually make a message distinct on the sending side by removing all pending copies of it from the MessageQueue before sending:
if (mMainHandler.hasMessages(MSG_A)) {
    mMainHandler.removeMessages(MSG_A)
}
mMainHandler.sendEmptyMessage(MSG_A)
We tried to write the equivalent with kotlinx.coroutines as follows:
msglist.forEach { a ->
    val rc = msgActor as ReceiveChannel<MsgActorMsg>
    // none() iterates over (and consumes) the channel rather than peeking at it
    val hasNot = async { rc.none { it == a } }.await()
    println("msgActor has $a ?: ")
    if (hasNot)
        msgActor.offer(a)
}
But it does not work; it appears to deadlock. I think a Channel is like an iterate-once Stream of items, not like a Queue/Collection that can be iterated multiple times, right?
So we have to try to make the messages distinct on the receiving side.
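For instance, given a stream of items, we want to make them distinct with a window function (Channel.distinct() only performs a global distinct over the whole stream). In Reactor or RxJava this works as we expected.
We use an actor to receive the messages, so we might try something like this: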
fun mActor() = actor<String>(capacity = 50) {
    val distincted = channel
        .window(5) // no such method on ReceiveChannel
        .flatMap { it.distinct() }
    for (msg in distincted) {
        handleMessageImpl(msg)
    }
}
Unfortunately, ReceiveChannel currently has no window/buffer family of functions like the ones in RxJava/Reactor.
And we don't want to add a third-party library like RxJava/Reactor; they are about 2 MB.
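A minimal sketch of what such a receiving-side window could look like without an Rx dependency. It assumes a recent kotlinx.coroutines (trySend needs 1.5 or later, and produce is still marked experimental). The names distinctPerWindow and windowDemo are made up for this illustration and are not part of the library. Note that a count-based window like this only emits once the window is full or the source is closed.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a kotlinx.coroutines API): groups the source into
// consecutive windows of `size` items and forwards each item once per window.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.distinctPerWindow(
    source: ReceiveChannel<T>,
    size: Int
): ReceiveChannel<T> = produce {
    val window = LinkedHashSet<T>() // keeps first-seen order within the window
    var count = 0                   // items received in the current window
    for (item in source) {
        window.add(item)
        count++
        if (count == size) {
            for (e in window) send(e)
            window.clear()
            count = 0
        }
    }
    // The source was closed: flush the last, possibly partial, window.
    for (e in window) send(e)
}

// Illustration only: feeds a closed channel through the helper.
fun windowDemo(): List<String> = runBlocking {
    val source = Channel<String>(Channel.UNLIMITED)
    listOf("A", "B", "A", "B", "GetInfos").forEach { source.trySend(it) }
    source.close()
    val out = mutableListOf<String>()
    for (msg in distinctPerWindow(source, 3)) out.add(msg)
    out // [A, B, B, GetInfos]
}
```

With a window size of 3, the first window [A, B, A] yields A and B, and the partial second window [B, GetInfos] yields B and GetInfos. A time-based window would need a different trigger than the item count used here.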
I found a windowed function in Sequence, so I tested with it inside the actor. The main function is:
fun main(args: Array<String>) = runBlocking<Unit> {
    val mActor = mActor()
    println("actor test begin !!")
    msgs.forEach { a ->
        // val hasNot = async { rc.none { it == a } }
        // println("msgActor has $a ?: ")
        // if (hasNot.await()) // will deadlock when making distinct before send
        msgActor.offer(a)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<MutableList<String>>()
    mActor.send(GetInfos(response))
    // val resp = response.await() // could wait forever if the actor never completes the CompletableDeferred!
    // println("Actor state got , size ${resp.size}: $resp")
    // println("Counter distincted = ${response.await().distinct()}")
    // mActor.close()
    println("actor test end and assertEquals !: $mActor")
    Thread.sleep(3000L) // block main thread for 3 seconds to keep JVM alive
    // assertEquals(msgs.distinct(), resp)
}
For simplicity, the test uses .asSequence().windowed(3, 3, true) with the following messages:
val msgs = listOf("A", "B", "A", "B")
This is the output:
actor test begin !!
Before distinct: ActorCoroutine{Active}@15eb36c8
actor test end and assertEquals !: ActorCoroutine{Active}@15eb36c8
flat distinct window-0 : is empty:false =[A, B,A]
consume 0 : A
consume 1 : B
consume 2 : A
It seems we only get ONE window, but we expect TWO windows, like below:
actor test begin !!
Before distinct: ActorCoroutine{Active}@15eb36c8
actor test end and assertEquals !: ActorCoroutine{Active}@15eb36c8
flat distinct window-0 : is empty:false =[A, B,A]
consume 0 : A
consume 1 : B
consume 2 : A
flat distinct window-1 : is empty:false =[B, GetInfos]
consume 0 : B
consume 1 : GetInfos
So why do we get just a single window from the sequence? Is there a third way to make messages distinct with a window function, other than an Rx library or Sequence?
PS: window operators are at the core of newer real-time frameworks such as Google Dataflow, Apache Beam, Apache Flink, etc.
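For comparison, here is a small sketch of the two windows expected above, computed on a finite list with only the Kotlin standard library. The trailing "GetInfos" element stands in for the request sent after msgs. The names distinctWindows and expectedWindows are invented for this illustration.

```kotlin
// Hypothetical helper name; uses only the Kotlin standard library.
fun distinctWindows(items: List<String>, size: Int): List<List<String>> =
    items.asSequence()
        // step == size gives non-overlapping windows; partial last window is kept
        .windowed(size, size, partialWindows = true)
        // distinct is applied inside each window only
        .map { it.distinct() }
        .toList()

// Illustration only: the four test messages followed by the request message.
fun expectedWindows(): List<List<String>> =
    distinctWindows(listOf("A", "B", "A", "B", "GetInfos"), 3)
// → [[A, B], [B, GetInfos]]
```

On a finite list, both windows are produced because the sequence knows where the input ends. A window over a live channel cannot complete its last window until more items arrive or the channel is closed.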
Can you please elaborate on your use case a little? I think I understand what you are trying to achieve, but I don't quite understand why you are trying to do it.
OK. The window/buffer family of functions is very general; it appears in RxJava/Reactor, Google Dataflow, Apache Beam, Apache Flink, etc. Many data aggregation operations can be done within a window.
Our case is event processing, which requires deduplicating massively pushed, unbounded, out-of-order events of different kinds [1]. But the built-in distinct function is based on a global HashSet [2], so it deduplicates over the entire stream.
Deduplicating within a window prevents the upstream from over-pushing the downstream processing, so the different types of events/messages are processed at a constant, stable rate, as we intend.
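The difference between the two behaviours can be sketched on a plain list with the Kotlin standard library. The event values and the function names below are hypothetical and only illustrate the contrast; this is not the channel operator itself.

```kotlin
// Hypothetical event stream; values and names are for illustration only.
val events = listOf("A", "B", "A", "B", "A", "B")

// Global distinct: one set covers the whole stream, so a repeated event
// is never let through again, no matter how much later it arrives.
fun globalDistinct(): List<String> = events.distinct()

// Windowed distinct: duplicates are dropped only within each chunk of 3,
// so every kind of event still gets through once per window.
fun windowedDistinct(): List<String> =
    events.chunked(3).flatMap { it.distinct() }
```

Here globalDistinct() returns [A, B], while windowedDistinct() returns [A, B, B, A]: each window [A, B, A] and [B, A, B] is reduced on its own.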