Skip to content

Commit 58bca88

Browse files
authored
Fix reactive connection handling.
Original Pull Request #1766 Closes #1759
1 parent 4782414 commit 58bca88

File tree

2 files changed

+65
-15
lines changed

2 files changed

+65
-15
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -541,8 +541,7 @@ public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
541541
.flatMap(callback::doWithClient) //
542542
.onErrorResume(throwable -> {
543543

544-
if (throwable instanceof ConnectException) {
545-
544+
if (isCausedByConnectionException(throwable)) {
546545
return hostProvider.getActive(Verification.ACTIVE) //
547546
.flatMap(callback::doWithClient);
548547
}
@@ -551,6 +550,27 @@ public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
551550
});
552551
}
553552

553+
/**
554+
* checks if the given throwable is a {@link ConnectException} or has one in it's cause chain
555+
*
556+
* @param throwable the throwable to check
557+
* @return true if throwable is caused by a {@link ConnectException}
558+
*/
559+
private boolean isCausedByConnectionException(Throwable throwable) {
560+
561+
Throwable t = throwable;
562+
do {
563+
564+
if (t instanceof ConnectException) {
565+
return true;
566+
}
567+
568+
t = t.getCause();
569+
} while (t != null);
570+
571+
return false;
572+
}
573+
554574
@Override
555575
public Mono<Status> status() {
556576

src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java

+43-13
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import reactor.util.function.Tuple2;
2121

2222
import java.net.InetSocketAddress;
23+
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
@@ -29,6 +30,8 @@
2930
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.function.Supplier;
3132

33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3235
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
3336
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
3437
import org.springframework.data.elasticsearch.client.NoReachableHostException;
@@ -47,6 +50,8 @@
4750
*/
4851
class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
4952

53+
private final static Logger LOG = LoggerFactory.getLogger(MultiNodeHostProvider.class);
54+
5055
private final WebClientProvider clientProvider;
5156
private final Supplier<HttpHeaders> headersSupplier;
5257
private final Map<InetSocketAddress, ElasticsearchHost> hosts;
@@ -60,6 +65,8 @@ class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
6065
for (InetSocketAddress endpoint : endpoints) {
6166
this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN));
6267
}
68+
69+
LOG.debug("initialized with " + hosts);
6370
}
6471

6572
/*
@@ -68,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
6875
*/
6976
@Override
7077
public Mono<ClusterInformation> clusterInfo() {
71-
return nodes(null).map(this::updateNodeState).buffer(hosts.size())
78+
return checkNodes(null).map(this::updateNodeState).buffer(hosts.size())
7279
.then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
7380
}
7481

@@ -88,14 +95,19 @@ public WebClient createWebClient(InetSocketAddress endpoint) {
8895
@Override
8996
public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {
9097

98+
LOG.trace("lookupActiveHost " + verification + " from " + hosts());
99+
91100
if (Verification.LAZY.equals(verification)) {
92101
for (ElasticsearchHost entry : hosts()) {
93102
if (entry.isOnline()) {
103+
LOG.trace("lookupActiveHost returning " + entry);
94104
return Mono.just(entry.getEndpoint());
95105
}
96106
}
107+
LOG.trace("no online host found with LAZY");
97108
}
98109

110+
LOG.trace("searching for active host");
99111
return findActiveHostInKnownActives() //
100112
.switchIfEmpty(findActiveHostInUnresolved()) //
101113
.switchIfEmpty(findActiveHostInDead()) //
@@ -107,20 +119,30 @@ Collection<ElasticsearchHost> getCachedHostState() {
107119
}
108120

109121
private Mono<InetSocketAddress> findActiveHostInKnownActives() {
110-
return findActiveForSate(State.ONLINE);
122+
return findActiveForState(State.ONLINE);
111123
}
112124

113125
private Mono<InetSocketAddress> findActiveHostInUnresolved() {
114-
return findActiveForSate(State.UNKNOWN);
126+
return findActiveForState(State.UNKNOWN);
115127
}
116128

117129
private Mono<InetSocketAddress> findActiveHostInDead() {
118-
return findActiveForSate(State.OFFLINE);
130+
return findActiveForState(State.OFFLINE);
119131
}
120132

121-
private Mono<InetSocketAddress> findActiveForSate(State state) {
122-
return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline)
123-
.map(ElasticsearchHost::getEndpoint).next();
133+
private Mono<InetSocketAddress> findActiveForState(State state) {
134+
135+
LOG.trace("findActiveForState state " + state + ", current hosts: " + hosts);
136+
137+
return checkNodes(state) //
138+
.map(this::updateNodeState) //
139+
.filter(ElasticsearchHost::isOnline) //
140+
.map(elasticsearchHost -> {
141+
LOG.trace("findActiveForState returning host " + elasticsearchHost);
142+
return elasticsearchHost;
143+
}).map(ElasticsearchHost::getEndpoint) //
144+
.takeLast(1) //
145+
.next();
124146
}
125147

126148
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple2) {
@@ -131,28 +153,36 @@ private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple
131153
return elasticsearchHost;
132154
}
133155

134-
private Flux<Tuple2<InetSocketAddress, State>> nodes(@Nullable State state) {
156+
private Flux<Tuple2<InetSocketAddress, State>> checkNodes(@Nullable State state) {
157+
158+
LOG.trace("checkNodes() with state " + state);
135159

136160
return Flux.fromIterable(hosts()) //
137161
.filter(entry -> state == null || entry.getState().equals(state)) //
138162
.map(ElasticsearchHost::getEndpoint) //
139-
.flatMap(host -> {
163+
.concatMap(host -> {
164+
165+
LOG.trace("checking host " + host);
140166

141167
Mono<ClientResponse> clientResponseMono = createWebClient(host) //
142168
.head().uri("/") //
143169
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
144170
.exchangeToMono(Mono::just) //
171+
.timeout(Duration.ofSeconds(1)) //
145172
.doOnError(throwable -> {
173+
LOG.trace("error checking host " + host + ", " + throwable.getMessage());
146174
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
147175
clientProvider.getErrorListener().accept(throwable);
148176
});
149177

150178
return Mono.just(host) //
151-
.zipWith( //
152-
clientResponseMono.flatMap(it -> it.releaseBody() //
153-
.thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
179+
.zipWith(clientResponseMono.flatMap(it -> it.releaseBody() //
180+
.thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
154181
}) //
155-
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
182+
.map(tuple -> {
183+
LOG.trace("check result " + tuple);
184+
return tuple;
185+
}).onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
156186
}
157187

158188
private List<ElasticsearchHost> hosts() {

0 commit comments

Comments
 (0)