Skip to content

Commit 78b895f

Browse files
sdeleuzechristophstrobl
authored andcommitted
DATAREDIS-1033 - Add Kotlin Flow based extensions.
Original Pull Request: #477
1 parent 669d3a9 commit 78b895f

16 files changed

+1541
-22
lines changed

src/main/kotlin/org/springframework/data/redis/core/ReactiveGeoOperationsExtensions.kt

+51-3
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@
1515
*/
1616
package org.springframework.data.redis.core
1717

18+
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.asFlow
1821
import kotlinx.coroutines.reactive.awaitFirstOrNull
1922
import kotlinx.coroutines.reactive.awaitSingle
23+
import kotlinx.coroutines.reactor.asFlux
24+
import org.springframework.data.geo.Circle
2025
import org.springframework.data.geo.Distance
26+
import org.springframework.data.geo.GeoResult
2127
import org.springframework.data.geo.Metric
2228
import org.springframework.data.geo.Point
23-
import org.springframework.data.redis.connection.RedisGeoCommands
29+
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation
30+
import org.springframework.data.redis.connection.RedisGeoCommands.GeoRadiusCommandArgs
2431

2532
/**
2633
* Coroutines variant of [ReactiveGeoOperations.add].
@@ -37,7 +44,7 @@ suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, p
3744
* @author Mark Paluch
3845
* @since 2.2
3946
*/
40-
suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, location: RedisGeoCommands.GeoLocation<M>): Long =
47+
suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, location: GeoLocation<M>): Long =
4148
add(key, location).awaitSingle()
4249

4350
/**
@@ -55,9 +62,19 @@ suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, m
5562
* @author Mark Paluch
5663
* @since 2.2
5764
*/
58-
suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, locations: Iterable<RedisGeoCommands.GeoLocation<M>>): Long =
65+
suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.addAndAwait(key: K, locations: Iterable<GeoLocation<M>>): Long =
5966
add(key, locations).awaitSingle()
6067

