Skip to content

Add support for SubscriptionListener using Pub/Sub and introduce ReactiveRedisMessageListenerContainer.receiveLater(…) #2052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from

Conversation

mp911de
Copy link
Member

@mp911de mp911de commented Apr 26, 2021

Support for SubscriptionListener when using MessageListener for subscription confirmation callbacks. ReactiveRedisMessageListenerContainer and ReactiveRedisOperations provide receiveLater(…) and listenToLater(…) methods to await until Redis acknowledges the subscription.

A MessageListener can additionally implement SubscriptionListener to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.

To await and ensure proper subscription, you can use the receiveLater method that returns a Mono<Flux<ChannelMessage>>.
The resulting Mono completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting onNext signals, you can synchronize server-side subscriptions.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

Partially addresses #1622 as this is the required infrastructure to rewrite RedisMessageListenerContainer.start() and addMessageListener(…) into a blocking form.

Closes #2054

@mp911de mp911de added the type: enhancement A general enhancement label Apr 26, 2021
@mp911de mp911de requested a review from christophstrobl April 26, 2021 15:13
christophstrobl pushed a commit that referenced this pull request May 25, 2021
christophstrobl pushed a commit that referenced this pull request May 25, 2021
Add missing (at)Override annotations.

Original Pull Request: #2052
christophstrobl pushed a commit that referenced this pull request May 25, 2021
christophstrobl pushed a commit that referenced this pull request May 25, 2021
Replace MonoProcessor with Sinks.

Original Pull Request: #2052
christophstrobl pushed a commit that referenced this pull request May 25, 2021
christophstrobl pushed a commit that referenced this pull request May 25, 2021
christophstrobl pushed a commit that referenced this pull request May 25, 2021
christophstrobl added a commit that referenced this pull request May 25, 2021
Add explicit cast update documentation and rename SubcriptionListener instance to NO_OP_...
Fix Java 16 compile error.

Original Pull Request: #2052
@christophstrobl christophstrobl deleted the issue/gh-1622 branch May 25, 2021 07:16
@christophstrobl christophstrobl added this to the 2.6 M1 (2021.1.0) milestone May 25, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for SubscriptionListener to listen to [P][UN]SUBSCRIBE notifications
2 participants