Skip to content

Adapt custom routing. #2474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates inde

Assert.notNull(query, "query must not be null");

DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, clazz, index,
getRefreshPolicy());
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(),
clazz, index, getRefreshPolicy());

DeleteByQueryResponse response = execute(client -> client.deleteByQuery(request));

Expand Down Expand Up @@ -309,7 +309,8 @@ public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index)
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true);
SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index,
true);

SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));

Expand All @@ -331,7 +332,8 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
}

protected <T> SearchHits<T> doSearch(Query query, Class<T> clazz, IndexCoordinates index) {
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);
SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index,
false);
SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));

// noinspection DuplicatedCode
Expand All @@ -343,7 +345,7 @@ protected <T> SearchHits<T> doSearch(Query query, Class<T> clazz, IndexCoordinat
}

protected <T> SearchHits<T> doSearch(SearchTemplateQuery query, Class<T> clazz, IndexCoordinates index) {
var searchTemplateRequest = requestConverter.searchTemplate(query, index);
var searchTemplateRequest = requestConverter.searchTemplate(query, routingResolver.getRouting(), index);
var searchTemplateResponse = execute(client -> client.searchTemplate(searchTemplateRequest, EntityAsMap.class));

// noinspection DuplicatedCode
Expand Down Expand Up @@ -374,7 +376,8 @@ public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query
Assert.notNull(query, "query must not be null");
Assert.notNull(query.getPageable(), "pageable of query must not be null.");

SearchRequest request = requestConverter.searchRequest(query, clazz, index, false, scrollTimeInMillis);
SearchRequest request = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, false,
scrollTimeInMillis);
SearchResponse<EntityAsMap> response = execute(client -> client.search(request, EntityAsMap.class));

return getSearchScrollHits(clazz, index, response);
Expand Down Expand Up @@ -492,7 +495,8 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
@SuppressWarnings({ "unchecked", "rawtypes" })
private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiSearchQueryParameters) {

MsearchRequest request = requestConverter.searchMsearchRequest(multiSearchQueryParameters);
MsearchRequest request = requestConverter.searchMsearchRequest(multiSearchQueryParameters,
routingResolver.getRouting());

MsearchResponse<EntityAsMap> msearchResponse = execute(client -> client.msearch(request, EntityAsMap.class));
List<MultiSearchResponseItem<EntityAsMap>> responseItems = msearchResponse.responses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ protected <T> Mono<Tuple2<T, IndexResponseMetaData>> doIndex(T entity, IndexCoor
return Mono.just(entity) //
.zipWith(//
Mono.from(execute((ClientCallback<Publisher<IndexResponse>>) client -> client.index(indexRequest))) //
.map(indexResponse -> new IndexResponseMetaData(
indexResponse.id(), //
.map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), //
indexResponse.index(), //
indexResponse.seqNo(), //
indexResponse.primaryTerm(), //
Expand Down Expand Up @@ -171,8 +170,8 @@ public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoord

Assert.notNull(query, "query must not be null");

DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
getRefreshPolicy());
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(),
entityType, index, getRefreshPolicy());
return Mono
.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
.map(responseConverter::byQueryResponse);
Expand Down Expand Up @@ -391,7 +390,8 @@ private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexC

baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive));
baseQuery.addSort(Sort.by("_shard_doc"));
SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(),
clazz, index, false, true);

return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
.search(firstSearchRequest, EntityAsMap.class))).expand(entityAsMapSearchResponse -> {
Expand All @@ -404,7 +404,8 @@ private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexC
List<Object> sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject)
.collect(Collectors.toList());
baseQuery.setSearchAfter(sortOptions);
SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery,
routingResolver.getRouting(), clazz, index, false, true);
return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
.search(followSearchRequest, EntityAsMap.class)));
});
Expand Down Expand Up @@ -460,7 +461,8 @@ protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true);
SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index,
true);

return Mono
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
Expand All @@ -470,7 +472,8 @@ protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates

private Flux<SearchDocument> doFindBounded(Query query, Class<?> clazz, IndexCoordinates index) {

SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index,
false, false);