68+
/**
69+
* Coroutines [Flow] variant of [ReactiveGeoOperations.add].
70+
*
71+
* @author Sebastien Deleuze
72+
* @since 2.2
73+
*/
74+
@ExperimentalCoroutinesApi
75+
fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.add(key: K, locations: Flow<Collection<GeoLocation<M>>>): Flow<Long> =
76+
add(key, locations.asFlux()).asFlow()
77+
6178
/**
6279
* Coroutines variant of [ReactiveGeoOperations.distance].
6380
*
@@ -115,6 +132,37 @@ suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.positionAndAwait(key:
115132
suspend fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.positionAndAwait(key: K, vararg members: M): List<Point> =
116133
position(key, *members).awaitSingle()
117134

135+
/**
136+
* Coroutines [Flow] variant of [ReactiveGeoOperations.radius].
137+
*
138+
* @author Sebastien Deleuze
139+
* @since 2.2
140+
*/
141+
@ExperimentalCoroutinesApi
142+
fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.radiusAsFlow(key: K, within: Circle, args: GeoRadiusCommandArgs? = null): Flow<GeoResult<GeoLocation<M>>> =
143+
(if (args != null) radius(key, within, args) else radius(key, within)).asFlow()
144+
145+
146+
/**
147+
* Coroutines [Flow] variant of [ReactiveGeoOperations.radius].
148+
*
149+
* @author Sebastien Deleuze
150+
* @since 2.2
151+
*/
152+
@ExperimentalCoroutinesApi
153+
fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.radiusAsFlow(key: K, member: M, radius: Double): Flow<GeoResult<GeoLocation<M>>> =
154+
radius(key, member, radius).asFlow()
155+
156+
/**
157+
* Coroutines [Flow] variant of [ReactiveGeoOperations.radius].
158+
*
159+
* @author Sebastien Deleuze
160+
* @since 2.2
161+
*/
162+
@ExperimentalCoroutinesApi
163+
fun <K : Any, M : Any> ReactiveGeoOperations<K, M>.radiusAsFlow(key: K, member: M, distance: Distance, args: GeoRadiusCommandArgs? = null): Flow<GeoResult<GeoLocation<M>>> =
164+
(if (args != null) radius(key, member, distance, args) else radius(key, member, distance)).asFlow()
165+
118166
/**
119167
* Coroutines variant of [ReactiveGeoOperations.remove].
120168
*

src/main/kotlin/org/springframework/data/redis/core/ReactiveHashOperationsExtensions.kt

+43
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package org.springframework.data.redis.core
1717

18+
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.asFlow
1821
import kotlinx.coroutines.reactive.awaitFirstOrNull
1922
import kotlinx.coroutines.reactive.awaitSingle
2023

@@ -55,6 +58,16 @@ suspend fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.mult
5558
suspend fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.incrementAndAwait(key: H, hashKey: HK, delta: Long): Long =
5659
increment(key, hashKey, delta).awaitSingle()
5760

61+
/**
62+
* Coroutines variant of [ReactiveHashOperations.keys].
63+
*
64+
* @author Sebastien Deleuze
65+
* @since 2.2
66+
*/
67+
@ExperimentalCoroutinesApi
68+
fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.keysAsFlow(key: H): Flow<HK> =
69+
keys(key).asFlow()
70+
5871
/**
5972
* Coroutines variant of [ReactiveHashOperations.increment].
6073
*
@@ -100,6 +113,36 @@ suspend fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.putA
100113
suspend fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.putIfAbsentAndAwait(key: H, hashKey: HK, hashValue: HV): Boolean =
101114
putIfAbsent(key, hashKey, hashValue).awaitSingle()
102115

116+
/**
117+
* Coroutines variant of [ReactiveHashOperations.values].
118+
*
119+
* @author Sebastien Deleuze
120+
* @since 2.2
121+
*/
122+
@ExperimentalCoroutinesApi
123+
fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.valuesAsFlow(key: H): Flow<HV> =
124+
values(key).asFlow()
125+
126+
/**
127+
* Coroutines variant of [ReactiveHashOperations.entries].
128+
*
129+
* @author Sebastien Deleuze
130+
* @since 2.2
131+
*/
132+
@ExperimentalCoroutinesApi
133+
fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.entriesAsFlow(key: H): Flow<Map.Entry<HK, HV>> =
134+
entries(key).asFlow()
135+
136+
/**
137+
* Coroutines variant of [ReactiveHashOperations.scan].
138+
*
139+
* @author Sebastien Deleuze
140+
* @since 2.2
141+
*/
142+
@ExperimentalCoroutinesApi
143+
fun <H : Any, HK : Any, HV : Any> ReactiveHashOperations<H, HK, HV>.scanAsFlow(key: H, options: ScanOptions = ScanOptions.NONE): Flow<Map.Entry<HK, HV>> =
144+
scan(key, options).asFlow()
145+
103146
/**
104147
* Coroutines variant of [ReactiveHashOperations.remove].
105148
*

src/main/kotlin/org/springframework/data/redis/core/ReactiveListOperationsExtensions.kt

+13
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,23 @@
1515
*/
1616
package org.springframework.data.redis.core
1717

18+
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.asFlow
1821
import kotlinx.coroutines.reactive.awaitFirstOrNull
1922
import kotlinx.coroutines.reactive.awaitSingle
2023
import java.time.Duration
2124

25+
/**
26+
* Coroutines variant of [ReactiveListOperations.range].
27+
*
28+
* @author Sebastien Deleuze
29+
* @since 2.2
30+
*/
31+
@ExperimentalCoroutinesApi
32+
fun <K : Any, V : Any> ReactiveListOperations<K, V>.rangeAsFlow(key: K, start: Long, end: Long): Flow<V> =
33+
range(key, start, end).asFlow()
34+
2235
/**
2336
* Coroutines variant of [ReactiveListOperations.trim].
2437
*

src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt

+90
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,52 @@
1515
*/
1616
package org.springframework.data.redis.core
1717

18+
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.Flow
20+
import kotlinx.coroutines.reactive.asFlow
21+
import kotlinx.coroutines.reactive.asPublisher
1822
import kotlinx.coroutines.reactive.awaitFirstOrNull
1923
import kotlinx.coroutines.reactive.awaitSingle
2024
import org.springframework.data.redis.connection.DataType
25+
import org.springframework.data.redis.connection.ReactiveRedisConnection
26+
import org.springframework.data.redis.connection.ReactiveSubscription.*
27+
import org.springframework.data.redis.core.script.RedisScript
28+
import org.springframework.data.redis.listener.Topic
29+
import org.springframework.data.redis.serializer.RedisElementReader
30+
import org.springframework.data.redis.serializer.RedisElementWriter
2131
import java.time.Duration
2232
import java.time.Instant
2333

