Skip to content

Commit ed0e77c

Browse files
committed
Fixed bugs in Kotlin flow operators.
Kotlin/kotlinx.coroutines#2051
1 parent 5563f38 commit ed0e77c

File tree

12 files changed

+36
-28
lines changed

12 files changed

+36
-28
lines changed

src/main/kotlin/com/gitlab/dhorman/cryptotrader/api/PoloniexTraderApi.kt

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarket
1414
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketCompleted
1515
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketExtensions
1616
import com.gitlab.dhorman.cryptotrader.util.CsvGenerator
17+
import com.gitlab.dhorman.cryptotrader.util.first
1718
import io.swagger.annotations.ApiOperation
1819
import io.vavr.Tuple2
1920
import io.vavr.Tuple3

src/main/kotlin/com/gitlab/dhorman/cryptotrader/core/SimulatedPath.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.gitlab.dhorman.cryptotrader.service.poloniex.volume
88
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarket
99
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketPartiallyCompleted
1010
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketPredicted
11+
import com.gitlab.dhorman.cryptotrader.util.first
1112
import io.vavr.Tuple2
1213
import io.vavr.collection.Array
1314
import io.vavr.collection.Map
@@ -17,7 +18,6 @@ import io.vavr.kotlin.component2
1718
import io.vavr.kotlin.getOrNull
1819
import io.vavr.kotlin.tuple
1920
import kotlinx.coroutines.flow.Flow
20-
import kotlinx.coroutines.flow.first
2121
import java.math.BigDecimal
2222
import java.math.RoundingMode
2323
import java.util.*

src/main/kotlin/com/gitlab/dhorman/cryptotrader/service/poloniex/ExtendedPoloniexApi.kt

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import com.gitlab.dhorman.cryptotrader.core.*
66
import com.gitlab.dhorman.cryptotrader.service.poloniex.exception.DisconnectedException
77
import com.gitlab.dhorman.cryptotrader.service.poloniex.model.*
88
import com.gitlab.dhorman.cryptotrader.trader.core.AdjustedPoloniexBuySellAmountCalculator
9+
import com.gitlab.dhorman.cryptotrader.util.first
910
import com.gitlab.dhorman.cryptotrader.util.share
1011
import io.netty.handler.timeout.ReadTimeoutException
1112
import io.netty.handler.timeout.WriteTimeoutException

src/main/kotlin/com/gitlab/dhorman/cryptotrader/trader/DelayedTradeProcessor.kt

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import com.gitlab.dhorman.cryptotrader.trader.exception.CantMoveOrderSafelyExcep
1111
import com.gitlab.dhorman.cryptotrader.trader.exception.CompletePlaceMoveOrderLoop
1212
import com.gitlab.dhorman.cryptotrader.trader.exception.MoveNotRequiredException
1313
import com.gitlab.dhorman.cryptotrader.trader.exception.RepeatPlaceMoveOrderLoopAgain
14+
import com.gitlab.dhorman.cryptotrader.util.first
1415
import com.gitlab.dhorman.cryptotrader.util.returnLastIfNoValueWithinSpecifiedTime
1516
import io.netty.handler.timeout.ReadTimeoutException
1617
import io.netty.handler.timeout.WriteTimeoutException

src/main/kotlin/com/gitlab/dhorman/cryptotrader/trader/PathGenerator.kt

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import com.gitlab.dhorman.cryptotrader.trader.core.AdjustedPoloniexBuySellAmount
99
import com.gitlab.dhorman.cryptotrader.trader.dao.BlacklistedMarketsDao
1010
import com.gitlab.dhorman.cryptotrader.trader.dao.TransactionsDao
1111
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketCompleted
12+
import com.gitlab.dhorman.cryptotrader.util.first
1213
import io.vavr.Tuple2
1314
import io.vavr.Tuple3
1415
import io.vavr.collection.List

src/main/kotlin/com/gitlab/dhorman/cryptotrader/trader/PoloniexTrader.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketCompleted
1919
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketExtensions
2020
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketPartiallyCompleted
2121
import com.gitlab.dhorman.cryptotrader.util.defaultTran
22+
import com.gitlab.dhorman.cryptotrader.util.first
2223
import com.gitlab.dhorman.cryptotrader.util.repeatableReadTran
2324
import io.vavr.Tuple2
2425
import io.vavr.collection.Array
2526
import io.vavr.kotlin.*
2627
import kotlinx.coroutines.*
2728
import kotlinx.coroutines.channels.Channel
28-
import kotlinx.coroutines.flow.first
2929
import kotlinx.coroutines.reactive.collect
3030
import mu.KotlinLogging
3131
import org.springframework.beans.factory.annotation.Qualifier