return Mono
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
Expand All @@ -481,7 +484,7 @@ private Flux<SearchDocument> doFindBounded(Query query, Class<?> clazz, IndexCoo

private Flux<SearchDocument> doSearch(SearchTemplateQuery query, Class<?> clazz, IndexCoordinates index) {

var request = requestConverter.searchTemplate(query, index);
var request = requestConverter.searchTemplate(query, routingResolver.getRouting(), index);

return Mono
.from(execute((ClientCallback<Publisher<SearchTemplateResponse<EntityAsMap>>>) client -> client
Expand All @@ -496,7 +499,8 @@ protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);
SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index,
false);

// noinspection unchecked
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>((Class<T>) clazz, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,8 @@ public DeleteRequest documentDeleteRequest(String id, @Nullable String routing,
});
}

public DeleteByQueryRequest documentDeleteByQueryRequest(Query query, Class<?> clazz, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy) {
public DeleteByQueryRequest documentDeleteByQueryRequest(Query query, @Nullable String routing, Class<?> clazz,
IndexCoordinates index, @Nullable RefreshPolicy refreshPolicy) {

Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");
Expand All @@ -857,6 +857,8 @@ public DeleteByQueryRequest documentDeleteByQueryRequest(Query query, Class<?> c

if (query.getRoute() != null) {
b.routing(query.getRoute());
} else if (StringUtils.hasText(routing)) {
b.routing(routing);
}

return b;
Expand Down Expand Up @@ -998,45 +1000,53 @@ public UpdateByQueryRequest documentUpdateByQueryRequest(UpdateQuery updateQuery

// region search

public <T> SearchRequest searchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
boolean forCount) {
return searchRequest(query, clazz, indexCoordinates, forCount, false, null);
public <T> SearchRequest searchRequest(Query query, @Nullable String routing, @Nullable Class<T> clazz,
IndexCoordinates indexCoordinates, boolean forCount) {
return searchRequest(query, routing, clazz, indexCoordinates, forCount, false, null);
}

public <T> SearchRequest searchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
boolean forCount, long scrollTimeInMillis) {
return searchRequest(query, clazz, indexCoordinates, forCount, true, scrollTimeInMillis);
public <T> SearchRequest searchRequest(Query query, @Nullable String routing, @Nullable Class<T> clazz,
IndexCoordinates indexCoordinates, boolean forCount, long scrollTimeInMillis) {
return searchRequest(query, routing, clazz, indexCoordinates, forCount, true, scrollTimeInMillis);
}

public <T> SearchRequest searchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
boolean forCount, boolean forBatchedSearch) {
return searchRequest(query, clazz, indexCoordinates, forCount, forBatchedSearch, null);
public <T> SearchRequest searchRequest(Query query, @Nullable String routing, @Nullable Class<T> clazz,
IndexCoordinates indexCoordinates, boolean forCount, boolean forBatchedSearch) {
return searchRequest(query, routing, clazz, indexCoordinates, forCount, forBatchedSearch, null);
}

public <T> SearchRequest searchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
boolean forCount, boolean forBatchedSearch, @Nullable Long scrollTimeInMillis) {
public <T> SearchRequest searchRequest(Query query, @Nullable String routing, @Nullable Class<T> clazz,
IndexCoordinates indexCoordinates, boolean forCount, boolean forBatchedSearch,
@Nullable Long scrollTimeInMillis) {

String[] indexNames = indexCoordinates.getIndexNames();
Assert.notNull(query, "query must not be null");
Assert.notNull(indexCoordinates, "indexCoordinates must not be null");

elasticsearchConverter.updateQuery(query, clazz);
SearchRequest.Builder builder = new SearchRequest.Builder();
prepareSearchRequest(query, clazz, indexCoordinates, builder, forCount, forBatchedSearch);
prepareSearchRequest(query, routing, clazz, indexCoordinates, builder, forCount, forBatchedSearch);

if (scrollTimeInMillis != null) {
builder.scroll(t -> t.time(scrollTimeInMillis + "ms"));
}

builder.query(getQuery(query, clazz));

if (StringUtils.hasText(query.getRoute())) {
builder.routing(query.getRoute());
}
if (StringUtils.hasText(routing)) {
builder.routing(routing);
}

addFilter(query, builder);

return builder.build();
}

public MsearchRequest searchMsearchRequest(
List<ElasticsearchTemplate.MultiSearchQueryParameter> multiSearchQueryParameters) {
List<ElasticsearchTemplate.MultiSearchQueryParameter> multiSearchQueryParameters, @Nullable String routing) {

// basically the same stuff as in prepareSearchRequest, but the new Elasticsearch has different builders for a
// normal search and msearch
Expand All @@ -1049,11 +1059,16 @@ public MsearchRequest searchMsearchRequest(
.header(h -> {
h //
.index(Arrays.asList(param.index().getIndexNames())) //
.routing(query.getRoute()) //
.searchType(searchType(query.getSearchType())) //
.requestCache(query.getRequestCache()) //
;

if (StringUtils.hasText(query.getRoute())) {
h.routing(query.getRoute());
} else if (StringUtils.hasText(routing)) {
h.routing(routing);
}

if (query.getPreference() != null) {
h.preference(query.getPreference());
}
Expand Down Expand Up @@ -1156,8 +1171,8 @@ public MsearchRequest searchMsearchRequest(
});
}

private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
SearchRequest.Builder builder, boolean forCount, boolean forBatchedSearch) {
private <T> void prepareSearchRequest(Query query, @Nullable String routing, @Nullable Class<T> clazz,
IndexCoordinates indexCoordinates, SearchRequest.Builder builder, boolean forCount, boolean forBatchedSearch) {

String[] indexNames = indexCoordinates.getIndexNames();

Expand Down Expand Up @@ -1190,6 +1205,8 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind

if (query.getRoute() != null) {
builder.routing(query.getRoute());
} else if (StringUtils.hasText(routing)) {
builder.routing(routing);
}

if (query.getPreference() != null) {
Expand Down Expand Up @@ -1559,7 +1576,8 @@ public ClosePointInTimeRequest searchClosePointInTime(String pit) {
return ClosePointInTimeRequest.of(cpit -> cpit.id(pit));
}

public SearchTemplateRequest searchTemplate(SearchTemplateQuery query, IndexCoordinates index) {
public SearchTemplateRequest searchTemplate(SearchTemplateQuery query, @Nullable String routing,
IndexCoordinates index) {

Assert.notNull(query, "query must not be null");

Expand All @@ -1570,10 +1588,15 @@ public SearchTemplateRequest searchTemplate(SearchTemplateQuery query, IndexCoor
.id(query.getId()) //
.index(Arrays.asList(index.getIndexNames())) //
.preference(query.getPreference()) //
.routing(query.getRoute()) //
.searchType(searchType(query.getSearchType())).source(query.getSource()) //
;

if (query.getRoute() != null) {
builder.routing(query.getRoute());
} else if (StringUtils.hasText(routing)) {
builder.routing(routing);
}

var expandWildcards = query.getExpandWildcards();
if (!expandWildcards.isEmpty()) {
builder.expandWildcards(expandWildcards(expandWildcards));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ protected String doDelete(String id, @Nullable String routing, IndexCoordinates

@Override
public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index) {
DeleteByQueryRequest deleteByQueryRequest = requestFactory.deleteByQueryRequest(query, clazz, index);
DeleteByQueryRequest deleteByQueryRequest = requestFactory.deleteByQueryRequest(query, routingResolver.getRouting(),
clazz, index);
return ResponseConverter
.byQueryResponseOf(execute(client -> client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT)));
}
Expand Down Expand Up @@ -398,7 +399,7 @@ public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index)

final Boolean trackTotalHits = query.getTrackTotalHits();
query.setTrackTotalHits(true);
SearchRequest searchRequest = requestFactory.searchRequest(query, clazz, index);
SearchRequest searchRequest = requestFactory.searchRequest(query, routingResolver.getRouting(), clazz, index);
query.setTrackTotalHits(trackTotalHits);

searchRequest.source().size(0);
Expand All @@ -409,7 +410,7 @@ public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index)

@Override
public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates index) {
SearchRequest searchRequest = requestFactory.searchRequest(query, clazz, index);
SearchRequest searchRequest = requestFactory.searchRequest(query, routingResolver.getRouting(), clazz, index);
SearchResponse response = execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));

ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
Expand All @@ -431,7 +432,7 @@ public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query

Assert.notNull(query.getPageable(), "pageable of query must not be null.");

SearchRequest searchRequest = requestFactory.searchRequest(query, clazz, index);
SearchRequest searchRequest = requestFactory.searchRequest(query, routingResolver.getRouting(), clazz, index);
searchRequest.scroll(TimeValue.timeValueMillis(scrollTimeInMillis));

SearchResponse response = execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));
Expand Down Expand Up @@ -477,7 +478,7 @@ public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index)
public <T> List<SearchHits<T>> multiSearch(List<? extends Query> queries, Class<T> clazz, IndexCoordinates index) {
MultiSearchRequest request = new MultiSearchRequest();
for (Query query : queries) {
request.add(requestFactory.searchRequest(query, clazz, index));
request.add(requestFactory.searchRequest(query, routingResolver.getRouting(), clazz, index));
}

MultiSearchResponse.Item[] items = getMultiSearchResult(request);
Expand All @@ -504,7 +505,8 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
Iterator<Class<?>> it = classes.iterator();
for (Query query : queries) {
Class<?> clazz = it.next();
request.add(requestFactory.searchRequest(query, clazz, getIndexCoordinatesFor(clazz)));
request
.add(requestFactory.searchRequest(query, routingResolver.getRouting(), clazz, getIndexCoordinatesFor(clazz)));
}

MultiSearchResponse.Item[] items = getMultiSearchResult(request);
Expand Down Expand Up @@ -538,7 +540,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
MultiSearchRequest request = new MultiSearchRequest();
Iterator<Class<?>> it = classes.iterator();
for (Query query : queries) {
request.add(requestFactory.searchRequest(query, it.next(), index));
request.add(requestFactory.searchRequest(query, routingResolver.getRouting(), it.next(), index));
}

MultiSearchResponse.Item[] items = getMultiSearchResult(request);
Expand Down Expand Up @@ -572,7 +574,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
Iterator<Class<?>> it = classes.iterator();
Iterator<IndexCoordinates> indexesIt = indexes.iterator();
for (Query query : queries) {
request.add(requestFactory.searchRequest(query, it.next(), indexesIt.next()));
request.add(requestFactory.searchRequest(query, routingResolver.getRouting(), it.next(), indexesIt.next()));
}

MultiSearchResponse.Item[] items = getMultiSearchResult(request);
Expand Down
Loading