Skip to content

Commit 20dd66c

Browse files
committed
Introduce ReactorNettyClientRequestFactory
This commit introduces an implementation of ClientHttpRequestFactory based on Reactor Netty's HttpClient. Closes gh-30835
1 parent d720d6b commit 20dd66c

File tree

6 files changed

+436
-2
lines changed

6 files changed

+436
-2
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client;
18+
19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
21+
import java.net.URI;
22+
import java.time.Duration;
23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import io.netty.buffer.ByteBuf;
27+
import io.netty.buffer.ByteBufAllocator;
28+
import org.reactivestreams.FlowAdapters;
29+
import org.reactivestreams.Publisher;
30+
import reactor.core.publisher.Mono;
31+
import reactor.netty.NettyOutbound;
32+
import reactor.netty.http.client.HttpClient;
33+
import reactor.netty.http.client.HttpClientRequest;
34+
35+
import org.springframework.http.HttpHeaders;
36+
import org.springframework.http.HttpMethod;
37+
import org.springframework.lang.Nullable;
38+
import org.springframework.util.StreamUtils;
39+
40+
/**
41+
* {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client.
42+
* Created via the {@link ReactorNettyClientRequestFactory}.
43+
*
44+
* @author Arjen Poutsma
45+
* @since 6.1
46+
*/
47+
final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest {
48+
49+
private final HttpClient httpClient;
50+
51+
private final HttpMethod method;
52+
53+
private final URI uri;
54+
55+
private final Duration exchangeTimeout;
56+
57+
private final Duration readTimeout;
58+
59+
60+
public ReactorNettyClientRequest(HttpClient httpClient, URI uri, HttpMethod method,
61+
Duration exchangeTimeout, Duration readTimeout) {
62+
63+
this.httpClient = httpClient;
64+
this.method = method;
65+
this.uri = uri;
66+
this.exchangeTimeout = exchangeTimeout;
67+
this.readTimeout = readTimeout;
68+
}
69+
70+
71+
@Override
72+
public HttpMethod getMethod() {
73+
return this.method;
74+
}
75+
76+
@Override
77+
public URI getURI() {
78+
return this.uri;
79+
}
80+
81+
82+
@Override
83+
protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException {
84+
HttpClient.RequestSender requestSender = this.httpClient
85+
.request(io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name()));
86+
87+
requestSender = (this.uri.isAbsolute() ? requestSender.uri(this.uri) : requestSender.uri(this.uri.toString()));
88+
89+
try {
90+
ReactorNettyClientResponse result = requestSender.send((reactorRequest, nettyOutbound) ->
91+
send(headers, body, reactorRequest, nettyOutbound))
92+
.responseConnection((reactorResponse, connection) ->
93+
Mono.just(new ReactorNettyClientResponse(reactorResponse, connection, this.readTimeout)))
94+
.next()
95+
.block(this.exchangeTimeout);
96+
97+
if (result == null) {
98+
throw new IOException("HTTP exchange resulted in no result");
99+
}
100+
else {
101+
return result;
102+
}
103+
}
104+
catch (RuntimeException ex) { // Exceptions.ReactiveException is package private
105+
Throwable cause = ex.getCause();
106+
107+
if (cause instanceof UncheckedIOException uioEx) {
108+
throw uioEx.getCause();
109+
}
110+
else if (cause instanceof IOException ioEx) {
111+
throw ioEx;
112+
}
113+
else {
114+
throw ex;
115+
}
116+
}
117+
}
118+
119+
private Publisher<Void> send(HttpHeaders headers, @Nullable Body body,
120+
HttpClientRequest reactorRequest, NettyOutbound nettyOutbound) {
121+
122+
headers.forEach((key, value) -> reactorRequest.requestHeaders().set(key, value));
123+
124+
if (body != null) {
125+
AtomicReference<Executor> executor = new AtomicReference<>();
126+
127+
return nettyOutbound
128+
.withConnection(connection -> executor.set(connection.channel().eventLoop()))
129+
.send(FlowAdapters.toPublisher(OutputStreamPublisher.create(
130+
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
131+
new ByteBufMapper(nettyOutbound.alloc()),
132+
executor.getAndSet(null))));
133+
}
134+
else {
135+
return nettyOutbound;
136+
}
137+
}
138+
139+
140+
private static final class ByteBufMapper implements OutputStreamPublisher.ByteMapper<ByteBuf> {
141+
142+
private final ByteBufAllocator allocator;
143+
144+
145+
public ByteBufMapper(ByteBufAllocator allocator) {
146+
this.allocator = allocator;
147+
}
148+
149+
150+
@Override
151+
public ByteBuf map(int b) {
152+
ByteBuf byteBuf = this.allocator.buffer(1);
153+
byteBuf.writeByte(b);
154+
return byteBuf;
155+
}
156+
157+
@Override
158+
public ByteBuf map(byte[] b, int off, int len) {
159+
ByteBuf byteBuf = this.allocator.buffer(len);
160+
byteBuf.writeBytes(b, off, len);
161+
return byteBuf;
162+
}
163+
}
164+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client;
18+
19+
import java.io.IOException;
20+
import java.net.URI;
21+
import java.time.Duration;
22+
23+
import io.netty.channel.ChannelOption;
24+
import reactor.netty.http.client.HttpClient;
25+
26+
import org.springframework.http.HttpMethod;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Reactor-Netty implementation of {@link ClientHttpRequestFactory}.
31+
*
32+
* @author Arjen Poutsma
33+
* @since 6.1
34+
*/
35+
public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory {
36+
37+
private final HttpClient httpClient;
38+
39+
40+
private Duration exchangeTimeout = Duration.ofSeconds(5);
41+
42+
private Duration readTimeout = Duration.ofSeconds(10);
43+
44+
45+
46+
/**
47+
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
48+
* with a default {@link HttpClient} that has compression enabled.
49+
*/
50+
public ReactorNettyClientRequestFactory() {
51+
this(HttpClient.create().compress(true));
52+
}
53+
54+
/**
55+
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
56+
* based on the given {@link HttpClient}.
57+
* @param httpClient the client to base on
58+
*/
59+
public ReactorNettyClientRequestFactory(HttpClient httpClient) {
60+
Assert.notNull(httpClient, "HttpClient must not be null");
61+
this.httpClient = httpClient;
62+
}
63+
64+
/**
65+
* Set the underlying connect timeout in milliseconds.
66+
* A value of 0 specifies an infinite timeout.
67+
* <p>Default is 30 seconds.
68+
* @see HttpClient#option(ChannelOption, Object)
69+
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
70+
*/
71+
public void setConnectTimeout(int connectTimeout) {
72+
Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value");
73+
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
74+
}
75+
76+
/**
77+
* Set the underlying connect timeout in milliseconds.
78+
* A value of 0 specifies an infinite timeout.
79+
* <p>Default is 30 seconds.
80+
* @see HttpClient#option(ChannelOption, Object)
81+
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
82+
*/
83+
public void setConnectTimeout(Duration connectTimeout) {
84+
Assert.notNull(connectTimeout, "ConnectTimeout must not be null");
85+
Assert.isTrue(!connectTimeout.isNegative(), "Timeout must be a non-negative value");
86+
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)connectTimeout.toMillis());
87+
}
88+
89+
/**
90+
* Set the underlying read timeout in milliseconds.
91+
* <p>Default is 10 seconds.
92+
*/
93+
public void setReadTimeout(long readTimeout) {
94+
Assert.isTrue(readTimeout > 0, "Timeout must be a positive value");
95+
this.readTimeout = Duration.ofMillis(readTimeout);
96+
}
97+
98+
/**
99+
* Set the underlying read timeout as {@code Duration}.
100+
* <p>Default is 10 seconds.
101+
*/
102+
public void setReadTimeout(Duration readTimeout) {
103+
Assert.notNull(readTimeout, "ReadTimeout must not be null");
104+
Assert.isTrue(!readTimeout.isNegative(), "Timeout must be a non-negative value");
105+
this.readTimeout = readTimeout;
106+
}
107+
108+
/**
109+
* Set the timeout for the HTTP exchange in milliseconds.
110+
* <p>Default is 30 seconds.
111+
*/
112+
public void setExchangeTimeout(long exchangeTimeout) {
113+
Assert.isTrue(exchangeTimeout > 0, "Timeout must be a positive value");
114+
this.exchangeTimeout = Duration.ofMillis(exchangeTimeout);
115+
}
116+
117+
/**
118+
* Set the timeout for the HTTP exchange.
119+
* <p>Default is 30 seconds.
120+
*/
121+
public void setExchangeTimeout(Duration exchangeTimeout) {
122+
Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null");
123+
Assert.isTrue(!exchangeTimeout.isNegative(), "Timeout must be a non-negative value");
124+
this.exchangeTimeout = exchangeTimeout;
125+
}
126+
127+
128+
129+
@Override
130+
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
131+
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
132+
}
133+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.time.Duration;
22+
23+
import reactor.netty.Connection;
24+
import reactor.netty.http.client.HttpClientResponse;
25+
26+
import org.springframework.http.HttpHeaders;
27+
import org.springframework.http.HttpStatusCode;
28+
import org.springframework.http.support.Netty4HeadersAdapter;
29+
import org.springframework.lang.Nullable;
30+
31+
/**
32+
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
33+
*
34+
* @author Arjen Poutsma
35+
* @since 6.1
36+
*/
37+
final class ReactorNettyClientResponse implements ClientHttpResponse {
38+
39+
private final HttpClientResponse response;
40+
41+
private final Connection connection;
42+
43+
private final HttpHeaders headers;
44+
45+
private final Duration readTimeout;
46+
47+
@Nullable
48+
private volatile InputStream body;
49+
50+
51+
52+
public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout) {
53+
this.response = response;
54+
this.connection = connection;
55+
this.readTimeout = readTimeout;
56+
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
57+
}
58+
59+
@Override
60+
public HttpStatusCode getStatusCode() {
61+
return HttpStatusCode.valueOf(this.response.status().code());
62+
}
63+
64+
@Override
65+
public String getStatusText() {
66+
return this.response.status().reasonPhrase();
67+
}
68+
69+
@Override
70+
public HttpHeaders getHeaders() {
71+
return this.headers;
72+
}
73+
74+
@Override
75+
public InputStream getBody() throws IOException {
76+
if (this.body == null) {
77+
InputStream body = this.connection.inbound().receive()
78+
.aggregate().asInputStream().block(this.readTimeout);
79+
if (body != null) {
80+
this.body = body;
81+
}
82+
else {
83+
throw new IOException("Could not receive body");
84+
}
85+
}
86+
return this.body;
87+
}
88+
89+
@Override
90+
public void close() {
91+
this.connection.dispose();
92+
}
93+
}

0 commit comments

Comments
 (0)