src/main/kotlin/com/gitlab/dhorman/cryptotrader/trader/TransactionIntent.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ import io.vavr.collection.List
3131
import io.vavr.kotlin.*
3232
import kotlinx.coroutines.*
3333
import kotlinx.coroutines.channels.Channel
34-
import kotlinx.coroutines.flow.*
34+
import kotlinx.coroutines.flow.Flow
35+
import kotlinx.coroutines.flow.collect
36+
import kotlinx.coroutines.flow.flow
37+
import kotlinx.coroutines.flow.toList
3538
import kotlinx.coroutines.sync.Mutex
3639
import kotlinx.coroutines.sync.withLock
3740
import mu.KotlinLogging

src/main/kotlin/com/gitlab/dhorman/cryptotrader/trader/model/TraderModels.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ import com.gitlab.dhorman.cryptotrader.service.poloniex.model.Currency
1111
import com.gitlab.dhorman.cryptotrader.service.poloniex.model.CurrencyType
1212
import com.gitlab.dhorman.cryptotrader.service.poloniex.model.OrderType
1313
import com.gitlab.dhorman.cryptotrader.trader.core.AdjustedBuySellAmountCalculator
14+
import com.gitlab.dhorman.cryptotrader.util.first
1415
import io.vavr.collection.Array
1516
import io.vavr.kotlin.component1
1617
import io.vavr.kotlin.component2
17-
import kotlinx.coroutines.flow.first
1818
import java.math.BigDecimal
1919

