Skip to content

Commit 044da79

Browse files
committed
Polishing ReactorClientHttpRequestFactory
1 parent 89d56b1 commit 044da79

File tree

3 files changed

+91
-98
lines changed

3 files changed

+91
-98
lines changed

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java

+35-39
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
5858
private final Duration readTimeout;
5959

6060

61-
public ReactorClientHttpRequest(HttpClient httpClient, URI uri, HttpMethod method,
62-
Duration exchangeTimeout, Duration readTimeout) {
61+
public ReactorClientHttpRequest(
62+
HttpClient httpClient, URI uri, HttpMethod method, Duration exchangeTimeout, Duration readTimeout) {
6363

6464
this.httpClient = httpClient;
6565
this.method = method;
@@ -82,55 +82,51 @@ public URI getURI() {
8282

8383
@Override
8484
protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException {
85-
HttpClient.RequestSender requestSender = this.httpClient
85+
86+
HttpClient.RequestSender sender = this.httpClient
8687
.request(io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name()));
8788

88-
requestSender = (this.uri.isAbsolute() ? requestSender.uri(this.uri) : requestSender.uri(this.uri.toString()));
89+
sender = (this.uri.isAbsolute() ? sender.uri(this.uri) : sender.uri(this.uri.toString()));
8990

9091
try {
91-
ReactorClientHttpResponse result = requestSender.send((reactorRequest, nettyOutbound) ->
92-
send(headers, body, reactorRequest, nettyOutbound))
93-
.responseConnection((reactorResponse, connection) ->
94-
Mono.just(new ReactorClientHttpResponse(reactorResponse, connection, this.readTimeout)))
95-
.next()
96-
.block(this.exchangeTimeout);
92+
ReactorClientHttpResponse result =
93+
sender.send((request, outbound) -> send(headers, body, request, outbound))
94+
.responseConnection((response, conn) ->
95+
Mono.just(new ReactorClientHttpResponse(response, conn, this.readTimeout)))
96+
.next()
97+
.block(this.exchangeTimeout);
9798

9899
if (result == null) {
99100
throw new IOException("HTTP exchange resulted in no result");
100101
}
101-
else {
102-
return result;
103-
}
102+
103+
return result;
104104
}
105105
catch (RuntimeException ex) {
106106
throw convertException(ex);
107107
}
108108
}
109109

110-
private Publisher<Void> send(HttpHeaders headers, @Nullable Body body,
111-
HttpClientRequest reactorRequest, NettyOutbound nettyOutbound) {
112-
113-
headers.forEach((key, value) -> reactorRequest.requestHeaders().set(key, value));
110+
private Publisher<Void> send(
111+
HttpHeaders headers, @Nullable Body body, HttpClientRequest request, NettyOutbound outbound) {
114112

115-
if (body != null) {
116-
ByteBufMapper byteMapper = new ByteBufMapper(nettyOutbound.alloc());
117-
AtomicReference<Executor> executor = new AtomicReference<>();
113+
headers.forEach((key, value) -> request.requestHeaders().set(key, value));
118114

119-
return nettyOutbound
120-
.withConnection(connection -> executor.set(connection.channel().eventLoop()))
121-
.send(FlowAdapters.toPublisher(new OutputStreamPublisher<>(
122-
os -> body.writeTo(StreamUtils.nonClosing(os)), byteMapper,
123-
executor.getAndSet(null), null)));
124-
}
125-
else {
126-
return nettyOutbound;
115+
if (body == null) {
116+
return outbound;
127117
}
118+
119+
AtomicReference<Executor> executorRef = new AtomicReference<>();
120+
121+
return outbound
122+
.withConnection(connection -> executorRef.set(connection.channel().eventLoop()))
123+
.send(FlowAdapters.toPublisher(new OutputStreamPublisher<>(
124+
os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound),
125+
executorRef.getAndSet(null), null)));
128126
}
129127

130128
static IOException convertException(RuntimeException ex) {
131-
// Exceptions.ReactiveException is package private
132-
Throwable cause = ex.getCause();
133-
129+
Throwable cause = ex.getCause(); // Exceptions.ReactiveException is private
134130
if (cause instanceof IOException ioEx) {
135131
return ioEx;
136132
}
@@ -148,22 +144,22 @@ private static final class ByteBufMapper implements OutputStreamPublisher.ByteMa
148144

149145
private final ByteBufAllocator allocator;
150146

151-
public ByteBufMapper(ByteBufAllocator allocator) {
152-
this.allocator = allocator;
147+
public ByteBufMapper(NettyOutbound outbound) {
148+
this.allocator = outbound.alloc();
153149
}
154150

155151
@Override
156152
public ByteBuf map(int b) {
157-
ByteBuf byteBuf = this.allocator.buffer(1);
158-
byteBuf.writeByte(b);
159-
return byteBuf;
153+
ByteBuf buf = this.allocator.buffer(1);
154+
buf.writeByte(b);
155+
return buf;
160156
}
161157

162158
@Override
163159
public ByteBuf map(byte[] b, int off, int len) {
164-
ByteBuf byteBuf = this.allocator.buffer(len);
165-
byteBuf.writeBytes(b, off, len);
166-
return byteBuf;
160+
ByteBuf buf = this.allocator.buffer(len);
161+
buf.writeBytes(b, off, len);
162+
return buf;
167163
}
168164
}
169165

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java

+49-56
Original file line numberDiff line numberDiff line change
@@ -71,59 +71,68 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
7171

7272

7373
/**
74-
* Create a new instance of the {@code ReactorClientHttpRequestFactory}
75-
* with a default {@link HttpClient} that has compression enabled.
74+
* Constructor with default client, created via {@link HttpClient#create()},
75+
* and with {@link HttpClient#compress compression} enabled.
7676
*/
7777
public ReactorClientHttpRequestFactory() {
78-
this.httpClient = defaultInitializer.apply(HttpClient.create());
79-
this.resourceFactory = null;
80-
this.mapper = null;
78+
this(defaultInitializer.apply(HttpClient.create()));
8179
}
8280

8381
/**
84-
* Create a new instance of the {@code ReactorClientHttpRequestFactory}
85-
* based on the given {@link HttpClient}.
86-
* @param httpClient the client to base on
82+
* Constructor with a given {@link HttpClient} instance.
83+
* @param client the client to use
8784
*/
88-
public ReactorClientHttpRequestFactory(HttpClient httpClient) {
89-
Assert.notNull(httpClient, "HttpClient must not be null");
90-
this.httpClient = httpClient;
85+
public ReactorClientHttpRequestFactory(HttpClient client) {
86+
Assert.notNull(client, "HttpClient must not be null");
9187
this.resourceFactory = null;
9288
this.mapper = null;
89+
this.httpClient = client;
9390
}
9491

9592
/**
9693
* Constructor with externally managed Reactor Netty resources, including
9794
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
98-
* for the connection pool.
99-
* <p>This constructor should be used only when you don't want the client
100-
* to participate in the Reactor Netty global resources. By default the
101-
* client participates in the Reactor Netty global resources held in
102-
* {@link reactor.netty.http.HttpResources}, which is recommended since
103-
* fixed, shared resources are favored for event loop concurrency. However,
104-
* consider declaring a {@link ReactorResourceFactory} bean with
105-
* {@code globalResources=true} in order to ensure the Reactor Netty global
106-
* resources are shut down when the Spring ApplicationContext is stopped or closed
107-
* and restarted properly when the Spring ApplicationContext is
108-
* (with JVM Checkpoint Restore for example).
109-
* @param resourceFactory the resource factory to obtain the resources from
110-
* @param mapper a mapper for further initialization of the created client
95+
* for connection pooling.
96+
* <p>Generally, it is recommended to share resources for event loop
97+
* concurrency. This can be achieved either by participating in the JVM-wide,
98+
* global resources held in {@link reactor.netty.http.HttpResources}, or by
99+
* using a specific, shared set of resources through a
100+
* {@link ReactorResourceFactory} bean. The latter can ensure that resources
101+
* are shut down when the Spring ApplicationContext is stopped/closed and
102+
* restarted again (e.g. JVM checkpoint restore).
103+
* @param resourceFactory the resource factory to get resources from
104+
* @param mapper for further initialization of the client
111105
*/
112-
public ReactorClientHttpRequestFactory(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
106+
public ReactorClientHttpRequestFactory(
107+
ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
108+
113109
this.resourceFactory = resourceFactory;
114110
this.mapper = mapper;
115111
if (resourceFactory.isRunning()) {
116112
this.httpClient = createHttpClient(resourceFactory, mapper);
117113
}
118114
}
119115

116+
private HttpClient createHttpClient(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
117+
HttpClient client = HttpClient.create(factory.getConnectionProvider());
118+
client = defaultInitializer.andThen(mapper).apply(client);
119+
client = client.runOn(factory.getLoopResources());
120+
if (this.connectTimeout != null) {
121+
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
122+
}
123+
return client;
124+
}
125+
120126

121127
/**
122-
* Set the underlying connect timeout in milliseconds.
123-
* A value of 0 specifies an infinite timeout.
124-
* <p>Default is 30 seconds.
128+
* Set the underlying connect timeout value on the underlying client.
129+
* Effectively, a shortcut for
130+
* {@code httpClient.option(CONNECT_TIMEOUT_MILLIS, timeout)}.
131+
* <p>By default, set to 30 seconds.
132+
* @param connectTimeout the timeout value in millis; use 0 to never time out.
125133
* @see HttpClient#option(ChannelOption, Object)
126134
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
135+
* @see <a href="https://projectreactor.io/docs/netty/release/reference/index.html#connection-timeout">Connection Timeout</a>
127136
*/
128137
public void setConnectTimeout(int connectTimeout) {
129138
Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value");
@@ -135,11 +144,7 @@ public void setConnectTimeout(int connectTimeout) {
135144
}
136145

137146
/**
138-
* Set the underlying connect timeout in milliseconds.
139-
* A value of 0 specifies an infinite timeout.
140-
* <p>Default is 30 seconds.
141-
* @see HttpClient#option(ChannelOption, Object)
142-
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
147+
* Variant of {@link #setConnectTimeout(int)} with a {@link Duration} value.
143148
*/
144149
public void setConnectTimeout(Duration connectTimeout) {
145150
Assert.notNull(connectTimeout, "ConnectTimeout must not be null");
@@ -156,13 +161,11 @@ public void setReadTimeout(long readTimeout) {
156161
}
157162

158163
/**
159-
* Set the underlying read timeout as {@code Duration}.
160-
* <p>Default is 10 seconds.
164+
* Variant of {@link #setConnectTimeout(int)} with a {@link Duration} value.
161165
*/
162166
public void setReadTimeout(Duration readTimeout) {
163167
Assert.notNull(readTimeout, "ReadTimeout must not be null");
164-
Assert.isTrue(!readTimeout.isNegative(), "Timeout must be a non-negative value");
165-
this.readTimeout = readTimeout;
168+
setReadTimeout((int) readTimeout.toMillis());
166169
}
167170

168171
/**
@@ -180,29 +183,20 @@ public void setExchangeTimeout(long exchangeTimeout) {
180183
*/
181184
public void setExchangeTimeout(Duration exchangeTimeout) {
182185
Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null");
183-
Assert.isTrue(!exchangeTimeout.isNegative(), "Timeout must be a non-negative value");
184-
this.exchangeTimeout = exchangeTimeout;
185-
}
186-
187-
private HttpClient createHttpClient(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
188-
HttpClient httpClient = defaultInitializer.andThen(mapper)
189-
.apply(HttpClient.create(factory.getConnectionProvider()));
190-
httpClient = httpClient.runOn(factory.getLoopResources());
191-
if (this.connectTimeout != null) {
192-
httpClient = httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
193-
}
194-
return httpClient;
186+
setExchangeTimeout((int) exchangeTimeout.toMillis());
195187
}
196188

197189

198190
@Override
199191
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
200-
HttpClient httpClient = this.httpClient;
201-
if (httpClient == null) {
202-
Assert.state(this.resourceFactory != null && this.mapper != null, "Illegal configuration");
203-
httpClient = createHttpClient(this.resourceFactory, this.mapper);
192+
HttpClient client = this.httpClient;
193+
if (client == null) {
194+
Assert.state(this.resourceFactory != null && this.mapper != null,
195+
"Expected HttpClient or ResourceFactory and mapper");
196+
client = createHttpClient(this.resourceFactory, this.mapper);
204197
}
205-
return new ReactorClientHttpRequest(httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
198+
return new ReactorClientHttpRequest(
199+
client, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
206200
}
207201

208202

@@ -237,8 +231,7 @@ public boolean isRunning() {
237231

238232
@Override
239233
public int getPhase() {
240-
// Start after ReactorResourceFactory
241-
return 1;
234+
return 1; // start after ReactorResourceFactory (0)
242235
}
243236

244237
}

spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpResponse.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,14 @@ final class ReactorClientHttpResponse implements ClientHttpResponse {
5252
private volatile InputStream body;
5353

5454

55-
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection, Duration readTimeout) {
55+
public ReactorClientHttpResponse(
56+
HttpClientResponse response, Connection connection, Duration readTimeout) {
57+
5658
this.response = response;
5759
this.connection = connection;
5860
this.readTimeout = readTimeout;
59-
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
61+
this.headers = HttpHeaders.readOnlyHttpHeaders(
62+
new Netty4HeadersAdapter(response.responseHeaders()));
6063
}
6164

6265

@@ -106,7 +109,8 @@ public void close() {
106109
StreamUtils.drain(body);
107110
body.close();
108111
}
109-
catch (IOException ignored) {
112+
catch (IOException ex) {
113+
// ignore
110114
}
111115
}
112116

0 commit comments

Comments
 (0)