Skip to content

Commit 0faa63b

Browse files
iromurstoyanchev
authored andcommitted
Allow use of loadbalanced RSocketRequester
See gh-498
1 parent 33ec463 commit 0faa63b

File tree

2 files changed

+52
-4
lines changed

2 files changed

+52
-4
lines changed

spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClientBuilder.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package org.springframework.graphql.client;
1818

1919
import java.net.URI;
20+
import java.util.List;
2021
import java.util.function.Consumer;
2122

23+
import io.rsocket.loadbalance.LoadbalanceStrategy;
24+
import io.rsocket.loadbalance.LoadbalanceTarget;
2225
import io.rsocket.transport.ClientTransport;
2326
import io.rsocket.transport.netty.client.TcpClientTransport;
2427
import io.rsocket.transport.netty.client.WebsocketClientTransport;
28+
import org.reactivestreams.Publisher;
2529
import reactor.core.publisher.Mono;
2630

2731
import org.springframework.lang.Nullable;
@@ -45,6 +49,10 @@ final class DefaultRSocketGraphQlClientBuilder
4549

4650
private final RSocketRequester.Builder requesterBuilder;
4751

52+
private Publisher<List<LoadbalanceTarget>> targetPublisher;
53+
54+
private LoadbalanceStrategy loadbalanceStrategy;
55+
4856
@Nullable
4957
private ClientTransport clientTransport;
5058

@@ -111,6 +119,13 @@ public DefaultRSocketGraphQlClientBuilder rsocketRequester(Consumer<RSocketReque
111119
return this;
112120
}
113121

122+
@Override
123+
public DefaultRSocketGraphQlClientBuilder transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
124+
this.targetPublisher = targetPublisher;
125+
this.loadbalanceStrategy = loadbalanceStrategy;
126+
return this;
127+
}
128+
114129
@Override
115130
public RSocketGraphQlClient build() {
116131

@@ -120,13 +135,20 @@ public RSocketGraphQlClient build() {
120135
builder.encoders(encoders -> setJsonEncoder(CodecDelegate.findJsonEncoder(encoders)));
121136
});
122137

123-
Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
124-
RSocketRequester requester = this.requesterBuilder.transport(this.clientTransport);
138+
RSocketRequester requester;
139+
140+
if (this.targetPublisher != null && this.loadbalanceStrategy != null) {
141+
requester = this.requesterBuilder.transports(this.targetPublisher, this.loadbalanceStrategy);
142+
} else {
143+
Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
144+
requester = this.requesterBuilder.transport(this.clientTransport);
145+
}
125146
RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport(this.route, requester, getJsonDecoder());
126147

127148
return new DefaultRSocketGraphQlClient(
128149
super.buildGraphQlClient(graphQlTransport), requester,
129-
this.requesterBuilder, this.clientTransport, this.route, getBuilderInitializer());
150+
this.requesterBuilder, this.clientTransport, this.targetPublisher, this.loadbalanceStrategy,
151+
this.route, getBuilderInitializer());
130152
}
131153

132154

@@ -141,19 +163,26 @@ private static class DefaultRSocketGraphQlClient extends AbstractDelegatingGraph
141163

142164
private final ClientTransport clientTransport;
143165

166+
private final Publisher<List<LoadbalanceTarget>> targetPublisher;
167+
168+
private final LoadbalanceStrategy loadbalanceStrategy;
169+
144170
private final String route;
145171

146172
private final Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer;
147173

148174
DefaultRSocketGraphQlClient(
149175
GraphQlClient graphQlClient, RSocketRequester requester, RSocketRequester.Builder requesterBuilder,
150-
ClientTransport clientTransport, String route, Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer) {
176+
ClientTransport clientTransport, Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy,
177+
String route, Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer) {
151178

152179
super(graphQlClient);
153180

154181
this.requester = requester;
155182
this.requesterBuilder = requesterBuilder;
156183
this.clientTransport = clientTransport;
184+
this.targetPublisher = targetPublisher;
185+
this.loadbalanceStrategy = loadbalanceStrategy;
157186
this.route = route;
158187
this.builderInitializer = builderInitializer;
159188
}
@@ -174,6 +203,7 @@ public Mono<Void> stop() {
174203
public RSocketGraphQlClient.Builder<?> mutate() {
175204
DefaultRSocketGraphQlClientBuilder builder = new DefaultRSocketGraphQlClientBuilder(this.requesterBuilder);
176205
builder.clientTransport(this.clientTransport);
206+
builder.transports(this.targetPublisher, this.loadbalanceStrategy);
177207
builder.route(this.route);
178208
this.builderInitializer.accept(builder);
179209
return builder;

spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
package org.springframework.graphql.client;
1818

1919
import java.net.URI;
20+
import java.util.List;
2021
import java.util.function.Consumer;
2122

2223
import io.rsocket.core.RSocketClient;
24+
import io.rsocket.loadbalance.LoadbalanceStrategy;
25+
import io.rsocket.loadbalance.LoadbalanceTarget;
2326
import io.rsocket.transport.ClientTransport;
27+
import org.reactivestreams.Publisher;
2428
import reactor.core.publisher.Mono;
2529

2630
import org.springframework.messaging.rsocket.RSocketRequester;
@@ -129,6 +133,20 @@ interface Builder<B extends Builder<B>> extends GraphQlClient.Builder<B> {
129133
*/
130134
B rsocketRequester(Consumer<RSocketRequester.Builder> requester);
131135

136+
/**
137+
* Build an {@link RSocketRequester} with an
138+
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will
139+
* connect to one of the given targets selected through the given
140+
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}.
141+
* @param targetPublisher a {@code Publisher} that supplies a list of
142+
* target transports to loadbalance against; the given list may be
143+
* periodically updated by the {@code Publisher}.
144+
* @param loadbalanceStrategy the strategy to use for selecting from
145+
* the list of loadbalance targets.
146+
* @return the same builder instance
147+
*/
148+
B transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy);
149+
132150
/**
133151
* Build the {@code RSocketGraphQlClient} instance.
134152
*/

0 commit comments

Comments
 (0)