Skip to content

Commit e5c514e

Browse files
committed
DATAES-570 - Use Delete By Query API for delete by query operations.
Original pull request: #280
1 parent be34ff8 commit e5c514e

File tree

3 files changed

+421
-640
lines changed

3 files changed

+421
-640
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+26-40
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,15 @@
2424
import java.io.BufferedReader;
2525
import java.io.IOException;
2626
import java.io.InputStreamReader;
27-
import java.util.*;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
29+
import java.util.Iterator;
30+
import java.util.LinkedList;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.NoSuchElementException;
34+
import java.util.Optional;
35+
import java.util.Set;
2836

2937
import org.apache.http.util.EntityUtils;
3038
import org.elasticsearch.action.ActionFuture;
@@ -52,6 +60,7 @@
5260
import org.elasticsearch.action.search.SearchScrollRequest;
5361
import org.elasticsearch.action.update.UpdateRequest;
5462
import org.elasticsearch.action.update.UpdateResponse;
63+
import org.elasticsearch.client.RequestOptions;
5564
import org.elasticsearch.client.Requests;
5665
import org.elasticsearch.client.Response;
5766
import org.elasticsearch.client.RestClient;
@@ -69,6 +78,7 @@
6978
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
7079
import org.elasticsearch.index.query.QueryBuilder;
7180
import org.elasticsearch.index.query.QueryBuilders;
81+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
7282
import org.elasticsearch.search.SearchHit;
7383
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
7484
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -85,15 +95,13 @@
8595
import org.springframework.context.ApplicationContextAware;
8696
import org.springframework.core.io.ClassPathResource;
8797
import org.springframework.data.domain.Page;
88-
import org.springframework.data.domain.PageRequest;
8998
import org.springframework.data.domain.Pageable;
9099
import org.springframework.data.domain.Sort;
91100
import org.springframework.data.elasticsearch.ElasticsearchException;
92101
import org.springframework.data.elasticsearch.annotations.Document;
93102
import org.springframework.data.elasticsearch.annotations.Mapping;
94103
import org.springframework.data.elasticsearch.annotations.Setting;
95104
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
96-
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
97105
import org.springframework.data.elasticsearch.core.client.support.AliasData;
98106
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
99107
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
@@ -863,47 +871,24 @@ public <T> void delete(DeleteQuery deleteQuery, Class<T> clazz) {
863871
: getPersistentEntityFor(clazz).getIndexName();
864872
String typeName = hasText(deleteQuery.getType()) ? deleteQuery.getType()
865873
: getPersistentEntityFor(clazz).getIndexType();
866-
Integer pageSize = deleteQuery.getPageSize() != null ? deleteQuery.getPageSize() : 1000;
867-
Long scrollTimeInMillis = deleteQuery.getScrollTimeInMillis() != null ? deleteQuery.getScrollTimeInMillis()
868-
: 10000l;
869874

870-
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(deleteQuery.getQuery()).withIndices(indexName)
871-
.withTypes(typeName).withPageable(PageRequest.of(0, pageSize)).build();
875+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName) //
876+
.setDocTypes(typeName) //
877+
.setQuery(deleteQuery.getQuery()) //
878+
.setAbortOnVersionConflict(false) //
879+
.setRefresh(true);
872880

873-
SearchResultMapper deleteEntryResultMapper = new SearchResultMapperAdapter() {
881+
if (deleteQuery.getPageSize() != null)
882+
deleteByQueryRequest.setBatchSize(deleteQuery.getPageSize());
874883

875-
@Override
876-
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
877-
return new AggregatedPageImpl<>((List<T>) Arrays.asList(response.getHits().getHits()), response.getScrollId());
878-
}
879-
};
880-
881-
ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
882-
deleteEntryResultMapper);
883-
BulkRequest request = new BulkRequest();
884-
List<SearchHit> documentsToDelete = new ArrayList<>();
885-
886-
do {
887-
documentsToDelete.addAll(scrolledResult.getContent());
888-
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
889-
SearchHit.class, deleteEntryResultMapper);
890-
} while (scrolledResult.getContent().size() != 0);
891-
892-
for (SearchHit entry : documentsToDelete) {
893-
request.add(new DeleteRequest(entry.getIndex(), typeName, entry.getId()));
894-
}
884+
if (deleteQuery.getScrollTimeInMillis() != null)
885+
deleteByQueryRequest.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));
895886

896-
if (request.numberOfActions() > 0) {
897-
BulkResponse response;
898-
try {
899-
response = client.bulk(request);
900-
checkForBulkUpdateFailure(response);
901-
} catch (IOException e) {
902-
throw new ElasticsearchException("Error while deleting bulk: " + request.toString(), e);
903-
}
887+
try {
888+
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
889+
} catch (IOException e) {
890+
throw new ElasticsearchException("Error for delete request: " + deleteByQueryRequest.toString(), e);
904891
}
905-
906-
clearScroll(scrolledResult.getScrollId());
907892
}
908893

