Skip to content

INT-4444: Introduce @Reactive & reactive() #3503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 8, 2021

Conversation

artembilan
Copy link
Member

JIRA: https://jira.spring.io/browse/INT-4444

Right now the high-level API creates a ReactiveStreamsConsumer
only when the input channel is a Publisher<?> impl or target handler
is a ReactiveMessageHandler

  • Add @Reactive[] reactive() attribute to messaging annotations
  • Add ConsumerEndpointSpec.reactive()
    Both options point to the same ConsumerEndpointFactoryBean.setReactiveCustomizer()
    making the target endpoint always as a ReactiveStreamsConsumer independently of
    the input channel and target handler
  • Use the Function to customize a source Flux from the channel
  • Test and document a new feature

* Only one {@link Reactive} element is allowed.
* Mutually exclusive with {@link #poller()}.
*/
Reactive[] reactive() default { };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to make these arrays; given that Reactive has defaults for all its attributes (1), you can simply use default = @Reactive and use an empty value to indicate not provided.

Or default = @Reactive("not_provided").

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. @Reactive("not_provided") (or similar marker) will work. Otherwise I would argue how to determine "no reactive" then: the function is optional, but users really would like to make an endpoint reactive independently of the input channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't have a strong opinion on this, given that we do the same with @Poller - so feel free to reject the suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(all @Poller attributes have defaults too).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I think at the moment we introduced a @Poller we just didn't think about some some not_provided marker.
I believe we can break it and bring it back to a single annotation in 6.0. See ValueConstants.DEFAULT_NONE

import java.lang.annotation.Target;

/**
* Provides a reactive configuration options for the consumer endpoint making
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Provides a reactive configuration options for the consumer endpoint making
* Provides reactive configuration options for the consumer endpoint making

Comment on lines 268 to 270
Starting with version 5.5, the `ConsumerEndpointSpec` provide a `reactive()` configuration property with an optional customizer `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`.
This option makes the target endpoint as a `ReactiveStreamsConsumer` instance independently of the input channel type, which is turned to the `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Starting with version 5.5, the `ConsumerEndpointSpec` provide a `reactive()` configuration property with an optional customizer `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`.
This option makes the target endpoint as a `ReactiveStreamsConsumer` instance independently of the input channel type, which is turned to the `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel.
Starting with version 5.5, the `ConsumerEndpointSpec` provides a `reactive()` configuration property with an optional customizer `Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>`.
This option configures the target endpoint as a `ReactiveStreamsConsumer` instance, independently of the input channel type, which is converted to a `Flux` via `IntegrationReactiveUtils.messageChannelToFlux()`.
The provided function is used from the `Flux.transform()` operator to customize (`publishOn()`, `log()`, `doOnNext()` etc.) a reactive stream source from the input channel.

@artembilan
Copy link
Member Author

Forgot to mention: let's consider an XML tag as possible external contribution. Too much places to take care about:

Poller_in_XML

Plus it is not so natural to declare Function beans in the XML config.
It is a bit awkward with that @Reactive annotation, but there is no other choice with them unless bean references...

The Java DSL is the best choice for this type of configuration! 😄

JIRA: https://jira.spring.io/browse/INT-4444

Right now the high-level API creates a `ReactiveStreamsConsumer`
only when the input channel is a `Publisher<?>` impl or target handler
is a `ReactiveMessageHandler`

* Add `@Reactive[] reactive()` attribute to messaging annotations
* Add `ConsumerEndpointSpec.reactive()`
Both options point to the same `ConsumerEndpointFactoryBean.setReactiveCustomizer()`
making the target endpoint always as a `ReactiveStreamsConsumer` independently of
the input channel and target handler
* Use the `Function` to customize a source `Flux` from the channel
* Test and document a new feature
…`@Reactive` value

with default as `@Reactive(ValueConstants.DEFAULT_NONE)`
* Fix language in docs
* Fix `MessagingAnnotationUtils.resolveAttribute()` to use `requiredType.isInstance()`
instead of comparing classes since annotation instances are `Proxy` at runtime
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants