Skip to content

Encapsulate client specific aggregation return types. #1921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ Check the sections on <<elasticsearch-migration-guide-4.2-4.3.deprecations>> and
* In the `org.springframework.data.elasticsearch.annotations.Document` annotation the `versionType()` property has changed to `org.springframework.data.elasticsearch.annotations.Document.VersionType`, the available enum values are the same.
* In the `org.springframework.data.elasticsearch.core.query.Query` interface the `searchType()` property has changed to `org.springframework.data.elasticsearch.core.query.Query.SearchType`, the available enum values are the same.
* In the `org.springframework.data.elasticsearch.core.query.Query` interface the return value of `timeout()` was changed to `java.time.Duration`.
* The `SearchHits<T>`class does not contain the `org.elasticsearch.search.aggregations.Aggregations` anymore.
Instead it now contains an instance of the `org.springframework.data.elasticsearch.core.AggregationsContainer<T>` class where `T` is the concrete aggregations type from the underlying client that is used.
Currently this will be a `org
.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations` object; later different implementations will be available.
The same change has been done to the `ReactiveSearchOperations.aggregate()` functions, the now return a `Flux<AggregationContainer<?>>`.
Programs using the aggregations need to be changed to cast the returned value to the appropriate class to further proces it.

=== Handling of field and sourceFilter properties of Query

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

/**
* Aggregation container used in the Spring Data Elasticsearch API for a single aggregation. The concrete
* implementations must be provided by the code handling the direct communication with Elasticsearch.
*
* @author Peter-Josef Meisch
* @param <T> the aggregation class from the used client implementation.
* @since 4.3
*/
public interface AggregationContainer<T> {
/**
* @return the concrete aggregations implementation
*/
T aggregation();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

/**
* Aggregations container used in the Spring Data Elasticsearch API. The concrete implementations must be provided by
* the code handling the direct communication with Elasticsearch.
*
* @author Peter-Josef Meisch
* @param <T> the aggregations class from the used client implementation.
* @since 4.3
*/
public interface AggregationsContainer<T> {
/**
* @return the concrete aggregations implementation
*/
T aggregations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.reactivestreams.Publisher;
Expand All @@ -59,6 +58,7 @@
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.core.cluster.DefaultReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
Expand Down Expand Up @@ -782,12 +782,12 @@ private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> cla
}

@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType) {
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}

@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return doAggregate(query, entityType, index);
}

Expand All @@ -808,7 +808,7 @@ private Flux<Suggest> doSuggest(SuggestBuilder suggestion, IndexCoordinates inde
});
}

