Redis Messaging/PubSub

Spring Data provides dedicated messaging integration for Redis, very similar in functionality and naming to the JMS integration in Spring Framework; in fact, users familiar with the JMS support in Spring should feel right at home.

Redis messaging can be roughly divided into two areas of functionality: the production (or publication) of messages and their consumption (or subscription), hence the shorthand pubsub (Publish/Subscribe). The ReactiveRedisTemplate class is used for message production. For asynchronous reception, Spring Data provides a dedicated message listener container that is used to consume a stream of messages. If you only need to subscribe, ReactiveRedisTemplate offers a stripped-down alternative to using a listener container.

The packages org.springframework.data.redis.connection and org.springframework.data.redis.listener provide the core functionality for using Redis messaging.

Sending/Publishing messages

To publish a message, one can use, as with the other operations, either the low-level ReactiveRedisConnection or the high-level ReactiveRedisTemplate. Both accept the message to be sent and the destination channel as arguments. While ReactiveRedisConnection requires raw data, ReactiveRedisTemplate allows arbitrary objects to be passed in as messages:

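// send message through ReactiveRedisConnection (raw data: channel first, then message)
ByteBuffer msg = …
ByteBuffer channel = …
Mono<Long> publishViaConnection = con.pubSubCommands().publish(channel, msg);

// send message through ReactiveRedisTemplate (arbitrary objects, serialized by the template)
ReactiveRedisTemplate<String, String> template = …
Mono<Long> publishViaTemplate = template.convertAndSend("channel", "message");

// nothing is sent until the returned Mono is subscribed to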
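
Because ReactiveRedisConnection works on raw data, application values have to be turned into ByteBuffer instances before they can be published at the low level. The following is a minimal sketch of one way to do that for text payloads, assuming UTF-8 encoding. The RawPayloads class and its method names are hypothetical helpers for illustration and are not part of Spring Data Redis; with ReactiveRedisTemplate, the configured serializers normally take care of this step.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring Data Redis: converts text to and from
// the raw ByteBuffer form that a low-level connection works with.
final class RawPayloads {

    private RawPayloads() {
    }

    // Encodes text as UTF-8 bytes wrapped in a ByteBuffer (UTF-8 is an assumption).
    static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes UTF-8 text from a duplicate so the caller's buffer position is untouched.
    static String decode(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer channel = encode("channel");
        ByteBuffer msg = encode("message");
        // prints "channel <- message"
        System.out.println(decode(channel) + " <- " + decode(msg));
    }
}
```

Decoding from a duplicate keeps the original buffer reusable, which matters if the same payload is handed to more than one call.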

Receiving/Subscribing for messages

On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only allows multiple subscriptions to be created with one command but also allows listening on channels not yet created at subscription time (as long as they match the pattern).
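
To make the effect of a pattern subscription concrete, here is a small sketch of glob-style matching of channel names. It is a simplified illustration only: it handles just the `*` and `?` wildcards, whereas Redis patterns also support character classes and escaping, and the actual matching is done by the Redis server rather than by client code. The ChannelPatterns class is a hypothetical name introduced for this example.

```java
// Hypothetical illustration, not part of Spring Data Redis: a simplified
// glob matcher showing which channel names a pattern subscription would cover.
final class ChannelPatterns {

    private ChannelPatterns() {
    }

    // '*' matches any run of characters (including none); '?' matches exactly one.
    // Every other pattern character must match the channel name literally.
    static boolean matches(String pattern, String channel) {
        int p = 0;
        int c = 0;
        int starAt = -1; // index of the most recent '*' in the pattern
        int resume = 0;  // channel index to retry from after that '*'

        while (c < channel.length()) {
            if (p < pattern.length() && pattern.charAt(p) == '*') {
                starAt = p++;
                resume = c;
            } else if (p < pattern.length()
                    && (pattern.charAt(p) == '?' || pattern.charAt(p) == channel.charAt(c))) {
                p++;
                c++;
            } else if (starAt >= 0) {
                // Backtrack: let the last '*' absorb one more character.
                p = starAt + 1;
                c = ++resume;
            } else {
                return false;
            }
        }
        // Any trailing '*' may match the empty remainder.
        while (p < pattern.length() && pattern.charAt(p) == '*') {
            p++;
        }
        return p == pattern.length();
    }

    public static void main(String[] args) {
        System.out.println(matches("news.*", "news.sports"));   // true
        System.out.println(matches("news.*", "weather.today")); // false
        System.out.println(matches("h?llo", "hello"));          // true
    }
}
```

A channel such as news.sports does not need to exist when the news.* pattern is registered; any later message published to a matching name would be delivered to the pattern subscriber.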

At the low level, ReactiveRedisConnection offers subscribe and pSubscribe methods that map to the Redis commands for subscribing by channel and by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change a subscription, simply query the channels and patterns of ReactiveSubscription.

Note
Reactive subscription commands in Spring Data Redis are non-blocking and may end without emitting an element.

Once subscribed, a connection starts waiting for messages. No other commands can be invoked on it except for adding new subscriptions or modifying or canceling the existing ones. Commands other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe are illegal and will cause an exception.

In order to receive messages, one needs to obtain the message stream. Note that a subscription only publishes messages for channels and patterns that are registered with that particular subscription. The message stream itself is a hot sequence that produces elements without regard to demand. Make sure to register sufficient demand so that the message buffer is not exhausted.

Message Listener Containers

Spring Data offers ReactiveRedisMessageListenerContainer, which does all the heavy lifting of conversion and subscription state management on behalf of the user.

ReactiveRedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and expose a stream of messages that emits channel messages with deserialization applied. It takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This allows you as an application developer to write the (possibly complex) business logic associated with receiving a message (and reacting to it) and to delegate boilerplate Redis infrastructure concerns to the framework. Message streams register a subscription in Redis upon publisher subscription and unregister it if the subscription gets canceled.

Furthermore, to minimize the application footprint, ReactiveRedisMessageListenerContainer allows one connection and one thread to be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes, so one can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a ReactiveRedisConnection only when needed; if all the listeners are unsubscribed, cleanup is performed automatically.

The message listener container itself does not require external threading resources. It uses the driver threads to publish messages.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

To await and ensure proper subscription, you can use the receiveLater method that returns a Mono<Flux<ChannelMessage>>. The resulting Mono completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting onNext signals, you can synchronize server-side subscriptions.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> {
        // notification hook when Redis subscriptions are synchronized with the server
    })
    .flatMapMany(Function.identity())
    .…;

Subscribing via template API

As mentioned above, you can use ReactiveRedisTemplate directly to subscribe to channels or patterns. This approach offers a straightforward, though limited, solution, as you lose the option to add subscriptions after the initial ones. Nevertheless, you can still control the message stream via the returned Flux, for example by using take(Duration). When reading is done, or on error or cancellation, all bound resources are freed again.

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();