Skip to content

Commit b7ec0a9

Browse files
committed
Fix reactive connection handling.
Original Pull Request #1766 Closes #1759 (cherry picked from commit 58bca88)
1 parent 3e20810 commit b7ec0a9

File tree

7 files changed

+76
-27
lines changed

7 files changed

+76
-27
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@
134134
*/
135135
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
136136

137-
private final HostProvider hostProvider;
137+
private final HostProvider<?> hostProvider;
138138
private final RequestCreator requestCreator;
139139
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
140140

@@ -144,7 +144,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
144144
*
145145
* @param hostProvider must not be {@literal null}.
146146
*/
147-
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
147+
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider) {
148148
this(hostProvider, new DefaultRequestCreator());
149149
}
150150

@@ -155,7 +155,7 @@ public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
155155
* @param hostProvider must not be {@literal null}.
156156
* @param requestCreator must not be {@literal null}.
157157
*/
158-
public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) {
158+
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider, RequestCreator requestCreator) {
159159

160160
Assert.notNull(hostProvider, "HostProvider must not be null");
161161
Assert.notNull(requestCreator, "RequestCreator must not be null");
@@ -639,8 +639,7 @@ public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback
639639
.flatMap(callback::doWithClient) //
640640
.onErrorResume(throwable -> {
641641

642-
if (throwable instanceof ConnectException) {
643-
642+
if (isCausedByConnectionException(throwable)) {
644643
return hostProvider.getActive(Verification.ACTIVE) //
645644
.flatMap(callback::doWithClient);
646645
}
@@ -649,6 +648,27 @@ public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback
649648
});
650649
}
651650

651+
/**
652+
* checks if the given throwable is a {@link ConnectException} or has one in it's cause chain
653+
*
654+
* @param throwable the throwable to check
655+
* @return true if throwable is caused by a {@link ConnectException}
656+
*/
657+
private boolean isCausedByConnectionException(Throwable throwable) {
658+
659+
Throwable t = throwable;
660+
do {
661+
662+
if (t instanceof ConnectException) {
663+
return true;
664+
}
665+
666+
t = t.getCause();
667+
} while (t != null);
668+
669+
return false;
670+
}
671+
652672
@Override
653673
public Mono<Status> status() {
654674

src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,9 +34,10 @@
3434
*
3535
* @author Christoph Strobl
3636
* @author Mark Paluch
37+
* @author Peter-Josef Meisch
3738
* @since 3.2
3839
*/
39-
public interface HostProvider {
40+
public interface HostProvider<T extends HostProvider<T>> {
4041

4142
/**
4243
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
@@ -46,7 +47,7 @@ public interface HostProvider {
4647
* @param endpoints must not be {@literal null} nor empty.
4748
* @return new instance of {@link HostProvider}.
4849
*/
49-
static HostProvider provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
50+
static HostProvider<?> provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
5051
InetSocketAddress... endpoints) {
5152

5253
Assert.notNull(clientProvider, "WebClientProvider must not be null");

src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java

+39-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import reactor.util.function.Tuple2;
2222

2323
import java.net.InetSocketAddress;
24+
import java.time.Duration;
2425
import java.util.ArrayList;
2526
import java.util.Collection;
2627
import java.util.Collections;
@@ -30,6 +31,8 @@
3031
import java.util.concurrent.ConcurrentHashMap;
3132
import java.util.function.Supplier;
3233

34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3336
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
3437
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
3538
import org.springframework.data.elasticsearch.client.NoReachableHostException;
@@ -42,22 +45,28 @@
4245
*
4346
* @author Christoph Strobl
4447
* @author Mark Paluch
48+
* @author Peter-Josef Meisch
4549
* @since 3.2
4650
*/
47-
class MultiNodeHostProvider implements HostProvider {
51+
class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
52+
53+
private final static Logger LOG = LoggerFactory.getLogger(MultiNodeHostProvider.class);
4854

4955
private final WebClientProvider clientProvider;
5056
private final Supplier<HttpHeaders> headersSupplier;
5157
private final Map<InetSocketAddress, ElasticsearchHost> hosts;
5258

53-
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, InetSocketAddress... endpoints) {
59+
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
60+
InetSocketAddress... endpoints) {
5461

5562
this.clientProvider = clientProvider;
5663
this.headersSupplier = headersSupplier;
5764
this.hosts = new ConcurrentHashMap<>();
5865
for (InetSocketAddress endpoint : endpoints) {
5966
this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN));
6067
}
68+
69+
LOG.debug("initialized with " + hosts);
6170
}
6271

6372
/*
@@ -66,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider {
6675
*/
6776
@Override
6877
public Mono<ClusterInformation> clusterInfo() {
69-
return nodes(null).map(this::updateNodeState).buffer(hosts.size())
78+
return checkNodes(null).map(this::updateNodeState).buffer(hosts.size())
7079
.then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
7180
}
7281

@@ -86,14 +95,19 @@ public WebClient createWebClient(InetSocketAddress endpoint) {
8695
@Override
8796
public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {
8897

98+
LOG.trace("lookupActiveHost " + verification + " from " + hosts());
99+
89100
if (Verification.LAZY.equals(verification)) {
90101
for (ElasticsearchHost entry : hosts()) {
91102
if (entry.isOnline()) {
103+
LOG.trace("lookupActiveHost returning " + entry);
92104
return Mono.just(entry.getEndpoint());
93105
}
94106
}
107+
LOG.trace("no online host found with LAZY");
95108
}
96109

110+
LOG.trace("searching for active host");
97111
return findActiveHostInKnownActives() //
98112
.switchIfEmpty(findActiveHostInUnresolved()) //
99113
.switchIfEmpty(findActiveHostInDead()) //
@@ -105,20 +119,30 @@ Collection<ElasticsearchHost> getCachedHostState() {
105119
}
106120

107121
private Mono<InetSocketAddress> findActiveHostInKnownActives() {
108-
return findActiveForSate(State.ONLINE);
122+
return findActiveForState(State.ONLINE);
109123
}
110124

111125
private Mono<InetSocketAddress> findActiveHostInUnresolved() {
112-
return findActiveForSate(State.UNKNOWN);
126+
return findActiveForState(State.UNKNOWN);
113127
}
114128

115129
private Mono<InetSocketAddress> findActiveHostInDead() {
116-
return findActiveForSate(State.OFFLINE);
130+
return findActiveForState(State.OFFLINE);
117131
}
118132

119-
private Mono<InetSocketAddress> findActiveForSate(State state) {
120-
return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline)
121-
.map(ElasticsearchHost::getEndpoint).next();
133+
private Mono<InetSocketAddress> findActiveForState(State state) {
134+
135+
LOG.trace("findActiveForState state " + state + ", current hosts: " + hosts);
136+
137+
return checkNodes(state) //
138+
.map(this::updateNodeState) //
139+
.filter(ElasticsearchHost::isOnline) //
140+
.map(elasticsearchHost -> {
141+
LOG.trace("findActiveForState returning host " + elasticsearchHost);
142+
return elasticsearchHost;
143+
}).map(ElasticsearchHost::getEndpoint) //
144+
.takeLast(1) //
145+
.next();
122146
}
123147

124148
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ClientResponse> tuple2) {
@@ -129,17 +153,19 @@ private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ClientRespon
129153
return elasticsearchHost;
130154
}
131155

132-
private Flux<Tuple2<InetSocketAddress, ClientResponse>> nodes(@Nullable State state) {
156+
private Flux<Tuple2<InetSocketAddress, ClientResponse>> checkNodes(@Nullable State state) {
133157

134158
return Flux.fromIterable(hosts()) //
135159
.filter(entry -> state == null || entry.getState().equals(state)) //
136160
.map(ElasticsearchHost::getEndpoint) //
137-
.flatMap(host -> {
161+
.concatMap(host -> {
138162

139163
Mono<ClientResponse> exchange = createWebClient(host) //
140164
.head().uri("/") //
141165
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
142-
.exchange().doOnError(throwable -> {
166+
.exchange() //
167+
.timeout(Duration.ofSeconds(1)) //
168+
.doOnError(throwable -> {
143169
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
144170
clientProvider.getErrorListener().accept(throwable);
145171
});

src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,9 +32,10 @@
3232
*
3333
* @author Christoph Strobl
3434
* @author Mark Paluch
35+
* @author Peter-Josef Meisch
3536
* @since 3.2
3637
*/
37-
class SingleNodeHostProvider implements HostProvider {
38+
class SingleNodeHostProvider implements HostProvider<SingleNodeHostProvider> {
3839

3940
private final WebClientProvider clientProvider;
4041
private final Supplier<HttpHeaders> headersSupplier;

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,7 +62,7 @@ public class ReactiveElasticsearchClientUnitTests {
6262

6363
static final String HOST = ":9200";
6464

65-
MockDelegatingElasticsearchHostProvider<HostProvider> hostProvider;
65+
MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> hostProvider;
6666
ReactiveElasticsearchClient client;
6767

6868
@BeforeEach

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public T getDelegate() {
186186
return delegate;
187187
}
188188

189-
public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) {
189+
public MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> withActiveDefaultHost(String host) {
190190
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
191191
host);
192192
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030

3131
/**
3232
* @author Christoph Strobl
33+
* @author Peter-Josef Meisch
3334
*/
3435
public class SingleNodeHostProviderUnitTests {
3536

0 commit comments

Comments
 (0)