Skip to content

Allow providing a custom ElasticsearchTransport in the configuration classes. #2703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@
* @author Peter-Josef Meisch
* @since 4.4
*/
@SuppressWarnings("unused")
public final class ElasticsearchClients {
/**
* Name of whose value can be used to correlate log messages for this request.
*/
private static final String X_SPRING_DATA_ELASTICSEARCH_CLIENT = "X-SpringDataElasticsearch-Client";
private static final String IMPERATIVE_CLIENT = "imperative";
private static final String REACTIVE_CLIENT = "reactive";
public static final String IMPERATIVE_CLIENT = "imperative";
public static final String REACTIVE_CLIENT = "reactive";

private static final JsonpMapper DEFAULT_JSONP_MAPPER = new JacksonJsonpMapper();

// region reactive client
/**
* Creates a new {@link ReactiveElasticsearchClient}
*
Expand Down Expand Up @@ -131,10 +133,28 @@ public static ReactiveElasticsearchClient createReactive(RestClient restClient)
*/
public static ReactiveElasticsearchClient createReactive(RestClient restClient,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {
return new ReactiveElasticsearchClient(
getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper));

Assert.notNull(restClient, "restClient must not be null");

var transport = getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper);
return createReactive(transport);
}

/**
* Creates a new {@link ReactiveElasticsearchClient} that uses the given {@link ElasticsearchTransport}.
*
* @param transport the transport to use
* @return the {@link ElasticsearchClient
*/
public static ReactiveElasticsearchClient createReactive(ElasticsearchTransport transport) {

Assert.notNull(transport, "transport must not be null");

return new ReactiveElasticsearchClient(transport);
}
// endregion

// region imperative client
/**
* Creates a new imperative {@link ElasticsearchClient}
*
Expand Down Expand Up @@ -183,8 +203,40 @@ public static ElasticsearchClient createImperative(RestClient restClient, @Nulla
ElasticsearchTransport transport = getElasticsearchTransport(restClient, IMPERATIVE_CLIENT, transportOptions,
jsonpMapper);

return createImperative(transport);
}

/**
* Creates a new {@link ElasticsearchClient} that uses the given {@link ElasticsearchTransport}.
*
* @param transport the transport to use
* @return the {@link ElasticsearchClient
*/
public static AutoCloseableElasticsearchClient createImperative(ElasticsearchTransport transport) {

Assert.notNull(transport, "transport must not be null");

return new AutoCloseableElasticsearchClient(transport);
}
// endregion

// region low level RestClient
private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) {

if (transportOptions instanceof RestClientOptions restClientOptions) {
return restClientOptions.toBuilder();
}

var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder());

if (transportOptions != null) {
transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue()));
transportOptions.queryParameters().forEach(builder::setParameter);
builder.onWarnings(transportOptions.onWarnings());
}

return builder;
}

/**
* Creates a low level {@link RestClient} for the given configuration.
Expand Down Expand Up @@ -256,10 +308,26 @@ private static RestClientBuilder getRestClientBuilder(ClientConfiguration client
}
return builder;
}
// endregion

private static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType,
// region Elasticsearch transport
/**
* Creates an {@link ElasticsearchTransport} that will use the given client that additionally is customized with a
* header to contain the clientType
*
* @param restClient the client to use
* @param clientType the client type to pass in each request as header
* @param transportOptions options for the transport
* @param jsonpMapper mapper for the transport
* @return ElasticsearchTransport
*/
public static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {

Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(clientType, "clientType must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");

TransportOptions.Builder transportOptionsBuilder = transportOptions != null ? transportOptions.toBuilder()
: new RestClientOptions(RequestOptions.DEFAULT).toBuilder();

Expand All @@ -285,26 +353,10 @@ private static ElasticsearchTransport getElasticsearchTransport(RestClient restC

return new RestClientTransport(restClient, jsonpMapper, restClientOptionsBuilder.build());
}

private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) {

if (transportOptions instanceof RestClientOptions restClientOptions) {
return restClientOptions.toBuilder();
}

var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder());

if (transportOptions != null) {
transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue()));
transportOptions.queryParameters().forEach(builder::setParameter);
builder.onWarnings(transportOptions.onWarnings());
}

return builder;
}
// endregion

private static List<String> formattedHosts(List<InetSocketAddress> hosts, boolean useSsl) {
return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ":" + it.getPort())
return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort())
.collect(Collectors.toList());
}

