Skip to content

Commit 24b0f61

Browse files
mp911dechristophstrobl
authored andcommitted
Introduce ReactiveRedisOperations.executeInSession(…) and session callback interface.
Allows reuse of a bound connection without additional connection acquisition overhead. Closes: #2110 Original Pull Request: #2129
1 parent 997cdaa commit 24b0f61

File tree

5 files changed

+147
-8
lines changed

5 files changed

+147
-8
lines changed

Diff for: src/main/java/org/springframework/data/redis/core/ReactiveRedisOperations.java

+17
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,23 @@ public interface ReactiveRedisOperations<K, V> {
6363
*/
6464
<T> Flux<T> execute(ReactiveRedisCallback<T> action);
6565

66+
/**
67+
* Executes the given action within a Redis session using the same
68+
* {@link org.springframework.data.redis.connection.ReactiveRedisConnection}. Application exceptions thrown by the
69+
* action object get propagated to the caller (can only be unchecked) whenever possible. Redis exceptions are
70+
* transformed into appropriate DAO ones. Allows for returning a result object, that is a domain object or a
71+
* collection of domain objects. Performs automatic serialization/deserialization for the given objects to and from
72+
* binary data suitable for the Redis storage. Note: Callback code is not supposed to handle transactions itself! Use
73+
* an appropriate transaction manager. Generally, callback code must not touch any Connection lifecycle methods, like
74+
* close, to let the template do its work.
75+
*
76+
* @param <T> return type
77+
* @param action callback object that specifies the Redis action
78+
* @return a result object returned by the action or {@link Flux#empty()}.
79+
* @since 2.6
80+
*/
81+
<T> Flux<T> executeInSession(ReactiveRedisSessionCallback<K, V, T> action);
82+
6683
// -------------------------------------------------------------------------
6784
// Methods dealing with Redis Pub/Sub
6885
// -------------------------------------------------------------------------
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2017-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.core;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.springframework.dao.DataAccessException;
20+
import org.springframework.data.redis.connection.ReactiveRedisConnection;
21+
22+
/**
23+
* Generic callback interface for code that wants to use the same {@link ReactiveRedisConnection} avoiding connection
24+
* allocation overhead upon each Template API method call. Allows to execute any number of operations on a single
25+
* {@link ReactiveRedisConnection}, using any type and number of commands.
26+
* <p>
27+
* This is particularly useful for issuing multiple calls on the same connection.
28+
*
29+
* @param <T>
30+
* @author Mark Paluch
31+
* @since 2.6
32+
* @see ReactiveRedisOperations#executeInSession(ReactiveRedisSessionCallback)
33+
*/
34+
public interface ReactiveRedisSessionCallback<K, V, T> {
35+
36+
/**
37+
* Gets called by {@link ReactiveRedisOperations#executeInSession(ReactiveRedisSessionCallback)} with an active Redis
38+
* connection. Does not need to care about activating or closing the {@link ReactiveRedisConnection}.
39+
* <p>
40+
* Allows for returning a result object created within the callback, i.e. a domain object or a collection of domain
41+
* objects.
42+
*
43+
* @param operations template associated with a connection.
44+
* @return a result object publisher
45+
* @throws DataAccessException in case of custom exceptions
46+
*/
47+
Publisher<T> doWithOperations(ReactiveRedisOperations<K, V> operations) throws DataAccessException;
48+
}

Diff for: src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ public <T> Flux<T> execute(ReactiveRedisCallback<T> action, boolean exposeConnec
151151
return Flux.from(doInConnection(action, exposeConnection));
152152
}
153153

154+
@Override
155+
public <T> Flux<T> executeInSession(ReactiveRedisSessionCallback<K, V, T> action) {
156+
157+
Assert.notNull(action, "Callback object must not be null");
158+
return Flux
159+
.from(doInConnection(connection -> action.doWithOperations(withConnection(connection)), exposeConnection));
160+
}
161+
154162
/**
155163
* Create a reusable Flux for a {@link ReactiveRedisCallback}. Callback is executed within a connection context. The
156164
* connection is released outside the callback.
@@ -188,7 +196,7 @@ public <T> Mono<T> createMono(ReactiveRedisCallback<T> callback) {
188196
* @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
189197
* @return object returned by the action
190198
*/
191-
private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeConnection) {
199+
<T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeConnection) {
192200

193201
Assert.notNull(action, "Callback object must not be null");
194202

@@ -742,6 +750,31 @@ public RedisSerializationContext<K, V> getSerializationContext() {
742750
return serializationContext;
743751
}
744752

753+
private ReactiveRedisOperations<K, V> withConnection(ReactiveRedisConnection connection) {
754+
return new BoundConnectionRedisTemplate(connection, connectionFactory, serializationContext);
755+
}
756+
757+
class BoundConnectionRedisTemplate extends ReactiveRedisTemplate<K, V> {
758+
759+
private final ReactiveRedisConnection connection;
760+
761+
public BoundConnectionRedisTemplate(ReactiveRedisConnection connection,
762+
ReactiveRedisConnectionFactory connectionFactory, RedisSerializationContext<K, V> serializationContext) {
763+
super(connectionFactory, serializationContext, true);
764+
this.connection = connection;
765+
}
766+
767+
@Override
768+
<T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean exposeConnection) {
769+
770+
Assert.notNull(action, "Callback object must not be null");
771+
772+
ReactiveRedisConnection connToUse = ReactiveRedisTemplate.this.preProcessConnection(connection, true);
773+
Publisher<T> result = action.doInRedis(connToUse);
774+
return ReactiveRedisTemplate.this.postProcessResult(result, connToUse, true);
775+
}
776+
}
777+
745778
private ByteBuffer rawKey(K key) {
746779
return getSerializationContext().getKeySerializationPair().getWriter().write(key);
747780
}