private Flux<Aggregation> doAggregate(Query query, Class<?> entityType, IndexCoordinates index) {
private Flux<AggregationContainer<?>> doAggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return Flux.defer(() -> {
SearchRequest request = requestFactory.searchRequest(query, entityType, index);
request = prepareSearchRequest(request, false);
Expand Down Expand Up @@ -872,14 +872,14 @@ protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request)
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<Aggregation> doAggregate(SearchRequest request) {
protected Flux<AggregationContainer<?>> doAggregate(SearchRequest request) {

if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doCount: {}", request);
}

return Flux.from(execute(client -> client.aggregate(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Flux.empty());
.onErrorResume(NoSuchIndexException.class, it -> Flux.empty()).map(ElasticsearchAggregation::new);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;

import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -185,7 +184,7 @@ default <T> Mono<SearchPage<T>> searchForPage(Query query, Class<T> entityType,
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType);
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType);

/**
* Perform an aggregation specified by the given {@link Query query}. <br />
Expand All @@ -196,7 +195,7 @@ default <T> Mono<SearchPage<T>> searchForPage(Query query, Class<T> entityType,
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);

/**
* Does a suggest query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.elasticsearch.search.aggregations.Aggregations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
Expand Down Expand Up @@ -95,7 +94,7 @@ private SearchHitsImpl<T> mapHitsFromResponse(SearchDocumentResponse searchDocum
SearchHit<T> hit = mapHit(document, content);
searchHits.add(hit);
}
Aggregations aggregations = searchDocumentResponse.getAggregations();
AggregationsContainer<?> aggregations = searchDocumentResponse.getAggregations();
TotalHitsRelation totalHitsRelation = TotalHitsRelation.valueOf(searchDocumentResponse.getTotalHitsRelation());

return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations);
Expand Down Expand Up @@ -225,7 +224,7 @@ private SearchHits<?> mapInnerDocuments(SearchHits<SearchDocument> searchHits, C

/**
* find a {@link ElasticsearchPersistentEntity} following the property chain defined by the nested metadata
*
*
* @param persistentEntity base entity
* @param nestedMetaData nested metadata
* @return A {@link ElasticsearchPersistentEntityWithNestedMetaData} containing the found entity or null together with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import java.util.Iterator;
import java.util.List;

import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;

/**
* Encapsulates a list of {@link SearchHit}s with additional information from the search.
*
*
* @param <T> the result data class.
* @author Sascha Woo
* @since 4.0
Expand All @@ -35,7 +34,7 @@ public interface SearchHits<T> extends Streamable<SearchHit<T>> {
* @return the aggregations.
*/
@Nullable
Aggregations getAggregations();
AggregationsContainer<?> getAggregations();

/**
* @return the maximum score
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.List;

import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.Lazy;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand All @@ -39,7 +38,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
@Nullable private final String scrollId;
private final List<? extends SearchHit<T>> searchHits;
private final Lazy<List<SearchHit<T>>> unmodifiableSearchHits;
@Nullable private final Aggregations aggregations;
@Nullable private final AggregationsContainer<?> aggregations;

/**
* @param totalHits the number of total hits for the search
Expand All @@ -50,7 +49,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
* @param aggregations the aggregations if available
*/
public SearchHitsImpl(long totalHits, TotalHitsRelation totalHitsRelation, float maxScore, @Nullable String scrollId,
List<? extends SearchHit<T>> searchHits, @Nullable Aggregations aggregations) {
List<? extends SearchHit<T>> searchHits, @Nullable AggregationsContainer<?> aggregations) {

Assert.notNull(searchHits, "searchHits must not be null");

Expand Down Expand Up @@ -113,7 +112,7 @@ public String toString() {
// region aggregations
@Override
@Nullable
public Aggregations getAggregations() {
public AggregationsContainer<?> getAggregations() {
return aggregations;
}
// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;

import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;

Expand All @@ -33,7 +32,7 @@ public interface SearchHitsIterator<T> extends CloseableIterator<SearchHit<T>> {
* @return the aggregations.
*/
@Nullable
Aggregations getAggregations();
AggregationsContainer<?> getAggregations();

/**
* @return the maximum score
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -55,7 +54,7 @@ static <T> SearchHitsIterator<T> streamResults(int maxCount, SearchScrollHits<T>
Assert.notNull(continueScrollFunction, "continueScrollFunction must not be null.");
Assert.notNull(clearScrollConsumer, "clearScrollConsumer must not be null.");

Aggregations aggregations = searchHits.getAggregations();
AggregationsContainer<?> aggregations = searchHits.getAggregations();
float maxScore = searchHits.getMaxScore();
long totalHits = searchHits.getTotalHits();
TotalHitsRelation totalHitsRelation = searchHits.getTotalHitsRelation();
Expand All @@ -78,7 +77,7 @@ public void close() {

@Override
@Nullable
public Aggregations getAggregations() {
public AggregationsContainer<?> getAggregations() {
return aggregations;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;

import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.data.elasticsearch.core.AggregationContainer;
import org.springframework.lang.NonNull;

/**
* AggregationContainer implementation for an Elasticsearch7 aggregation.
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ElasticsearchAggregation implements AggregationContainer<Aggregation> {

private final Aggregation aggregation;

public ElasticsearchAggregation(Aggregation aggregation) {
this.aggregation = aggregation;
}

@NonNull
@Override
public Aggregation aggregation() {
return aggregation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;

import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.lang.NonNull;

/**
* AggregationsContainer implementation for the Elasticsearch7 aggregations.
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ElasticsearchAggregations implements AggregationsContainer<Aggregations> {

private final Aggregations aggregations;

public ElasticsearchAggregations(Aggregations aggregations) {
this.aggregations = aggregations;
}

@NonNull
@Override
public Aggregations aggregations() {
return aggregations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
* Classes and interfaces used by the code that uses Elasticsearch 7 client libraries
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;
Loading