Expand All @@ -320,13 +372,7 @@ private static org.apache.http.Header[] toHeaderArray(HttpHeaders headers) {
*
* @since 4.4
*/
private static class CustomHeaderInjector implements HttpRequestInterceptor {

public CustomHeaderInjector(Supplier<HttpHeaders> headersSupplier) {
this.headersSupplier = headersSupplier;
}

private final Supplier<HttpHeaders> headersSupplier;
private record CustomHeaderInjector(Supplier<HttpHeaders> headersSupplier) implements HttpRequestInterceptor {

@Override
public void process(HttpRequest request, HttpContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.rest_client.RestClientOptions;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
Expand All @@ -33,7 +33,8 @@

/**
* Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch
* connection using the Elasticsearch Client.
* connection using the Elasticsearch Client. This class exposes different parts of the setup as Spring beans. Deriving
* classes must provide the {@link ClientConfiguration} to use.
*
* @author Peter-Josef Meisch
* @since 4.4
Expand All @@ -49,7 +50,7 @@ public abstract class ElasticsearchConfiguration extends ElasticsearchConfigurat
public abstract ClientConfiguration clientConfiguration();

/**
* Provides the underlying low level RestClient.
* Provides the underlying low level Elasticsearch RestClient.
*
* @param clientConfiguration configuration for the client, must not be {@literal null}
* @return RestClient
Expand All @@ -62,19 +63,35 @@ public RestClient elasticsearchRestClient(ClientConfiguration clientConfiguratio
return ElasticsearchClients.getRestClient(clientConfiguration);
}

/**
* Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and
* the {@link JsonpMapper} bean provided in this class.
*
* @return the {@link ElasticsearchTransport}
* @since 5.2
*/
@Bean
public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) {

Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");

return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.IMPERATIVE_CLIENT,
transportOptions(), jsonpMapper);
}

/**
* Provides the {@link ElasticsearchClient} to be used.
*
* @param restClient the low level RestClient to use
* @param jsonpMapper the JsonpMapper to use
* @param transport the {@link ElasticsearchTransport} to use
* @return ElasticsearchClient instance
*/
@Bean
public ElasticsearchClient elasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) {
public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {

Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(transport, "transport must not be null");

return ElasticsearchClients.createImperative(restClient, transportOptions(), jsonpMapper);
return ElasticsearchClients.createImperative(transport);
}

/**
Expand All @@ -94,7 +111,7 @@ public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter el
}

/**
* Provides the JsonpMapper bean that is used in the {@link #elasticsearchClient(RestClient, JsonpMapper)} method.
* Provides the JsonpMapper bean that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method.
*
* @return the {@link JsonpMapper} to use
* @since 5.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.rest_client.RestClientOptions;

Expand All @@ -31,7 +32,8 @@

/**
* Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch
* connection using the {@link ReactiveElasticsearchClient}.
* connection using the {@link ReactiveElasticsearchClient}. This class exposes different parts of the setup as Spring
* beans. Deriving * classes must provide the {@link ClientConfiguration} to use.
*
* @author Peter-Josef Meisch
* @since 4.4
Expand Down Expand Up @@ -60,18 +62,35 @@ public RestClient elasticsearchRestClient(ClientConfiguration clientConfiguratio
return ElasticsearchClients.getRestClient(clientConfiguration);
}

/**
* Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and
* the {@link JsonpMapper} bean provided in this class.
*
* @return the {@link ElasticsearchTransport}
* @since 5.2
*/
@Bean
public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) {

Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");

return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.REACTIVE_CLIENT,
transportOptions(), jsonpMapper);
}

/**
* Provides the {@link ReactiveElasticsearchClient} instance used.
*
* @param restClient the low level RestClient to use
* @param transport the ElasticsearchTransport to use
* @return ReactiveElasticsearchClient instance.
*/
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) {
public ReactiveElasticsearchClient reactiveElasticsearchClient(ElasticsearchTransport transport) {

Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(transport, "transport must not be null");

return ElasticsearchClients.createReactive(restClient, transportOptions(), jsonpMapper);
return ElasticsearchClients.createReactive(transport);
}

/**
Expand All @@ -91,8 +110,8 @@ public ReactiveElasticsearchOperations reactiveElasticsearchOperations(Elasticse
}

/**
* Provides the JsonpMapper that is used in the {@link #reactiveElasticsearchClient(RestClient, JsonpMapper)} method
* and exposes it as a bean.
* Provides the JsonpMapper that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method and
* exposes it as a bean.
*
* @return the {@link JsonpMapper} to use
* @since 5.2
Expand Down