Diff for: src/main/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensions.kt

+21-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
2222
import kotlinx.coroutines.reactive.awaitSingle
2323
import org.springframework.data.redis.connection.DataType
2424
import org.springframework.data.redis.connection.ReactiveRedisConnection
25-
import org.springframework.data.redis.connection.ReactiveSubscription.*
25+
import org.springframework.data.redis.connection.ReactiveSubscription.Message
2626
import org.springframework.data.redis.core.script.RedisScript
2727
import org.springframework.data.redis.listener.Topic
2828
import org.springframework.data.redis.serializer.RedisElementReader
@@ -36,17 +36,33 @@ import java.time.Instant
3636
* @author Sebastien Deleuze
3737
* @since 2.2
3838
*/
39-
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(action: (ReactiveRedisConnection) -> Flow<T>): Flow<T> =
40-
execute { action(it).asPublisher() }.asFlow()
39+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(action: (ReactiveRedisConnection) -> Flow<T>): Flow<T> {
40+
return execute { action(it).asPublisher() }.asFlow()
41+
}
42+
43+
/**
44+
* Coroutines variant of [ReactiveRedisOperations.execute].
45+
*
46+
* @author Mark Paluch
47+
* @since 2.6
48+
*/
49+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeInSessionAsFlow(
50+
action: (ReactiveRedisOperations<K, V>) -> Flow<T>
51+
): Flow<T> =
52+
executeInSession { action(it).asPublisher() }.asFlow()
4153

4254
/**
4355
* Coroutines variant of [ReactiveRedisOperations.execute].
4456
*
4557
* @author Sebastien Deleuze
4658
* @since 2.2
4759
*/
48-
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(script: RedisScript<T>, keys: List<K> = emptyList(), args: List<*> = emptyList<Any>()): Flow<T> =
49-
execute(script, keys, args).asFlow()
60+
fun <K : Any, V : Any, T : Any> ReactiveRedisOperations<K, V>.executeAsFlow(
61+
script: RedisScript<T>,
62+
keys: List<K> = emptyList(),
63+
args: List<*> = emptyList<Any>()
64+
): Flow<T> =
65+
execute(script, keys, args).asFlow()
5066

5167
/**
5268
* Coroutines variant of [ReactiveRedisOperations.execute].

Diff for: src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt

+27-2
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,45 @@ class ReactiveRedisOperationsExtensionsUnitTests {
5050
every { operations.execute(any<ReactiveRedisCallback<*>>()) } returns Flux.just("foo")
5151

5252
runBlocking {
53-
assertThat(operations.executeAsFlow { flow { emit("foo")} }.toList()).contains("foo")
53+
assertThat(operations.executeAsFlow { flow { emit("foo") } }
54+
.toList()).contains("foo")
5455
}
5556

5657
verify {
5758
operations.execute(any<ReactiveRedisCallback<*>>())
5859
}
5960
}
6061

62+
@Test // GH-2110
63+
fun `executeInSession with calllback`() {
64+
65+
val operations = mockk<ReactiveRedisOperations<String, String>>()
66+
every { operations.executeInSession(any<ReactiveRedisSessionCallback<String, String, *>>()) } returns Flux.just(
67+
"foo"
68+
)
69+
70+
runBlocking {
71+
assertThat(operations.executeInSessionAsFlow { flow { emit("foo") } }
72+
.toList()).contains("foo")
73+
}
74+
75+
verify {
76+
operations.executeInSession(any<ReactiveRedisSessionCallback<String, String, *>>())
77+
}
78+
}
79+
6180
@Test // DATAREDIS-1033
6281
fun `execute with script`() {
6382

6483
val script = RedisScript.of<String>("foo")
6584
val operations = mockk<ReactiveRedisOperations<String, String>>()
66-
every { operations.execute(any<RedisScript<*>>(), any(), any()) } returns Flux.just("foo")
85+
every {
86+
operations.execute(
87+
any<RedisScript<*>>(),
88+
any(),
89+
any()
90+
)
91+
} returns Flux.just("foo")
6792

6893
runBlocking {
6994
assertThat(operations.executeAsFlow(script).toList()).contains("foo")

0 commit comments

Comments
 (0)