17
17
package org .springframework .graphql .client ;
18
18
19
19
import java .net .URI ;
20
+ import java .util .List ;
20
21
import java .util .function .Consumer ;
21
22
23
+ import io .rsocket .loadbalance .LoadbalanceStrategy ;
24
+ import io .rsocket .loadbalance .LoadbalanceTarget ;
22
25
import io .rsocket .transport .ClientTransport ;
23
26
import io .rsocket .transport .netty .client .TcpClientTransport ;
24
27
import io .rsocket .transport .netty .client .WebsocketClientTransport ;
28
+ import org .reactivestreams .Publisher ;
25
29
import reactor .core .publisher .Mono ;
26
30
27
31
import org .springframework .lang .Nullable ;
@@ -45,6 +49,12 @@ final class DefaultRSocketGraphQlClientBuilder
45
49
46
50
private final RSocketRequester .Builder requesterBuilder ;
47
51
52
+ @ Nullable
53
+ private Publisher <List <LoadbalanceTarget >> targetPublisher ;
54
+
55
+ @ Nullable
56
+ private LoadbalanceStrategy loadbalanceStrategy ;
57
+
48
58
@ Nullable
49
59
private ClientTransport clientTransport ;
50
60
@@ -87,8 +97,17 @@ public DefaultRSocketGraphQlClientBuilder webSocket(URI uri) {
87
97
}
88
98
89
99
@ Override
90
- public DefaultRSocketGraphQlClientBuilder clientTransport (ClientTransport clientTransport ) {
91
- this .clientTransport = clientTransport ;
100
+ public DefaultRSocketGraphQlClientBuilder clientTransport (ClientTransport transport ) {
101
+ this .clientTransport = transport ;
102
+ return this ;
103
+ }
104
+
105
+ @ Override
106
+ public DefaultRSocketGraphQlClientBuilder clientTransports (
107
+ Publisher <List <LoadbalanceTarget >> publisher , LoadbalanceStrategy strategy ) {
108
+
109
+ this .targetPublisher = publisher ;
110
+ this .loadbalanceStrategy = strategy ;
92
111
return this ;
93
112
}
94
113
@@ -106,8 +125,8 @@ public DefaultRSocketGraphQlClientBuilder route(String route) {
106
125
}
107
126
108
127
@ Override
109
- public DefaultRSocketGraphQlClientBuilder rsocketRequester (Consumer <RSocketRequester .Builder > requesterConsumer ) {
110
- requesterConsumer .accept (this .requesterBuilder );
128
+ public DefaultRSocketGraphQlClientBuilder rsocketRequester (Consumer <RSocketRequester .Builder > consumer ) {
129
+ consumer .accept (this .requesterBuilder );
111
130
return this ;
112
131
}
113
132
@@ -120,13 +139,25 @@ public RSocketGraphQlClient build() {
120
139
builder .encoders (encoders -> setJsonEncoder (CodecDelegate .findJsonEncoder (encoders )));
121
140
});
122
141
123
- Assert .state (this .clientTransport != null , "Neither WebSocket nor TCP networking configured" );
124
- RSocketRequester requester = this .requesterBuilder .transport (this .clientTransport );
125
- RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport (this .route , requester , getJsonDecoder ());
142
+ RSocketRequester requester ;
143
+
144
+ if (this .clientTransport != null ) {
145
+ requester = this .requesterBuilder .transport (this .clientTransport );
146
+ }
147
+ else if (this .targetPublisher != null && this .loadbalanceStrategy != null ) {
148
+ requester = this .requesterBuilder .transports (this .targetPublisher , this .loadbalanceStrategy );
149
+ }
150
+ else {
151
+ throw new IllegalStateException ("Neither ClientTransport, nor Loadbalance targets and strategy" );
152
+ }
153
+
154
+ RSocketGraphQlTransport graphQlTransport =
155
+ new RSocketGraphQlTransport (this .route , requester , getJsonDecoder ());
126
156
127
157
return new DefaultRSocketGraphQlClient (
128
158
super .buildGraphQlClient (graphQlTransport ), requester ,
129
- this .requesterBuilder , this .clientTransport , this .route , getBuilderInitializer ());
159
+ this .requesterBuilder , this .clientTransport , this .targetPublisher , this .loadbalanceStrategy ,
160
+ this .route , getBuilderInitializer ());
130
161
}
131
162
132
163
@@ -139,21 +170,33 @@ private static class DefaultRSocketGraphQlClient extends AbstractDelegatingGraph
139
170
140
171
private final RSocketRequester .Builder requesterBuilder ;
141
172
173
+ @ Nullable
142
174
private final ClientTransport clientTransport ;
143
175
176
+ @ Nullable
177
+ private final Publisher <List <LoadbalanceTarget >> targetPublisher ;
178
+
179
+ @ Nullable
180
+ private final LoadbalanceStrategy loadbalanceStrategy ;
181
+
144
182
private final String route ;
145
183
146
184
private final Consumer <AbstractGraphQlClientBuilder <?>> builderInitializer ;
147
185
148
186
DefaultRSocketGraphQlClient (
149
- GraphQlClient graphQlClient , RSocketRequester requester , RSocketRequester .Builder requesterBuilder ,
150
- ClientTransport clientTransport , String route , Consumer <AbstractGraphQlClientBuilder <?>> builderInitializer ) {
187
+ GraphQlClient graphQlClient ,
188
+ RSocketRequester requester , RSocketRequester .Builder requesterBuilder ,
189
+ @ Nullable ClientTransport clientTransport ,
190
+ @ Nullable Publisher <List <LoadbalanceTarget >> targetPublisher , @ Nullable LoadbalanceStrategy strategy ,
191
+ String route , Consumer <AbstractGraphQlClientBuilder <?>> builderInitializer ) {
151
192
152
193
super (graphQlClient );
153
194
154
195
this .requester = requester ;
155
196
this .requesterBuilder = requesterBuilder ;
156
197
this .clientTransport = clientTransport ;
198
+ this .targetPublisher = targetPublisher ;
199
+ this .loadbalanceStrategy = strategy ;
157
200
this .route = route ;
158
201
this .builderInitializer = builderInitializer ;
159
202
}
@@ -173,7 +216,12 @@ public Mono<Void> stop() {
173
216
@ Override
174
217
public RSocketGraphQlClient .Builder <?> mutate () {
175
218
DefaultRSocketGraphQlClientBuilder builder = new DefaultRSocketGraphQlClientBuilder (this .requesterBuilder );
176
- builder .clientTransport (this .clientTransport );
219
+ if (this .clientTransport != null ) {
220
+ builder .clientTransport (this .clientTransport );
221
+ }
222
+ if (this .targetPublisher != null && this .loadbalanceStrategy != null ) {
223
+ builder .clientTransports (this .targetPublisher , this .loadbalanceStrategy );
224
+ }
177
225
builder .route (this .route );
178
226
this .builderInitializer .accept (builder );
179
227
return builder ;
0 commit comments