Skip to content

DATAES-570 - Use Delete By Query API for delete by query operations. #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionFuture;
Expand Down Expand Up @@ -52,6 +60,7 @@
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand All @@ -69,6 +78,7 @@
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -85,15 +95,13 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
import org.springframework.data.elasticsearch.core.client.support.AliasData;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
Expand Down Expand Up @@ -863,47 +871,24 @@ public <T> void delete(DeleteQuery deleteQuery, Class<T> clazz) {
: getPersistentEntityFor(clazz).getIndexName();
String typeName = hasText(deleteQuery.getType()) ? deleteQuery.getType()
: getPersistentEntityFor(clazz).getIndexType();
Integer pageSize = deleteQuery.getPageSize() != null ? deleteQuery.getPageSize() : 1000;
Long scrollTimeInMillis = deleteQuery.getScrollTimeInMillis() != null ? deleteQuery.getScrollTimeInMillis()
: 10000l;

SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(deleteQuery.getQuery()).withIndices(indexName)
.withTypes(typeName).withPageable(PageRequest.of(0, pageSize)).build();
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName) //
.setDocTypes(typeName) //
.setQuery(deleteQuery.getQuery()) //
.setAbortOnVersionConflict(false) //
.setRefresh(true);

SearchResultMapper deleteEntryResultMapper = new SearchResultMapperAdapter() {
if (deleteQuery.getPageSize() != null)
deleteByQueryRequest.setBatchSize(deleteQuery.getPageSize());

@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
return new AggregatedPageImpl<>((List<T>) Arrays.asList(response.getHits().getHits()), response.getScrollId());
}
};

ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
deleteEntryResultMapper);
BulkRequest request = new BulkRequest();
List<SearchHit> documentsToDelete = new ArrayList<>();

do {
documentsToDelete.addAll(scrolledResult.getContent());
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
SearchHit.class, deleteEntryResultMapper);
} while (scrolledResult.getContent().size() != 0);

for (SearchHit entry : documentsToDelete) {
request.add(new DeleteRequest(entry.getIndex(), typeName, entry.getId()));
}
if (deleteQuery.getScrollTimeInMillis() != null)
deleteByQueryRequest.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));

if (request.numberOfActions() > 0) {
BulkResponse response;
try {
response = client.bulk(request);
checkForBulkUpdateFailure(response);
} catch (IOException e) {
throw new ElasticsearchException("Error while deleting bulk: " + request.toString(), e);
}
try {
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new ElasticsearchException("Error for delete request: " + deleteByQueryRequest.toString(), e);
}

clearScroll(scrolledResult.getScrollId());
}

@Override
Expand Down Expand Up @@ -1475,7 +1460,8 @@ List<AliasMetaData> convertAliasResponse(String aliasResponse) {
node = node.findValue("aliases");

Map<String, AliasData> aliasData = mapper.readValue(mapper.writeValueAsString(node),
new TypeReference<Map<String, AliasData>>() {});
new TypeReference<Map<String, AliasData>>() {
});

Iterable<Map.Entry<String, AliasData>> aliasIter = aliasData.entrySet();
List<AliasMetaData> aliasMetaDataList = new ArrayList<AliasMetaData>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -68,6 +67,8 @@
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
Expand All @@ -84,15 +85,13 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.facet.FacetRequest;
Expand Down Expand Up @@ -750,41 +749,20 @@ public <T> void delete(DeleteQuery deleteQuery, Class<T> clazz) {
: getPersistentEntityFor(clazz).getIndexName();
String typeName = !StringUtils.isEmpty(deleteQuery.getType()) ? deleteQuery.getType()
: getPersistentEntityFor(clazz).getIndexType();
Integer pageSize = deleteQuery.getPageSize() != null ? deleteQuery.getPageSize() : 1000;
Long scrollTimeInMillis = deleteQuery.getScrollTimeInMillis() != null ? deleteQuery.getScrollTimeInMillis()
: 10000l;

SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(deleteQuery.getQuery()).withIndices(indexName)
.withTypes(typeName).withPageable(PageRequest.of(0, pageSize)).build();
DeleteByQueryRequestBuilder requestBuilder = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) //
.source(indexName) //
.filter(deleteQuery.getQuery()) //
.abortOnVersionConflict(false) //
.refresh(true);

SearchResultMapper deleteEntryResultMapper = new SearchResultMapperAdapter() {
SearchRequestBuilder source = requestBuilder.source() //
.setTypes(typeName);

@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
return new AggregatedPageImpl<>((List<T>) Arrays.asList(response.getHits().getHits()), response.getScrollId());
}
};

ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
deleteEntryResultMapper);
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
List<SearchHit> documentsToDelete = new ArrayList<>();

do {
documentsToDelete.addAll(scrolledResult.getContent());
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
SearchHit.class, deleteEntryResultMapper);
} while (scrolledResult.getContent().size() != 0);

for (SearchHit entry : documentsToDelete) {
bulkRequestBuilder.add(client.prepareDelete(entry.getIndex(), typeName, entry.getId()));
}

if (bulkRequestBuilder.numberOfActions() > 0) {
bulkRequestBuilder.execute().actionGet();
}
if (deleteQuery.getScrollTimeInMillis() != null)
source.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));

clearScroll(scrolledResult.getScrollId());
requestBuilder.get();
}

@Override
Expand Down Expand Up @@ -1136,14 +1114,14 @@ private void prepareSort(Query query, SearchRequestBuilder searchRequestBuilder)

if (FIELD_SCORE.equals(order.getProperty())) {
ScoreSortBuilder sort = SortBuilders //
.scoreSort() //
.order(sortOrder);
.scoreSort() //
.order(sortOrder);

searchRequestBuilder.addSort(sort);
} else {
FieldSortBuilder sort = SortBuilders //
.fieldSort(order.getProperty()) //
.order(sortOrder);
.fieldSort(order.getProperty()) //
.order(sortOrder);

if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) {
sort.missing("_first");
Expand Down
Loading