Skip to content

Support for SQL. #2949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.msearch.MultiSearchResponseItem;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.elasticsearch.sql.ElasticsearchSqlClient;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;

Expand Down Expand Up @@ -56,6 +58,7 @@
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand All @@ -74,6 +77,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
private static final Log LOGGER = LogFactory.getLog(ElasticsearchTemplate.class);

private final ElasticsearchClient client;
private final ElasticsearchSqlClient sqlClient;
private final RequestConverter requestConverter;
private final ResponseConverter responseConverter;
private final JsonpMapper jsonpMapper;
Expand All @@ -85,6 +89,7 @@ public ElasticsearchTemplate(ElasticsearchClient client) {
Assert.notNull(client, "client must not be null");

this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
Expand All @@ -97,6 +102,7 @@ public ElasticsearchTemplate(ElasticsearchClient client, ElasticsearchConverter
Assert.notNull(client, "client must not be null");

this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
Expand Down Expand Up @@ -656,6 +662,19 @@ public boolean deleteScript(String name) {
DeleteScriptRequest request = requestConverter.scriptDelete(name);
return execute(client -> client.deleteScript(request)).acknowledged();
}

@Override
public SqlResponse search(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");

try {
QueryResponse response = sqlClient.query(requestConverter.sqlQueryRequest(query));

return responseConverter.sqlResponse(response);
} catch (IOException e) {
throw exceptionTranslator.translateException(e);
}
}
// endregion

// region client callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public ReactiveElasticsearchIndicesClient indices() {
return new ReactiveElasticsearchIndicesClient(transport, transportOptions);
}

public ReactiveElasticsearchSqlClient sql() {
return new ReactiveElasticsearchSqlClient(transport, transportOptions);
}

// endregion
// region info

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;

import java.io.IOException;
import java.util.function.Function;

import org.jetbrains.annotations.Nullable;

import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.sql.QueryRequest;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.util.ObjectBuilder;
import reactor.core.publisher.Mono;

/**
* Reactive version of {@link co.elastic.clients.elasticsearch.sql.ElasticsearchSqlClient}.
*
* @author Aouichaoui Youssef
* @since 5.4
*/
public class ReactiveElasticsearchSqlClient extends ApiClient<ElasticsearchTransport, ReactiveElasticsearchSqlClient> {
public ReactiveElasticsearchSqlClient(ElasticsearchTransport transport, @Nullable TransportOptions transportOptions) {
super(transport, transportOptions);
}

@Override
public ReactiveElasticsearchSqlClient withTransportOptions(@Nullable TransportOptions transportOptions) {
return new ReactiveElasticsearchSqlClient(transport, transportOptions);
}

/**
* Executes a SQL request
*
* @param fn a function that initializes a builder to create the {@link QueryRequest}.
*/
public final Mono<QueryResponse> query(Function<QueryRequest.Builder, ObjectBuilder<QueryRequest>> fn)
throws IOException, ElasticsearchException {
return query(fn.apply(new QueryRequest.Builder()).build());
}

/**
* Executes a SQL request.
*/
public Mono<QueryResponse> query(QueryRequest query) {
return Mono.fromFuture(transport.performRequestAsync(query, QueryRequest._ENDPOINT, transportOptions));
}

/**
* Executes a SQL request.
*/
public Mono<QueryResponse> query() {
return Mono.fromFuture(
transport.performRequestAsync(new QueryRequest.Builder().build(), QueryRequest._ENDPOINT, transportOptions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.springframework.data.elasticsearch.core.query.SqlQuery;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -88,6 +90,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class);

private final ReactiveElasticsearchClient client;
private final ReactiveElasticsearchSqlClient sqlClient;
private final RequestConverter requestConverter;
private final ResponseConverter responseConverter;
private final JsonpMapper jsonpMapper;
Expand All @@ -99,6 +102,7 @@ public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, Elastic
Assert.notNull(client, "client must not be null");

this.client = client;
this.sqlClient = client.sql();
this.jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(converter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
Expand Down Expand Up @@ -646,6 +650,14 @@ public BaseQueryBuilder queryBuilderWithIds(List<String> ids) {
return NativeQuery.builder().withIds(ids);
}

@Override
public Mono<SqlResponse> search(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");

co.elastic.clients.elasticsearch.sql.QueryRequest request = requestConverter.sqlQueryRequest(query);
return sqlClient.query(request).onErrorMap(this::translateException).map(responseConverter::sqlResponse);
}

/**
* Callback interface to be used with {@link #execute(ReactiveElasticsearchTemplate.ClientCallback<>)} for operating
* directly on {@link ReactiveElasticsearchClient}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -530,6 +531,22 @@ public co.elastic.clients.elasticsearch.indices.GetTemplateRequest indicesGetTem
.of(gtr -> gtr.name(getTemplateRequest.getTemplateName()).flatSettings(true));
}

public co.elastic.clients.elasticsearch.sql.QueryRequest sqlQueryRequest(SqlQuery query) {
Assert.notNull(query, "Query must not be null.");

return co.elastic.clients.elasticsearch.sql.QueryRequest.of(sqb -> {
sqb.query(query.getQuery()).catalog(query.getCatalog()).columnar(query.getColumnar()).cursor(query.getCursor())
.fetchSize(query.getFetchSize()).fieldMultiValueLeniency(query.getFieldMultiValueLeniency())
.indexUsingFrozen(query.getIndexIncludeFrozen()).keepAlive(time(query.getKeepAlive()))
.keepOnCompletion(query.getKeepOnCompletion()).pageTimeout(time(query.getPageTimeout()))
.requestTimeout(time(query.getRequestTimeout()))
.waitForCompletionTimeout(time(query.getWaitForCompletionTimeout())).filter(getQuery(query.getFilter(), null))
.timeZone(Objects.toString(query.getTimeZone(), null)).format("json");

return sqb;
});
}

// endregion

// region documents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,9 @@
*/
package org.springframework.data.elasticsearch.client.elc;

import static org.springframework.data.elasticsearch.client.elc.JsonUtils.*;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;

import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.ComponentTemplateSummary;
import co.elastic.clients.elasticsearch.cluster.GetComponentTemplateResponse;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.GetScriptResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.get_index_template.IndexTemplateItem;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.json.JsonpMapper;
import static org.springframework.data.elasticsearch.client.elc.JsonUtils.toJson;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.removePrefixFromJson;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.typeMapping;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -61,10 +45,41 @@
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.core.sql.SqlResponse;
import org.springframework.data.elasticsearch.support.DefaultStringObjectMap;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.ComponentTemplateSummary;
import co.elastic.clients.elasticsearch.cluster.GetComponentTemplateResponse;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.GetScriptResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.Alias;
import co.elastic.clients.elasticsearch.indices.AliasDefinition;
import co.elastic.clients.elasticsearch.indices.GetAliasResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexTemplateResponse;
import co.elastic.clients.elasticsearch.indices.GetIndicesSettingsResponse;
import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
import co.elastic.clients.elasticsearch.indices.GetTemplateResponse;
import co.elastic.clients.elasticsearch.indices.IndexSettings;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.elasticsearch.indices.IndexTemplateSummary;
import co.elastic.clients.elasticsearch.indices.TemplateMapping;
import co.elastic.clients.elasticsearch.indices.get_index_template.IndexTemplateItem;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.elasticsearch.sql.QueryResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;

/**
* Class to convert Elasticsearch responses into Spring Data Elasticsearch classes.
*
Expand Down Expand Up @@ -536,6 +551,29 @@ public Script scriptResponse(GetScriptResponse response) {
}
// endregion

// region sql
public SqlResponse sqlResponse(QueryResponse response) {
SqlResponse.Builder builder = SqlResponse.builder();
builder.withRunning(Boolean.TRUE.equals(response.isRunning()))
.withPartial(Boolean.TRUE.equals(response.isPartial())).withCursor(response.cursor());

final List<SqlResponse.Column> columns = response.columns().stream()
.map(column -> new SqlResponse.Column(column.name(), column.type())).toList();
builder.withColumns(columns);

for (List<JsonData> rowValues : response.rows()) {
SqlResponse.Row.Builder rowBuilder = SqlResponse.Row.builder();
for (int idx = 0; idx < rowValues.size(); idx++) {
rowBuilder.withValue(columns.get(idx), rowValues.get(idx).toJson());
}

builder.withRow(rowBuilder.build());
}

return builder.build();
}
// end region

// region helper functions

private long timeToLong(Time time) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.script.ScriptOperations;
import org.springframework.data.elasticsearch.core.sql.SqlOperations;
import org.springframework.lang.Nullable;

/**
Expand All @@ -35,7 +36,7 @@
* @author Dmitriy Yakovlev
* @author Peter-Josef Meisch
*/
public interface ElasticsearchOperations extends DocumentOperations, SearchOperations, ScriptOperations {
public interface ElasticsearchOperations extends DocumentOperations, SearchOperations, ScriptOperations, SqlOperations {

/**
* get an {@link IndexOperations} that is bound to the given class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.script.ReactiveScriptOperations;
import org.springframework.data.elasticsearch.core.sql.ReactiveSqlOperations;
import org.springframework.lang.Nullable;

/**
Expand All @@ -31,7 +32,7 @@
* @since 3.2
*/
public interface ReactiveElasticsearchOperations
extends ReactiveDocumentOperations, ReactiveSearchOperations, ReactiveScriptOperations {
extends ReactiveDocumentOperations, ReactiveSearchOperations, ReactiveScriptOperations, ReactiveSqlOperations {

/**
* Get the {@link ElasticsearchConverter} used.
Expand Down
Loading