2020
@JsonTypeInfo(

src/main/kotlin/com/gitlab/dhorman/cryptotrader/util/FlowOperators.kt

+18-19
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package com.gitlab.dhorman.cryptotrader.util
22

3-
import io.vavr.Tuple2
4-
import io.vavr.collection.HashMap
5-
import io.vavr.collection.Map
63
import kotlinx.coroutines.*
74
import kotlinx.coroutines.channels.Channel
85
import kotlinx.coroutines.channels.ProducerScope
@@ -28,7 +25,6 @@ private open class ShareOperator<T>(
2825
private val gracePeriod: Duration? = null,
2926
private val scope: CoroutineScope? = null
3027
) {
31-
private var subscribers = 0L
3228
private val subscriberChannels = mutableSetOf<SendChannel<T>>()
3329
private val subscriberChannelsMutex = Mutex()
3430
private val upstreamSubscriptionLock = Mutex()
@@ -57,8 +53,8 @@ private open class ShareOperator<T>(
5753
}
5854
}
5955
} catch (e: CancellationException) {
60-
if (subscriberChannels.size > 0) {
61-
logger.warn("upstream cancelled when subscriberChannels (${subscriberChannels.size}) variable is not empty.")
56+
if (subscriberChannels.isNotEmpty()) {
57+
logger.warn("$upstream cancelled when subscriberChannels (${subscriberChannels.size}) variable is not empty.")
6258
}
6359
throw e
6460
}
@@ -123,7 +119,7 @@ private open class ShareOperator<T>(
123119
val shareOperator = channelFlow<T> {
124120
try {
125121
upstreamSubscriptionLock.withLock {
126-
if (++subscribers == 1L) {
122+
if (subscriberChannels.isEmpty()) {
127123
cancelGracePeriodTimerJob()
128124
processQueueAndSubscribeSelf()
129125
subscribeToUpstream()
@@ -138,7 +134,7 @@ private open class ShareOperator<T>(
138134
upstreamSubscriptionLock.withLock {
139135
unsubscribeSelf()
140136

141-
if (--subscribers == 0L) {
137+
if (subscriberChannels.isEmpty()) {
142138
val launched = launchGracePeriodTimerJob()
143139

144140
if (!launched) {
@@ -197,18 +193,21 @@ fun <T> Channel<T>.buffer(scope: CoroutineScope, timespan: Duration): Channel<Li
197193
return upstream
198194
}
199195

200-
fun <K, V> flowFromMap(map: Map<K, V>): Flow<Tuple2<K, V>> = flow {
201-
map.forEach { emit(it) }
202-
}
203-
204-
suspend fun <K, V> Flow<Tuple2<K, V>>.collectMap(): Map<K, V> {
205-
var map = HashMap.empty<K, V>()
196+
private object AbortException : Throwable("", null, true, false)
206197

207-
collect {
208-
map = map.put(it)
198+
suspend fun <T> Flow<T>.first(): T {
199+
var result: T? = null
200+
try {
201+
collect { value ->
202+
result = value
203+
throw AbortException
204+
}
205+
} catch (e: AbortException) {
206+
// Do nothing
209207
}
210208

211-
return map
209+
if (result == null) throw NoSuchElementException("Expected at least one element")
210+
return result!!
212211
}
213212

214213
suspend fun <T> Flow<T>.firstOrNull(): T? {
@@ -217,9 +216,9 @@ suspend fun <T> Flow<T>.firstOrNull(): T? {
217216
try {
218217
collect {
219218
result = it
220-
throw CancellationException()
219+
throw AbortException
221220
}
222-
} catch (e: CancellationException) {
221+
} catch (e: AbortException) {
223222
// Ignore
224223
}
225224

src/test/kotlin/com/gitlab/dhorman/cryptotrader/service/poloniex/ExtendedPoloniexApiTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ package com.gitlab.dhorman.cryptotrader.service.poloniex
22

33
import com.gitlab.dhorman.cryptotrader.core.toMarket
44
import com.gitlab.dhorman.cryptotrader.util.TestClock
5+
import com.gitlab.dhorman.cryptotrader.util.first
56
import com.gitlab.dhorman.cryptotrader.util.firstOrNull
67
import io.vavr.kotlin.getOrNull
78
import kotlinx.coroutines.Dispatchers
89
import kotlinx.coroutines.delay
910
import kotlinx.coroutines.flow.collect
10-
import kotlinx.coroutines.flow.first
1111
import kotlinx.coroutines.flow.onStart
1212
import kotlinx.coroutines.launch
1313
import kotlinx.coroutines.runBlocking

src/test/kotlin/com/gitlab/dhorman/cryptotrader/trader/PathGeneratorTest.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@ import com.gitlab.dhorman.cryptotrader.service.poloniex.ExtendedPoloniexApi
55
import com.gitlab.dhorman.cryptotrader.service.poloniex.core.PoloniexBuySellAmountCalculator
66
import com.gitlab.dhorman.cryptotrader.service.poloniex.model.Amount
77
import com.gitlab.dhorman.cryptotrader.service.poloniex.model.CurrencyType
8-
import com.gitlab.dhorman.cryptotrader.trader.PathGenerator
98
import com.gitlab.dhorman.cryptotrader.trader.dao.TransactionsDao
10-
import com.gitlab.dhorman.cryptotrader.trader.findOne
119
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketCompleted
1210
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketPartiallyCompleted
1311
import com.gitlab.dhorman.cryptotrader.trader.model.TranIntentMarketPredicted
1412
import com.gitlab.dhorman.cryptotrader.util.CsvGenerator
13+
import com.gitlab.dhorman.cryptotrader.util.first
1514
import com.nhaarman.mockitokotlin2.mock
1615
import com.nhaarman.mockitokotlin2.whenever
1716
import io.vavr.collection.Array
1817
import io.vavr.kotlin.*
1918
import kotlinx.coroutines.Dispatchers
20-
import kotlinx.coroutines.flow.*
19+
import kotlinx.coroutines.flow.asFlow
20+
import kotlinx.coroutines.flow.collect
21+
import kotlinx.coroutines.flow.flatMapMerge
22+
import kotlinx.coroutines.flow.map
2123
import kotlinx.coroutines.runBlocking
2224
import mu.KotlinLogging
2325
import org.junit.jupiter.api.Test

src/test/kotlin/com/gitlab/dhorman/cryptotrader/trader/dao/BlacklistedMarketsDaoTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package com.gitlab.dhorman.cryptotrader.trader.dao
22

33
import com.gitlab.dhorman.cryptotrader.core.Market
44
import com.gitlab.dhorman.cryptotrader.util.TestClock
5+
import com.gitlab.dhorman.cryptotrader.util.first
56
import kotlinx.coroutines.delay
6-
import kotlinx.coroutines.flow.first
77
import kotlinx.coroutines.runBlocking
88
import org.junit.jupiter.api.*
99
import org.springframework.beans.factory.annotation.Autowired

0 commit comments

Comments
 (0)