Skip to content

Use Elasticsearch 8.2.2. #2174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
<!-- version of the RestHighLevelClient -->
<elasticsearch-rhlc>7.17.3</elasticsearch-rhlc>
<!-- version of the new ElasticsearchClient -->
<elasticsearch-java>7.17.3</elasticsearch-java>
<log4j>2.17.1</log4j>
<elasticsearch-java>8.2.2</elasticsearch-java>
<log4j>2.17.2</log4j>
<netty>4.1.65.Final</netty>
<springdata.commons>3.0.0-SNAPSHOT</springdata.commons>
<testcontainers>1.16.2</testcontainers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -132,7 +133,7 @@ public static ElasticsearchRestClient create(ClientConfiguration clientConfigura
return clientBuilder;
});

RestHighLevelClient client = new RestHighLevelClient(builder);
RestHighLevelClient client = new RestHighLevelClientBuilder(builder.build()).setApiCompatibilityMode(true).build();
return () -> client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.Version;
import co.elastic.clients.transport.rest_client.RestClientOptions;
import co.elastic.clients.transport.rest_client.RestClientTransport;

Expand All @@ -41,8 +42,10 @@
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HttpContext;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -242,6 +245,18 @@ private static ElasticsearchTransport getElasticsearchTransport(RestClient restC

TransportOptions.Builder transportOptionsBuilder = transportOptions != null ? transportOptions.toBuilder()
: new RestClientOptions(RequestOptions.DEFAULT).toBuilder();

// need to add the compatibility header, this is only done automatically when not passing in custom options.
// code copied from RestClientTransport as it is not available outside the package
ContentType jsonContentType = null;
if (Version.VERSION == null) {
jsonContentType = ContentType.APPLICATION_JSON;
} else {
jsonContentType = ContentType.create("application/vnd.elasticsearch+json",
new BasicNameValuePair("compatible-with", String.valueOf(Version.VERSION.major())));
}
transportOptionsBuilder.addHeader("Accept", jsonContentType.toString());

TransportOptions transportOptionsWithHeader = transportOptionsBuilder
.addHeader(X_SPRING_DATA_ELASTICSEARCH_CLIENT, clientType).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,18 @@ public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptio
// endregion

@Override
protected String getClusterVersion() {
public String getClusterVersion() {
return execute(client -> client.info().version().number());

}

@Override
protected String getVendor() {
public String getVendor() {
return "Elasticsearch";
}

@Override
protected String getRuntimeLibraryVersion() {
public String getRuntimeLibraryVersion() {
return Version.VERSION.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,17 +413,17 @@ public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> e
// endregion

@Override
protected Mono<String> getVendor() {
public Mono<String> getVendor() {
return Mono.just("Elasticsearch");
}

@Override
protected Mono<String> getRuntimeLibraryVersion() {
public Mono<String> getRuntimeLibraryVersion() {
return Mono.just(Version.VERSION != null ? Version.VERSION.toString() : "null");
}

@Override
protected Mono<String> getClusterVersion() {
public Mono<String> getClusterVersion() {
return Mono.from(execute(ReactiveElasticsearchClient::info)).map(infoResponse -> infoResponse.version().number());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ public co.elastic.clients.elasticsearch.core.ReindexRequest reindex(ReindexReque
.refresh(reindexRequest.getRefresh()) //
.requireAlias(reindexRequest.getRequireAlias()) //
.requestsPerSecond(reindexRequest.getRequestsPerSecond()) //
.slices(reindexRequest.getSlices());
.slices(slices(reindexRequest.getSlices()));

return builder.build();
}
Expand Down Expand Up @@ -963,24 +963,24 @@ public UpdateByQueryRequest documentUpdateByQueryRequest(UpdateQuery updateQuery
.pipeline(updateQuery.getPipeline()) //
.requestsPerSecond(
updateQuery.getRequestsPerSecond() != null ? updateQuery.getRequestsPerSecond().longValue() : null) //
.slices(updateQuery.getSlices() != null ? Long.valueOf(updateQuery.getSlices()) : null) //
;
.slices(slices(updateQuery.getSlices() != null ? Long.valueOf(updateQuery.getSlices()) : null));

if (updateQuery.getAbortOnVersionConflict() != null) {
ub.conflicts(updateQuery.getAbortOnVersionConflict() ? Conflicts.Abort : Conflicts.Proceed);
}

if (updateQuery.getBatchSize() != null) {
ub.size(Long.valueOf(updateQuery.getBatchSize()));
}

if (updateQuery.getQuery() != null) {
Query queryQuery = updateQuery.getQuery();

if (updateQuery.getBatchSize() != null) {
((BaseQuery) queryQuery).setMaxResults(updateQuery.getBatchSize());
}
ub.query(getQuery(queryQuery, null));

// no indicesOptions available like in old client

ub.scroll(time(queryQuery.getScrollTime()));

}

// no maxRetries available like in old client
Expand Down Expand Up @@ -1164,11 +1164,11 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind

if (!query.getRuntimeFields().isEmpty()) {

Map<String, RuntimeField> runtimeMappings = new HashMap<>();
Map<String, List<RuntimeField>> runtimeMappings = new HashMap<>();
query.getRuntimeFields().forEach(runtimeField -> {
runtimeMappings.put(runtimeField.getName(), RuntimeField.of(rt -> rt //
runtimeMappings.put(runtimeField.getName(), Collections.singletonList(RuntimeField.of(rt -> rt //
.type(RuntimeFieldType._DESERIALIZER.parse(runtimeField.getType())) //
.script(s -> s.inline(is -> is.source(runtimeField.getScript())))));
.script(s -> s.inline(is -> is.source(runtimeField.getScript()))))));
});
builder.runtimeMappings(runtimeMappings);
}
Expand Down Expand Up @@ -1328,6 +1328,7 @@ private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(@Nullab
} else {
throw new IllegalArgumentException("unhandled Query implementation " + query.getClass().getName());
}

return esQuery;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ClusterHealth clusterHealth(HealthResponse healthResponse) {
.withNumberOfPendingTasks(healthResponse.numberOfPendingTasks()) //
.withRelocatingShards(healthResponse.relocatingShards()) //
.withStatus(healthResponse.status().toString()) //
.withTaskMaxWaitingTimeMillis(Long.parseLong(healthResponse.taskMaxWaitingInQueueMillis())) //
.withTaskMaxWaitingTimeMillis(healthResponse.taskMaxWaitingInQueueMillis().toEpochMilli()) //
.withTimedOut(healthResponse.timedOut()) //
.withUnassignedShards(healthResponse.unassignedShards()) //
.build(); //
Expand Down Expand Up @@ -144,11 +144,11 @@ public Document indicesGetMapping(GetMappingResponse getMappingResponse, IndexCo
if (indexMappingRecord == null) {

if (mappings.size() != 1) {
LOGGER.warn("no mapping returned for index {}", indexCoordinates.getIndexName());
return Document.create();
}
String index = mappings.keySet().iterator().next();
indexMappingRecord = mappings.get(index);
LOGGER.warn("no mapping returned for index {}", indexCoordinates.getIndexName());
return Document.create();
}
String index = mappings.keySet().iterator().next();
indexMappingRecord = mappings.get(index);
}

return Document.parse(toJson(indexMappingRecord.mappings(), jsonpMapper));
Expand Down Expand Up @@ -277,9 +277,9 @@ public ReindexResponse reindexResponse(co.elastic.clients.elasticsearch.core.Rei
.withNoops(reindexResponse.noops()) //
.withBulkRetries(reindexResponse.retries().bulk()) //
.withSearchRetries(reindexResponse.retries().search()) //
.withThrottledMillis(Long.parseLong(reindexResponse.throttledMillis())) //
.withThrottledMillis(reindexResponse.throttledMillis().toEpochMilli()) //
.withRequestsPerSecond(reindexResponse.requestsPerSecond()) //
.withThrottledUntilMillis(Long.parseLong(reindexResponse.throttledUntilMillis())).withFailures(failures) //
.withThrottledUntilMillis(reindexResponse.throttledUntilMillis().toEpochMilli()).withFailures(failures) //
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch.core.search.BoundaryScanner;
import co.elastic.clients.elasticsearch.core.search.BuiltinHighlighterType;
import co.elastic.clients.elasticsearch.core.search.HighlighterEncoder;
import co.elastic.clients.elasticsearch.core.search.HighlighterFragmenter;
import co.elastic.clients.elasticsearch.core.search.HighlighterOrder;
Expand Down Expand Up @@ -170,11 +169,11 @@ static HighlighterType highlighterType(@Nullable String value) {
if (value != null) {
switch (value.toLowerCase()) {
case "unified":
return HighlighterType.of(b -> b.builtin(BuiltinHighlighterType.Unified));
return HighlighterType.Unified;
case "plain":
return HighlighterType.of(b -> b.builtin(BuiltinHighlighterType.Plain));
return HighlighterType.Plain;
case "fvh":
return HighlighterType.of(b -> b.builtin(BuiltinHighlighterType.FastVector));
return HighlighterType.FastVector;
default:
return null;
}
Expand Down Expand Up @@ -308,6 +307,16 @@ static SearchType searchType(@Nullable Query.SearchType searchType) {
return null;
}

@Nullable
static Slices slices(@Nullable Long count) {

if (count == null) {
return null;
}

return Slices.of(s -> s.value(Math.toIntExact(count)));
}

@Nullable
static SortMode sortMode(Order.Mode mode) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ static WebClientProvider getWebClientProvider(ClientConfiguration clientConfigur
if (suppliedHeaders != null && suppliedHeaders != HttpHeaders.EMPTY) {
httpHeaders.addAll(suppliedHeaders);
}

// this WebClientProvider is built with ES 7 and not used on 8 anymore
httpHeaders.add("Accept", "application/vnd.elasticsearch+json;compatible-with=7");

var contentTypeHeader = "Content-Type";
if (httpHeaders.containsKey(contentTypeHeader)) {
httpHeaders.remove(contentTypeHeader);
}
httpHeaders.add(contentTypeHeader, "application/vnd.elasticsearch+json;compatible-with=7");

}));

return provider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws

/**
* Set the {@link EntityCallbacks} instance to use when invoking {@link EntityCallbacks callbacks} like the
* {@link org.springframework.data.elasticsearch.core.event.BeforeConvertCallback}.
* Overrides potentially existing {@link EntityCallbacks}.
* {@link org.springframework.data.elasticsearch.core.event.BeforeConvertCallback}. Overrides potentially existing
* {@link EntityCallbacks}.
*
* @param entityCallbacks must not be {@literal null}.
* @throws IllegalArgumentException if the given instance is {@literal null}.
Expand Down Expand Up @@ -588,19 +588,19 @@ protected <T> SearchDocumentResponse.EntityCreator<T> getEntityCreator(ReadDocum
* @return the version as string if it can be retrieved
*/
@Nullable
abstract protected String getClusterVersion();
public abstract String getClusterVersion();

/**
* @return the vendor name of the used cluster and client library
* @since 4.3
*/
abstract protected String getVendor();
public abstract String getVendor();

/**
* @return the version of the used client runtime library.
* @since 4.3
*/
abstract protected String getRuntimeLibraryVersion();
public abstract String getRuntimeLibraryVersion();

// endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,15 +611,15 @@ public ElasticsearchPersistentEntity<?> getPersistentEntityFor(@Nullable Class<?
* @return the vendor name of the used cluster and client library
* @since 4.3
*/
abstract protected Mono<String> getVendor();
public abstract Mono<String> getVendor();

/**
* @return the version of the used client runtime library.
* @since 4.3
*/
abstract protected Mono<String> getRuntimeLibraryVersion();
public abstract Mono<String> getRuntimeLibraryVersion();

abstract protected Mono<String> getClusterVersion();
public abstract Mono<String> getClusterVersion();

/**
* Value class to capture client independent information from a response to an index request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ private RuntimeException translateException(Exception exception) {

// region helper methods
@Override
protected String getClusterVersion() {
public String getClusterVersion() {
try {
return execute(client -> client.info(RequestOptions.DEFAULT)).getVersion().getNumber();
} catch (Exception ignored) {}
Expand All @@ -629,12 +629,12 @@ public Query idsQuery(List<String> ids) {
}

@Override
protected String getVendor() {
public String getVendor() {
return "Elasticsearch";
}

@Override
protected String getRuntimeLibraryVersion() {
public String getRuntimeLibraryVersion() {
return Version.CURRENT.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ protected SearchRequest prepareSearchRequest(SearchRequest request, boolean useS

// region Helper methods
@Override
protected Mono<String> getClusterVersion() {
public Mono<String> getClusterVersion() {
try {
return Mono.from(execute(ReactiveElasticsearchClient::info))
.map(mainResponse -> mainResponse.getVersion().toString());
Expand All @@ -689,7 +689,7 @@ protected Mono<String> getClusterVersion() {
* @since 4.3
*/
@Override
protected Mono<String> getVendor() {
public Mono<String> getVendor() {
return Mono.just("Elasticsearch");
}

Expand All @@ -698,7 +698,7 @@ protected Mono<String> getVendor() {
* @since 4.3
*/
@Override
protected Mono<String> getRuntimeLibraryVersion() {
public Mono<String> getRuntimeLibraryVersion() {
return Mono.just(Version.CURRENT.toString());
}

Expand Down
Loading