Skip to content

Commit 1dbc25e

Browse files
qwwdfsadelizarov
authored andcommitted
Implement DelayChannel
Fixes #327
1 parent 1cbe8f0 commit 1dbc25e

File tree

6 files changed

+392
-1
lines changed

6 files changed

+392
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package kotlinx.coroutines.experimental.channels
2+
3+
import kotlinx.coroutines.experimental.*
4+
import kotlinx.coroutines.experimental.timeunit.*
5+
6+
7+
/**
8+
* Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
9+
* given delay between emissions. Backpressure is guaranteed by [RendezvousChannel], but no upper bound on actual delay is guaranteed.
10+
* If the consumer of this channel cannot keep up with given delay
11+
* and spends more than [delay] between subsequent invocations to [ReceiveChannel.receive] then rate and a maximum delay
12+
* of outcoming events will be limited by the consumer.
13+
*
14+
* This channel stops emission immediately after [ReceiveChannel.cancel] invocation.
15+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
16+
*
17+
* @param initialDelay delay after which the first item will be emitted
18+
* @param delay delay between
19+
* @param unit unit of time that applies to [initialDelay] and [delay]
20+
*/
21+
public fun DelayChannel(
22+
delay: Long,
23+
unit: TimeUnit = TimeUnit.MILLISECONDS,
24+
initialDelay: Long = 0
25+
): ReceiveChannel<Unit> {
26+
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
27+
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
28+
29+
val result = RendezvousChannel<Unit>()
30+
launch(start = CoroutineStart.UNDISPATCHED) {
31+
delay(initialDelay, unit)
32+
while (true) {
33+
val sendTime = timeSource.nanoTime()
34+
result.send(Unit)
35+
val queueTime = timeSource.nanoTime() - sendTime
36+
val nextDelay = (unit.toNanos(delay) - queueTime).coerceAtLeast(0L)
37+
delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
38+
}
39+
}
40+
41+
return result
42+
}
43+
44+
/**
45+
* Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
46+
* given delay after consumption of previously emitted item. Backpressure is guaranteed by [RendezvousChannel] machinery.
47+
* This channel stops emitting items immediately after [ReceiveChannel.cancel] invocation.
48+
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
49+
*
50+
* @param initialDelay delay after which the first item will be emitted
51+
* @param delay delay between
52+
* @param unit unit of time that applies to [initialDelay] and [delay]
53+
*/
54+
public fun FixedDelayChannel(
55+
delay: Long,
56+
unit: TimeUnit = TimeUnit.MILLISECONDS,
57+
initialDelay: Long = 0
58+
): ReceiveChannel<Unit> {
59+
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
60+
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
61+
62+
val result = RendezvousChannel<Unit>()
63+
launch(context = Unconfined) {
64+
delay(initialDelay, unit)
65+
while (true) {
66+
result.send(Unit)
67+
delay(delay, unit)
68+
}
69+
}
70+
71+
return result
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.channel.example10
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlinx.coroutines.experimental.channels.*
22+
import kotlin.coroutines.experimental.*
23+
24+
fun main(args: Array<String>) = runBlocking<Unit> {
25+
val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
26+
var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
27+
println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
28+
29+
nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
30+
println("Next element is not ready in 50 ms: $nextElement")
31+
32+
nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
33+
println("Next element is ready in 100 ms: $nextElement")
34+
35+
// Emulate large consumption delays
36+
println("Consumer pause in 150ms")
37+
delay(150)
38+
// Next element is available immediately
39+
nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
40+
println("Next element is available immediately after large consumer delay: $nextElement")
41+
// Note that the pause between `receive` calls is taken into account and next element arrives faster
42+
nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
43+
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
44+
45+
delayChannel.cancel() // indicate that no more elements are needed
46+
}

core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,18 @@ class GuideTest {
371371
)
372372
}
373373

