@@ -63,31 +63,40 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
63
63
private static final Function <HttpClient , HttpClient > defaultInitializer = client -> client .compress (true );
64
64
65
65
66
- private HttpClient httpClient ;
67
-
68
66
@ Nullable
69
67
private final ReactorResourceFactory resourceFactory ;
70
68
71
69
@ Nullable
72
70
private final Function <HttpClient , HttpClient > mapper ;
73
71
74
- private volatile boolean running = true ;
72
+ @ Nullable
73
+ private volatile HttpClient httpClient ;
75
74
76
75
private final Object lifecycleMonitor = new Object ();
77
76
78
77
79
78
/**
80
79
* Default constructor. Initializes {@link HttpClient} via:
81
- * <pre class="code">
82
- * HttpClient.create().compress()
83
- * </pre>
80
+ * <pre class="code">HttpClient.create().compress(true)</pre>
84
81
*/
85
82
public ReactorClientHttpConnector () {
86
83
this .httpClient = defaultInitializer .apply (HttpClient .create ());
87
84
this .resourceFactory = null ;
88
85
this .mapper = null ;
89
86
}
90
87
88
+ /**
89
+ * Constructor with a pre-configured {@code HttpClient} instance.
90
+ * @param httpClient the client to use
91
+ * @since 5.1
92
+ */
93
+ public ReactorClientHttpConnector (HttpClient httpClient ) {
94
+ Assert .notNull (httpClient , "HttpClient is required" );
95
+ this .httpClient = httpClient ;
96
+ this .resourceFactory = null ;
97
+ this .mapper = null ;
98
+ }
99
+
91
100
/**
92
101
* Constructor with externally managed Reactor Netty resources, including
93
102
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
@@ -107,50 +116,34 @@ public ReactorClientHttpConnector() {
107
116
* @since 5.1
108
117
*/
109
118
public ReactorClientHttpConnector (ReactorResourceFactory resourceFactory , Function <HttpClient , HttpClient > mapper ) {
110
- this .httpClient = createHttpClient (resourceFactory , mapper );
111
119
this .resourceFactory = resourceFactory ;
112
120
this .mapper = mapper ;
121
+ if (resourceFactory .isRunning ()) {
122
+ this .httpClient = createHttpClient (resourceFactory , mapper );
123
+ }
113
124
}
114
125
115
- private static HttpClient createHttpClient (ReactorResourceFactory resourceFactory , Function <HttpClient , HttpClient > mapper ) {
116
- ConnectionProvider provider = resourceFactory .getConnectionProvider ();
117
- Assert .notNull (provider , "No ConnectionProvider: is ReactorResourceFactory not initialized yet?" );
118
- return defaultInitializer .andThen (mapper ).andThen (applyLoopResources (resourceFactory ))
119
- .apply (HttpClient .create (provider ));
120
- }
121
-
122
- private static Function <HttpClient , HttpClient > applyLoopResources (ReactorResourceFactory factory ) {
123
- return httpClient -> {
124
- LoopResources resources = factory .getLoopResources ();
125
- Assert .notNull (resources , "No LoopResources: is ReactorResourceFactory not initialized yet?" );
126
- return httpClient .runOn (resources );
127
- };
128
- }
129
-
130
-
131
- /**
132
- * Constructor with a pre-configured {@code HttpClient} instance.
133
- * @param httpClient the client to use
134
- * @since 5.1
135
- */
136
- public ReactorClientHttpConnector (HttpClient httpClient ) {
137
- Assert .notNull (httpClient , "HttpClient is required" );
138
- this .httpClient = httpClient ;
139
- this .resourceFactory = null ;
140
- this .mapper = null ;
126
+ private static HttpClient createHttpClient (ReactorResourceFactory factory , Function <HttpClient , HttpClient > mapper ) {
127
+ return defaultInitializer .andThen (mapper ).andThen (httpClient -> httpClient .runOn (factory .getLoopResources ()))
128
+ .apply (HttpClient .create (factory .getConnectionProvider ()));
141
129
}
142
130
143
131
144
132
@ Override
145
133
public Mono <ClientHttpResponse > connect (HttpMethod method , URI uri ,
146
134
Function <? super ClientHttpRequest , Mono <Void >> requestCallback ) {
147
135
148
- AtomicReference <ReactorClientHttpResponse > responseRef = new AtomicReference <>();
136
+ HttpClient httpClient = this .httpClient ;
137
+ if (httpClient == null ) {
138
+ Assert .state (this .resourceFactory != null && this .mapper != null , "Illegal configuration" );
139
+ httpClient = createHttpClient (this .resourceFactory , this .mapper );
140
+ }
149
141
150
- HttpClient .RequestSender requestSender = this . httpClient
142
+ HttpClient .RequestSender requestSender = httpClient
151
143
.request (io .netty .handler .codec .http .HttpMethod .valueOf (method .name ()));
152
144
153
145
requestSender = setUri (requestSender , uri );
146
+ AtomicReference <ReactorClientHttpResponse > responseRef = new AtomicReference <>();
154
147
155
148
return requestSender
156
149
.send ((request , outbound ) -> requestCallback .apply (adaptRequest (method , uri , request , outbound )))
@@ -185,46 +178,34 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl
185
178
return new ReactorClientHttpRequest (method , uri , request , nettyOutbound );
186
179
}
187
180
181
+
188
182
@ Override
189
183
public void start () {
190
- synchronized (this .lifecycleMonitor ) {
191
- if (! isRunning () ) {
192
- if (this .resourceFactory != null && this . mapper ! = null ) {
184
+ if (this .resourceFactory != null && this . mapper != null ) {
185
+ synchronized ( this . lifecycleMonitor ) {
186
+ if (this .httpClient = = null ) {
193
187
this .httpClient = createHttpClient (this .resourceFactory , this .mapper );
194
188
}
195
- else {
196
- logger .warn ("Restarting a ReactorClientHttpConnector bean is only supported with externally managed Reactor Netty resources" );
197
- }
198
- this .running = true ;
199
189
}
200
190
}
191
+ else {
192
+ logger .warn ("Restarting a ReactorClientHttpConnector bean is only supported " +
193
+ "with externally managed Reactor Netty resources" );
194
+ }
201
195
}
202
196
203
197
@ Override
204
198
public void stop () {
205
- synchronized (this .lifecycleMonitor ) {
206
- if ( isRunning () ) {
207
- this .running = false ;
199
+ if (this .resourceFactory != null && this . mapper != null ) {
200
+ synchronized ( this . lifecycleMonitor ) {
201
+ this .httpClient = null ;
208
202
}
209
203
}
210
204
}
211
205
212
- @ Override
213
- public final void stop (Runnable callback ) {
214
- synchronized (this .lifecycleMonitor ) {
215
- stop ();
216
- callback .run ();
217
- }
218
- }
219
-
220
206
@ Override
221
207
public boolean isRunning () {
222
- return this .running ;
223
- }
224
-
225
- @ Override
226
- public boolean isAutoStartup () {
227
- return false ;
208
+ return (this .httpClient != null );
228
209
}
229
210
230
211
@ Override
0 commit comments