Spring Data provides dedicated messaging integration for Redis, very similar in functionality and naming to the JMS integration in Spring Framework; in fact, users familiar with the JMS support in Spring should feel right at home.
Redis messaging can be roughly divided into two areas of functionality: the production (publication) and the consumption (subscription) of messages, hence the shorthand pubsub (Publish/Subscribe). The ReactiveRedisTemplate class is used for message production. For asynchronous reception, Spring Data provides a dedicated message listener container that is used to consume a stream of messages.
For the purpose of just subscribing, ReactiveRedisTemplate offers a stripped-down alternative to using a listener container.
The packages org.springframework.data.redis.connection and org.springframework.data.redis.listener provide the core functionality for using Redis messaging.
To publish a message, one can use, as with the other operations, either the low-level ReactiveRedisConnection or the high-level ReactiveRedisTemplate. Both entities offer a publish method that accepts as arguments the message to be sent and the destination channel. While ReactiveRedisConnection requires raw data, ReactiveRedisTemplate allows arbitrary objects to be passed in as messages:
// send message through ReactiveRedisConnection
ByteBuffer msg = …
ByteBuffer channel = …
Mono<Long> publish = con.publish(msg, channel);
// send message through ReactiveRedisTemplate
ReactiveRedisTemplate template = …
Mono<Long> publish = template.convertAndSend("channel", "message");
On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only lets you create multiple subscriptions with one command but also lets you listen on channels not yet created at subscription time (as long as they match the pattern).
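To illustrate how a single pattern can cover several channels, including ones that appear later, here is a minimal sketch in plain Java. It is not part of Spring Data Redis: the class ChannelPatternSketch and its methods are hypothetical, and it only approximates Redis glob-style matching by handling the `*` and `?` wildcards (character classes such as `[ae]` are left out).

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only: approximates glob-style channel matching.
public class ChannelPatternSketch {

    // Translate a glob into a regex: '*' matches any run of characters,
    // '?' matches exactly one character, everything else is taken literally.
    public static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // True if the whole channel name matches the glob.
    public static boolean matches(String glob, String channel) {
        return toRegex(glob).matcher(channel).matches();
    }

    // Return the channels a pattern subscription would cover in this simplified model.
    public static List<String> matching(String glob, List<String> channels) {
        return channels.stream()
                .filter(channel -> matches(glob, channel))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> channels = List.of("news.sports", "news.weather", "chat.general");
        // One pattern covers both "news." channels.
        System.out.println(matching("news.*", channels));
        // A channel that did not exist earlier is covered as well, as long as it matches.
        System.out.println(matches("news.*", "news.finance"));
    }
}
```

In this model, a subscription to the pattern news.* would receive messages from news.sports and news.weather but not from chat.general, which is why one pattern subscription can stand in for many channel subscriptions.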
At the low level, ReactiveRedisConnection offers subscribe and pSubscribe methods that map to the Redis commands for subscribing by channel and by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change a subscription, simply query the channels and patterns of ReactiveSubscription.
Note: Reactive subscription commands in Spring Data Redis are non-blocking and may end without emitting an element.
As mentioned above, once subscribed, a connection starts waiting for messages. No other commands can be invoked on it except for adding new subscriptions or modifying or canceling the existing ones. Commands other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe are illegal and cause an exception.
In order to receive messages, one needs to obtain the message stream. Note that a subscription only publishes messages for channels and patterns that are registered with that particular subscription. The message stream itself is a hot sequence that produces elements without regard to demand. Make sure to register sufficient demand to not exhaust the message buffer.
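The interplay between a hot source, consumer demand, and a bounded buffer can be pictured with a small toy model in plain Java. This is only a sketch under stated assumptions: the class HotSourceSketch, its capacity parameter, and the choice to count overflowing messages as lost are all hypothetical and do not describe how Spring Data Redis or the underlying driver actually buffers messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: a hot source pushing into a bounded buffer.
public class HotSourceSketch {

    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private final int capacity;  // assumed buffer size, for illustration only
    private long demand;         // elements requested but not yet delivered
    private int overflowed;      // messages that arrived while the buffer was full

    public HotSourceSketch(int capacity) {
        this.capacity = capacity;
    }

    // Called whenever a message arrives, whether or not the consumer asked for one.
    public void emit(String message) {
        if (demand > 0) {
            // Outstanding demand implies an empty buffer, so deliver directly.
            demand--;
            delivered.add(message);
        } else if (buffer.size() < capacity) {
            buffer.addLast(message);
        } else {
            overflowed++; // assumption: this model simply loses the message
        }
    }

    // The consumer signals demand; buffered messages are drained first.
    public void request(long n) {
        demand += n;
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            delivered.add(buffer.removeFirst());
        }
    }

    public List<String> delivered() {
        return delivered;
    }

    public int overflowed() {
        return overflowed;
    }

    public static void main(String[] args) {
        HotSourceSketch source = new HotSourceSketch(2);
        source.request(1);                  // too little demand for five messages
        for (int i = 1; i <= 5; i++) {
            source.emit("m" + i);           // the source keeps emitting regardless
        }
        source.request(10);                 // late demand only recovers what was buffered
        System.out.println(source.delivered());
        System.out.println(source.overflowed());
    }
}
```

In this model, the first message is delivered directly, the next two wait in the buffer, and the last two find the buffer full. Requesting enough elements up front avoids the overflow entirely.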
Spring Data offers ReactiveRedisMessageListenerContainer, which does all the heavy lifting of conversion and subscription state management on behalf of the user.
ReactiveRedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and expose a stream of messages that emits channel messages with deserialization applied. It takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This allows you as an application developer to write the (possibly complex) business logic associated with receiving a message (and reacting to it) and to delegate boilerplate Redis infrastructure concerns to the framework. Message streams register a subscription in Redis upon publisher subscription and unregister if the subscription gets canceled.
Furthermore, to minimize the application footprint, ReactiveRedisMessageListenerContainer allows one connection and one thread to be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes, so one can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a ReactiveRedisConnection only when needed. If all the listeners are unsubscribed, cleanup is automatically performed.
The message listener container itself does not require external threading resources. It uses the driver threads to publish messages.
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
To await and ensure proper subscription, you can use the receiveLater method, which returns a Mono<Flux<ChannelMessage>>.
The resulting Mono completes with an inner publisher once the subscription to the given topics has completed. By intercepting onNext signals, you can synchronize server-side subscriptions.
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));
stream.doOnNext(inner -> {
      // notification hook when Redis subscriptions are synchronized with the server
    })
.flatMapMany(Function.identity())
.…;
As mentioned above, you can directly use ReactiveRedisTemplate to subscribe to channels or patterns. This approach offers a straightforward, though limited, solution, as you lose the option to add subscriptions after the initial ones. Nevertheless, you can still control the message stream through the returned Flux by using, for example, take(Duration). When done reading, on error, or on cancellation, all bound resources are freed again.
redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
// message processing ...
}).subscribe();