909894
@Override
@@ -1475,7 +1460,8 @@ List<AliasMetaData> convertAliasResponse(String aliasResponse) {
14751460
node = node.findValue("aliases");
14761461

14771462
Map<String, AliasData> aliasData = mapper.readValue(mapper.writeValueAsString(node),
1478-
new TypeReference<Map<String, AliasData>>() {});
1463+
new TypeReference<Map<String, AliasData>>() {
1464+
});
14791465

14801466
Iterable<Map.Entry<String, AliasData>> aliasIter = aliasData.entrySet();
14811467
List<AliasMetaData> aliasMetaDataList = new ArrayList<AliasMetaData>();

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+16-38
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.IOException;
2525
import java.io.InputStreamReader;
2626
import java.util.ArrayList;
27-
import java.util.Arrays;
2827
import java.util.HashMap;
2928
import java.util.Iterator;
3029
import java.util.LinkedList;
@@ -68,6 +67,8 @@
6867
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
6968
import org.elasticsearch.index.query.QueryBuilder;
7069
import org.elasticsearch.index.query.QueryBuilders;
70+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
71+
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
7172
import org.elasticsearch.search.SearchHit;
7273
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
7374
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@@ -84,15 +85,13 @@
8485
import org.springframework.context.ApplicationContextAware;
8586
import org.springframework.core.io.ClassPathResource;
8687
import org.springframework.data.domain.Page;
87-
import org.springframework.data.domain.PageRequest;
8888
import org.springframework.data.domain.Pageable;
8989
import org.springframework.data.domain.Sort;
9090
import org.springframework.data.elasticsearch.ElasticsearchException;
9191
import org.springframework.data.elasticsearch.annotations.Document;
9292
import org.springframework.data.elasticsearch.annotations.Mapping;
9393
import org.springframework.data.elasticsearch.annotations.Setting;
9494
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
95-
import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
9695
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
9796
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
9897
import org.springframework.data.elasticsearch.core.facet.FacetRequest;
@@ -750,41 +749,20 @@ public <T> void delete(DeleteQuery deleteQuery, Class<T> clazz) {
750749
: getPersistentEntityFor(clazz).getIndexName();
751750
String typeName = !StringUtils.isEmpty(deleteQuery.getType()) ? deleteQuery.getType()
752751
: getPersistentEntityFor(clazz).getIndexType();
753-
Integer pageSize = deleteQuery.getPageSize() != null ? deleteQuery.getPageSize() : 1000;
754-
Long scrollTimeInMillis = deleteQuery.getScrollTimeInMillis() != null ? deleteQuery.getScrollTimeInMillis()
755-
: 10000l;
756752

757-
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(deleteQuery.getQuery()).withIndices(indexName)
758-
.withTypes(typeName).withPageable(PageRequest.of(0, pageSize)).build();
753+
DeleteByQueryRequestBuilder requestBuilder = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) //
754+
.source(indexName) //
755+
.filter(deleteQuery.getQuery()) //
756+
.abortOnVersionConflict(false) //
757+
.refresh(true);
759758

760-
SearchResultMapper deleteEntryResultMapper = new SearchResultMapperAdapter() {
759+
SearchRequestBuilder source = requestBuilder.source() //
760+
.setTypes(typeName);
761761

762-
@Override
763-
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
764-
return new AggregatedPageImpl<>((List<T>) Arrays.asList(response.getHits().getHits()), response.getScrollId());
765-
}
766-
};
767-
768-
ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
769-
deleteEntryResultMapper);
770-
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
771-
List<SearchHit> documentsToDelete = new ArrayList<>();
772-
773-
do {
774-
documentsToDelete.addAll(scrolledResult.getContent());
775-
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
776-
SearchHit.class, deleteEntryResultMapper);
777-
} while (scrolledResult.getContent().size() != 0);
778-
779-
for (SearchHit entry : documentsToDelete) {
780-
bulkRequestBuilder.add(client.prepareDelete(entry.getIndex(), typeName, entry.getId()));
781-
}
782-
783-
if (bulkRequestBuilder.numberOfActions() > 0) {
784-
bulkRequestBuilder.execute().actionGet();
785-
}
762+
if (deleteQuery.getScrollTimeInMillis() != null)
763+
source.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));
786764

787-
clearScroll(scrolledResult.getScrollId());
765+
requestBuilder.get();
788766
}
789767

790768
@Override
@@ -1136,14 +1114,14 @@ private void prepareSort(Query query, SearchRequestBuilder searchRequestBuilder)
11361114

11371115
if (FIELD_SCORE.equals(order.getProperty())) {
11381116
ScoreSortBuilder sort = SortBuilders //
1139-
.scoreSort() //
1140-
.order(sortOrder);
1117+
.scoreSort() //
1118+
.order(sortOrder);
11411119

11421120
searchRequestBuilder.addSort(sort);
11431121
} else {
11441122
FieldSortBuilder sort = SortBuilders //
1145-
.fieldSort(order.getProperty()) //
1146-
.order(sortOrder);
1123+
.fieldSort(order.getProperty()) //
1124+
.order(sortOrder);
11471125

11481126
if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) {
11491127
sort.missing("_first");

0 commit comments

Comments
 (0)