From 8cdc01bf08d28cfa44560e9c13f9b3623da8dbef Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 16 Sep 2023 13:16:47 +0200 Subject: [PATCH] Allow providing a custom ElasticsearchTransport in the configuration classes. --- .../client/elc/ElasticsearchClients.java | 106 +++++++++++++----- .../elc/ElasticsearchConfiguration.java | 35 ++++-- .../ReactiveElasticsearchConfiguration.java | 33 ++++-- 3 files changed, 128 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchClients.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchClients.java index 3f8d3346f..b38e40b52 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchClients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchClients.java @@ -57,16 +57,18 @@ * @author Peter-Josef Meisch * @since 4.4 */ +@SuppressWarnings("unused") public final class ElasticsearchClients { /** * Name of whose value can be used to correlate log messages for this request. */ private static final String X_SPRING_DATA_ELASTICSEARCH_CLIENT = "X-SpringDataElasticsearch-Client"; - private static final String IMPERATIVE_CLIENT = "imperative"; - private static final String REACTIVE_CLIENT = "reactive"; + public static final String IMPERATIVE_CLIENT = "imperative"; + public static final String REACTIVE_CLIENT = "reactive"; private static final JsonpMapper DEFAULT_JSONP_MAPPER = new JacksonJsonpMapper(); + // region reactive client /** * Creates a new {@link ReactiveElasticsearchClient} * @@ -131,10 +133,28 @@ public static ReactiveElasticsearchClient createReactive(RestClient restClient) */ public static ReactiveElasticsearchClient createReactive(RestClient restClient, @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { - return new ReactiveElasticsearchClient( - getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper)); + + Assert.notNull(restClient, "restClient must not be null"); + + var transport = getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper); + return createReactive(transport); } + /** + * Creates a new {@link ReactiveElasticsearchClient} that uses the given {@link ElasticsearchTransport}. + * + * @param transport the transport to use + * @return the {@link ElasticsearchClient + */ + public static ReactiveElasticsearchClient createReactive(ElasticsearchTransport transport) { + + Assert.notNull(transport, "transport must not be null"); + + return new ReactiveElasticsearchClient(transport); + } + // endregion + + // region imperative client /** * Creates a new imperative {@link ElasticsearchClient} * @@ -183,8 +203,40 @@ public static ElasticsearchClient createImperative(RestClient restClient, @Nulla ElasticsearchTransport transport = getElasticsearchTransport(restClient, IMPERATIVE_CLIENT, transportOptions, jsonpMapper); + return createImperative(transport); + } + + /** + * Creates a new {@link ElasticsearchClient} that uses the given {@link ElasticsearchTransport}. + * + * @param transport the transport to use + * @return the {@link ElasticsearchClient + */ + public static AutoCloseableElasticsearchClient createImperative(ElasticsearchTransport transport) { + + Assert.notNull(transport, "transport must not be null"); + return new AutoCloseableElasticsearchClient(transport); } + // endregion + + // region low level RestClient + private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) { + + if (transportOptions instanceof RestClientOptions restClientOptions) { + return restClientOptions.toBuilder(); + } + + var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()); + + if (transportOptions != null) { + transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue())); + transportOptions.queryParameters().forEach(builder::setParameter); + builder.onWarnings(transportOptions.onWarnings()); + } + + return builder; + } /** * Creates a low level {@link RestClient} for the given configuration. @@ -256,10 +308,26 @@ private static RestClientBuilder getRestClientBuilder(ClientConfiguration client } return builder; } + // endregion - private static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType, + // region Elasticsearch transport + /** + * Creates an {@link ElasticsearchTransport} that will use the given client that additionally is customized with a + * header to contain the clientType + * + * @param restClient the client to use + * @param clientType the client type to pass in each request as header + * @param transportOptions options for the transport + * @param jsonpMapper mapper for the transport + * @return ElasticsearchTransport + */ + public static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType, @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { + Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(clientType, "clientType must not be null"); + Assert.notNull(jsonpMapper, "jsonpMapper must not be null"); + TransportOptions.Builder transportOptionsBuilder = transportOptions != null ? transportOptions.toBuilder() : new RestClientOptions(RequestOptions.DEFAULT).toBuilder(); @@ -285,26 +353,10 @@ private static ElasticsearchTransport getElasticsearchTransport(RestClient restC return new RestClientTransport(restClient, jsonpMapper, restClientOptionsBuilder.build()); } - - private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) { - - if (transportOptions instanceof RestClientOptions restClientOptions) { - return restClientOptions.toBuilder(); - } - - var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()); - - if (transportOptions != null) { - transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue())); - transportOptions.queryParameters().forEach(builder::setParameter); - builder.onWarnings(transportOptions.onWarnings()); - } - - return builder; - } + // endregion private static List formattedHosts(List hosts, boolean useSsl) { - return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ":" + it.getPort()) + return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort()) .collect(Collectors.toList()); } @@ -320,13 +372,7 @@ private static org.apache.http.Header[] toHeaderArray(HttpHeaders headers) { * * @since 4.4 */ - private static class CustomHeaderInjector implements HttpRequestInterceptor { - - public CustomHeaderInjector(Supplier headersSupplier) { - this.headersSupplier = headersSupplier; - } - - private final Supplier headersSupplier; + private record CustomHeaderInjector(Supplier headersSupplier) implements HttpRequestInterceptor { @Override public void process(HttpRequest request, HttpContext context) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchConfiguration.java index f25c980c9..d48d08a47 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchConfiguration.java @@ -18,12 +18,12 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.rest_client.RestClientOptions; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; -import org.jetbrains.annotations.NotNull; import org.springframework.context.annotation.Bean; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport; @@ -33,7 +33,8 @@ /** * Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch - * connection using the Elasticsearch Client. + * connection using the Elasticsearch Client. This class exposes different parts of the setup as Spring beans. Deriving + * classes must provide the {@link ClientConfiguration} to use. * * @author Peter-Josef Meisch * @since 4.4 @@ -49,7 +50,7 @@ public abstract class ElasticsearchConfiguration extends ElasticsearchConfigurat public abstract ClientConfiguration clientConfiguration(); /** - * Provides the underlying low level RestClient. + * Provides the underlying low level Elasticsearch RestClient. * * @param clientConfiguration configuration for the client, must not be {@literal null} * @return RestClient @@ -62,19 +63,35 @@ public RestClient elasticsearchRestClient(ClientConfiguration clientConfiguratio return ElasticsearchClients.getRestClient(clientConfiguration); } + /** + * Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and + * the {@link JsonpMapper} bean provided in this class. + * + * @return the {@link ElasticsearchTransport} + * @since 5.2 + */ + @Bean + public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) { + + Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(jsonpMapper, "jsonpMapper must not be null"); + + return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.IMPERATIVE_CLIENT, + transportOptions(), jsonpMapper); + } + /** * Provides the {@link ElasticsearchClient} to be used. * - * @param restClient the low level RestClient to use - * @param jsonpMapper the JsonpMapper to use + * @param transport the {@link ElasticsearchTransport} to use * @return ElasticsearchClient instance */ @Bean - public ElasticsearchClient elasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) { + public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) { - Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(transport, "transport must not be null"); - return ElasticsearchClients.createImperative(restClient, transportOptions(), jsonpMapper); + return ElasticsearchClients.createImperative(transport); } /** @@ -94,7 +111,7 @@ public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter el } /** - * Provides the JsonpMapper bean that is used in the {@link #elasticsearchClient(RestClient, JsonpMapper)} method. + * Provides the JsonpMapper bean that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method. * * @return the {@link JsonpMapper} to use * @since 5.2 diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchConfiguration.java index 7c7b28303..c3ebf9909 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchConfiguration.java @@ -17,6 +17,7 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.rest_client.RestClientOptions; @@ -31,7 +32,8 @@ /** * Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch - * connection using the {@link ReactiveElasticsearchClient}. + * connection using the {@link ReactiveElasticsearchClient}. This class exposes different parts of the setup as Spring + * beans. Deriving * classes must provide the {@link ClientConfiguration} to use. * * @author Peter-Josef Meisch * @since 4.4 @@ -60,18 +62,35 @@ public RestClient elasticsearchRestClient(ClientConfiguration clientConfiguratio return ElasticsearchClients.getRestClient(clientConfiguration); } + /** + * Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and + * the {@link JsonpMapper} bean provided in this class. + * + * @return the {@link ElasticsearchTransport} + * @since 5.2 + */ + @Bean + public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) { + + Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(jsonpMapper, "jsonpMapper must not be null"); + + return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.REACTIVE_CLIENT, + transportOptions(), jsonpMapper); + } + /** * Provides the {@link ReactiveElasticsearchClient} instance used. * - * @param restClient the low level RestClient to use + * @param transport the ElasticsearchTransport to use * @return ReactiveElasticsearchClient instance. */ @Bean - public ReactiveElasticsearchClient reactiveElasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) { + public ReactiveElasticsearchClient reactiveElasticsearchClient(ElasticsearchTransport transport) { - Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(transport, "transport must not be null"); - return ElasticsearchClients.createReactive(restClient, transportOptions(), jsonpMapper); + return ElasticsearchClients.createReactive(transport); } /** @@ -91,8 +110,8 @@ public ReactiveElasticsearchOperations reactiveElasticsearchOperations(Elasticse } /** - * Provides the JsonpMapper that is used in the {@link #reactiveElasticsearchClient(RestClient, JsonpMapper)} method - * and exposes it as a bean. + * Provides the JsonpMapper that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method and + * exposes it as a bean. * * @return the {@link JsonpMapper} to use * @since 5.2