Skip to content

Commit 9e87705

Browse files
committed
Polishing some missed renaming
1 parent f9a421d commit 9e87705

File tree

3 files changed

+21
-19
lines changed

3 files changed

+21
-19
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import reactor.core.publisher.FluxSink;
3333

3434
/**
35+
* The {@link AbstractMessageChannel} implementation for the
36+
* Reactive Streams {@link Publisher} based on the Project Reactor {@link FluxProcessor}.
37+
*
3538
* @author Artem Bilan
3639
* @author Gary Russell
3740
*
@@ -46,8 +49,6 @@ public class FluxMessageChannel extends AbstractMessageChannel
4649

4750
private final FluxProcessor<Message<?>, Message<?>> processor;
4851

49-
private final Flux<Message<?>> flux;
50-
5152
private final FluxSink<Message<?>> sink;
5253

5354
private volatile boolean upstreamSubscribed;
@@ -59,7 +60,6 @@ public FluxMessageChannel() {
5960
public FluxMessageChannel(FluxProcessor<Message<?>, Message<?>> processor) {
6061
Assert.notNull(processor, "'processor' must not be null");
6162
this.processor = processor;
62-
this.flux = Flux.from(processor);
6363
this.sink = processor.sink();
6464
}
6565

@@ -73,7 +73,7 @@ protected boolean doSend(Message<?> message, long timeout) {
7373
public void subscribe(Subscriber<? super Message<?>> subscriber) {
7474
this.subscribers.add(subscriber);
7575

76-
this.flux.doOnCancel(() -> FluxMessageChannel.this.subscribers.remove(subscriber))
76+
this.processor.doOnCancel(() -> FluxMessageChannel.this.subscribers.remove(subscriber))
7777
.subscribe(subscriber);
7878

7979
if (!this.upstreamSubscribed) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,20 +132,20 @@ public ExecutorChannelSpec executor(String id, Executor executor) {
132132
}
133133

134134

135-
public FluxMessageChannelSpec reactive() {
136-
return MessageChannels.reactive();
135+
public FluxMessageChannelSpec flux() {
136+
return MessageChannels.flux();
137137
}
138138

139-
public FluxMessageChannelSpec reactive(String id) {
140-
return MessageChannels.reactive(id);
139+
public FluxMessageChannelSpec flux(String id) {
140+
return MessageChannels.flux(id);
141141
}
142142

143-
public FluxMessageChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
144-
return MessageChannels.reactive(processor);
143+
public FluxMessageChannelSpec flux(FluxProcessor<Message<?>, Message<?>> processor) {
144+
return MessageChannels.flux(processor);
145145
}
146146

147-
public FluxMessageChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
148-
return MessageChannels.reactive(id, processor);
147+
public FluxMessageChannelSpec flux(String id, FluxProcessor<Message<?>, Message<?>> processor) {
148+
return MessageChannels.flux(id, processor);
149149
}
150150

151151
Channels() {

spring-integration-core/src/main/java/org/springframework/integration/dsl/channel/MessageChannels.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,22 @@ public static <S extends PublishSubscribeChannelSpec<S>> PublishSubscribeChannel
126126
return MessageChannels.<S>publishSubscribe(executor).id(id);
127127
}
128128

129-
public static FluxMessageChannelSpec reactive() {
129+
public static FluxMessageChannelSpec flux() {
130130
return new FluxMessageChannelSpec();
131131
}
132132

133-
public static FluxMessageChannelSpec reactive(String id) {
134-
return reactive().id(id);
133+
public static FluxMessageChannelSpec flux(String id) {
134+
return flux()
135+
.id(id);
135136
}
136137

137-
public static FluxMessageChannelSpec reactive(FluxProcessor<Message<?>, Message<?>> processor) {
138-
return new FluxMessageChannelSpec(processor);
138+
public static FluxMessageChannelSpec flux(String id, FluxProcessor<Message<?>, Message<?>> processor) {
139+
return flux(processor)
140+
.id(id);
139141
}
140142

141-
public static FluxMessageChannelSpec reactive(String id, FluxProcessor<Message<?>, Message<?>> processor) {
142-
return reactive(processor).id(id);
143+
public static FluxMessageChannelSpec flux(FluxProcessor<Message<?>, Message<?>> processor) {
144+
return new FluxMessageChannelSpec(processor);
143145
}
144146

145147
private MessageChannels() {

0 commit comments

Comments
 (0)