374+
@Test
375+
fun testGuideChannelExample10() {
376+
test("GuideChannelExample10") { guide.channel.example10.main(emptyArray()) }.verifyLines(
377+
"Initial element is available immediately: kotlin.Unit",
378+
"Next element is not ready in 50 ms: null",
379+
"Next element is ready in 100 ms: kotlin.Unit",
380+
"Consumer pause in 150ms",
381+
"Next element is available immediately after large consumer delay: kotlin.Unit",
382+
"Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit"
383+
)
384+
}
385+
374386
@Test
375387
fun testGuideSyncExample01() {
376388
test("GuideSyncExample01") { guide.sync.example01.main(emptyArray()) }.verifyLinesStart(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package kotlinx.coroutines.experimental.channels
2+
3+
import kotlinx.coroutines.experimental.*
4+
import kotlinx.coroutines.experimental.selects.*
5+
import org.junit.Test
6+
import org.junit.runner.*
7+
import org.junit.runners.*
8+
import java.io.*
9+
import kotlin.test.*
10+
11+
12+
@RunWith(Parameterized::class)
13+
class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
14+
15+
companion object {
16+
@Parameterized.Parameters(name = "{0}")
17+
@JvmStatic
18+
fun params(): Collection<Array<Any>> =
19+
Channel.values().map { arrayOf<Any>(it) }
20+
}
21+
22+
enum class Channel {
23+
DELAY {
24+
override fun invoke(delay: Long, initialDelay: Long) = DelayChannel(delay, initialDelay = initialDelay)
25+
},
26+
27+
FIXED_DELAY {
28+
override fun invoke(delay: Long, initialDelay: Long) = FixedDelayChannel(delay, initialDelay = initialDelay)
29+
};
30+
31+
32+
abstract operator fun invoke(delay: Long, initialDelay: Long = 0): ReceiveChannel<Unit>
33+
}
34+
35+
36+
@Test
37+
fun testDelay() = runTest {
38+
val delayChannel = channelFactory(delay = 100)
39+
delayChannel.checkNotEmpty()
40+
delayChannel.checkEmpty()
41+
42+
delay(50)
43+
delayChannel.checkEmpty()
44+
delay(52)
45+
delayChannel.checkNotEmpty()
46+
47+
delayChannel.cancel()
48+
delay(52)
49+
delayChannel.checkEmpty()
50+
delayChannel.cancel()
51+
}
52+
53+
@Test
54+
fun testInitialDelay() = runBlocking<Unit> {
55+
val delayChannel = channelFactory(initialDelay = 75, delay = 100)
56+
delayChannel.checkEmpty()
57+
delay(50)
58+
delayChannel.checkEmpty()
59+
delay(30)
60+
delayChannel.checkNotEmpty()
61+
62+
// Regular delay
63+
delay(75)
64+
delayChannel.checkEmpty()
65+
delay(26)
66+
delayChannel.checkNotEmpty()
67+
delayChannel.cancel()
68+
}
69+
70+
71+
@Test
72+
fun testReceive() = runBlocking<Unit> {
73+
val delayChannel = channelFactory(delay = 100)
74+
delayChannel.checkNotEmpty()
75+
var value = withTimeoutOrNull(75) {
76+
delayChannel.receive()
77+
1
78+
}
79+
80+
assertNull(value)
81+
value = withTimeoutOrNull(26) {
82+
delayChannel.receive()
83+
1
84+
}
85+
86+
assertNotNull(value)
87+
delayChannel.cancel()
88+
}
89+
90+
@Test
91+
fun testComplexOperator() = runBlocking {
92+
val producer = produce {
93+
for (i in 1..7) {
94+
send(i)
95+
delay(100)
96+
}
97+
}
98+
99+
val averages = producer.averageInTimeWindow(300).toList()
100+
assertEquals(listOf(2.0, 5.0, 7.0), averages)
101+
}
102+
103+
private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {
104+
val delayChannel = channelFactory(delay = timespan, initialDelay = timespan)
105+
var sum = 0
106+
var n = 0
107+
whileSelect {
108+
this@averageInTimeWindow.onReceiveOrNull {
109+
when (it) {
110+
null -> {
111+
// Send leftovers and bail out
112+
if (n != 0) send(sum / n.toDouble())
113+
false
114+
}
115+
else -> {
116+
sum += it
117+
++n
118+
true
119+
}
120+
}
121+
}
122+
123+
// Timeout, send aggregated average and reset counters
124+
delayChannel.onReceive {
125+
send(sum / n.toDouble())
126+
sum = 0
127+
n = 0
128+
true
129+
}
130+
}
131+
132+
delayChannel.cancel()
133+
}
134+
135+
@Test
136+
fun testStress() = runBlocking<Unit> {
137+
// No OOM/SOE
138+
val iterations = 500_000 * stressTestMultiplier
139+
val delayChannel = channelFactory(0)
140+
repeat(iterations) {
141+
delayChannel.receive()
142+
}
143+
144+
delayChannel.cancel()
145+
}
146+
147+
@Test(expected = IllegalArgumentException::class)
148+
fun testNegativeDelay() {
149+
channelFactory(-1)
150+
}
151+
152+
@Test(expected = IllegalArgumentException::class)
153+
fun testNegativeInitialDelay() {
154+
channelFactory(initialDelay = -1, delay = 100)
155+
}
156+
}
157+
158+
fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
159+
160+
fun ReceiveChannel<Unit>.checkNotEmpty() = assertNotNull(poll())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kotlinx.coroutines.experimental.channels
2+
3+
import kotlinx.coroutines.experimental.*
4+
import org.junit.*
5+
6+
class DelayChannelTest : TestBase() {
7+
8+
@Test
9+
fun testFixedDelayChannelBackpressure() = runBlocking<Unit> {
10+
val delayChannel = FixedDelayChannel(delay = 100)
11+
delayChannel.checkNotEmpty()
12+
delayChannel.checkEmpty()
13+
14+
delay(150)
15+
delayChannel.checkNotEmpty()
16+
delay(50)
17+
delayChannel.checkEmpty()
18+
delay(52)
19+
delayChannel.checkNotEmpty()
20+
delayChannel.cancel()
21+
}
22+
23+
@Test
24+
fun testDelayChannelBackpressure() = runBlocking<Unit> {
25+
val delayChannel = DelayChannel(delay = 100)
26+
delayChannel.checkNotEmpty()
27+
delayChannel.checkEmpty()
28+
29+
delay(150)
30+
delayChannel.checkNotEmpty()
31+
delay(52)
32+
delayChannel.checkNotEmpty()
33+
delay(50)
34+
delayChannel.checkEmpty()
35+
delay(52)
36+
delayChannel.checkNotEmpty()
37+
delayChannel.cancel()
38+
}
39+
}

0 commit comments

Comments
 (0)