34+
/**
35+
* Coroutines variant of [ReactiveRedisOperations.execute].
36+
*
37+
* @author Sebastien Deleuze
38+
* @since 2.2
39+
*/
40+
@ExperimentalCoroutinesApi
41+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(action: (ReactiveRedisConnection) -> Flow<T>): Flow<T> =
42+
execute { action(it).asPublisher() }.asFlow()
43+
44+
/**
45+
* Coroutines variant of [ReactiveRedisOperations.execute].
46+
*
47+
* @author Sebastien Deleuze
48+
* @since 2.2
49+
*/
50+
@ExperimentalCoroutinesApi
51+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(script: RedisScript<T>, keys: List<K> = emptyList(), args: List<*> = emptyList<Any>()): Flow<T> =
52+
execute(script, keys, args).asFlow()
53+
54+
/**
55+
* Coroutines variant of [ReactiveRedisOperations.execute].
56+
*
57+
* @author Sebastien Deleuze
58+
* @since 2.2
59+
*/
60+
@ExperimentalCoroutinesApi
61+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(script: RedisScript<T>, keys: List<K> = emptyList(), args: List<*> = emptyList<Any>(), argsWriter: RedisElementWriter<*>, resultReader: RedisElementReader<T>): Flow<T> =
62+
execute(script, keys, args, argsWriter, resultReader).asFlow()
63+
2464
/**
2565
* Coroutines variant of [ReactiveRedisOperations.convertAndSend].
2666
*
@@ -30,6 +70,36 @@ import java.time.Instant
3070
suspend fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.sendAndAwait(destination: String, message: V): Long =
3171
convertAndSend(destination, message).awaitSingle()
3272

73+
/**
74+
* Coroutines variant of [ReactiveRedisOperations.listenToChannel].
75+
*
76+
* @author Sebastien Deleuze
77+
* @since 2.2
78+
*/
79+
@ExperimentalCoroutinesApi
80+
fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.listenToChannelAsFlow(vararg channels: String): Flow<Message<String, V>> =
81+
listenToChannel(*channels).asFlow()
82+
83+
/**
84+
* Coroutines variant of [ReactiveRedisOperations.listenToPattern].
85+
*
86+
* @author Sebastien Deleuze
87+
* @since 2.2
88+
*/
89+
@ExperimentalCoroutinesApi
90+
fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.listenToPatternAsFlow(vararg patterns: String): Flow<Message<String, V>> =
91+
listenToPattern(*patterns).asFlow()
92+
93+
/**
94+
* Coroutines variant of [ReactiveRedisOperations.listenTo].
95+
*
96+
* @author Sebastien Deleuze
97+
* @since 2.2
98+
*/
99+
@ExperimentalCoroutinesApi
100+
fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.listenToAsFlow(vararg topics: Topic): Flow<Message<String, V>> =
101+
listenTo(*topics).asFlow()
102+
33103
/**
34104
* Coroutines variant of [ReactiveRedisOperations.hasKey].
35105
*
@@ -48,6 +118,26 @@ suspend fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.hasKeyAndAwait(key:
48118
suspend fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.typeAndAwait(key: K): DataType =
49119
type(key).awaitSingle()
50120

121+
/**
122+
* Coroutines variant of [ReactiveRedisOperations.keys].
123+
*
124+
* @author Sebastien Deleuze
125+
* @since 2.2
126+
*/
127+
@ExperimentalCoroutinesApi
128+
fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.keysAsFlow(pattern: K): Flow<K> =
129+
keys(pattern).asFlow()
130+
131+
/**
132+
* Coroutines variant of [ReactiveRedisOperations.scan].
133+
*
134+
* @author Sebastien Deleuze
135+
* @since 2.2
136+
*/
137+
@ExperimentalCoroutinesApi
138+
fun <K : Any, V : Any> ReactiveRedisOperations<K, V>.scanAsFlow(options: ScanOptions = ScanOptions.NONE): Flow<K> =
139+
scan(options).asFlow()
140+
51141
/**
52142
* Coroutines variant of [ReactiveRedisOperations.randomKey].
53143
*

0 commit comments

Comments
 (0)