Skip to content

Commit 89de31b

Browse files
committed
DATAES-631 - bulk ops refactoring.
1 parent ae9955c commit 89de31b

File tree

4 files changed

+290
-296
lines changed

4 files changed

+290
-296
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+11-57
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
import static org.elasticsearch.client.Requests.*;
1919
import static org.elasticsearch.index.query.QueryBuilders.*;
20-
import static org.springframework.util.CollectionUtils.isEmpty;
21-
import static org.springframework.util.StringUtils.*;
20+
import static org.springframework.util.CollectionUtils.*;
2221

2322
import java.io.IOException;
2423
import java.util.ArrayList;
@@ -71,7 +70,6 @@
7170
import org.elasticsearch.common.xcontent.XContentBuilder;
7271
import org.elasticsearch.common.xcontent.XContentType;
7372
import org.elasticsearch.index.VersionType;
74-
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
7573
import org.elasticsearch.index.query.QueryBuilder;
7674
import org.elasticsearch.index.query.QueryBuilders;
7775
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@@ -531,7 +529,7 @@ public String index(IndexQuery query, IndexCoordinates index) {
531529

532530
@Override
533531
public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
534-
UpdateRequest request = requestFactory.updateRequestFor(query, index);
532+
UpdateRequest request = requestFactory.updateRequest(query, index);
535533
try {
536534
return client.update(request, RequestOptions.DEFAULT);
537535
} catch (IOException e) {
@@ -541,62 +539,29 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
541539

542540
@Override
543541
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
544-
545542
Assert.notNull(queries, "List of IndexQuery must not be null");
546543
Assert.notNull(bulkOptions, "BulkOptions must not be null");
547544

548-
BulkRequest bulkRequest = new BulkRequest();
549-
setBulkOptions(bulkRequest, bulkOptions);
550-
queries.stream() //
551-
.map(query -> requestFactory.indexRequest(query, index)) //
552-
.forEach(bulkRequest::add);
553-
try {
554-
checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
555-
} catch (IOException e) {
556-
throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
557-
}
545+
doBulkOperation(queries, bulkOptions, index);
558546
}
559547

560548
@Override
561549
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
562-
563550
Assert.notNull(queries, "List of UpdateQuery must not be null");
564551
Assert.notNull(bulkOptions, "BulkOptions must not be null");
565552

566-
BulkRequest bulkRequest = new BulkRequest();
567-
setBulkOptions(bulkRequest, bulkOptions);
568-
for (UpdateQuery query : queries) {
569-
bulkRequest.add(requestFactory.updateRequestFor(query, index));
570-
}
553+
doBulkOperation(queries, bulkOptions, index);
554+
}
555+
556+
private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
557+
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
571558
try {
572559
checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
573560
} catch (IOException e) {
574561
throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
575562
}
576563
}
577564

578-
private static void setBulkOptions(BulkRequest bulkRequest, BulkOptions bulkOptions) {
579-
580-
if (bulkOptions.getTimeout() != null) {
581-
bulkRequest.timeout(bulkOptions.getTimeout());
582-
}
583-
584-
if (bulkOptions.getRefreshPolicy() != null) {
585-
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
586-
}
587-
588-
if (bulkOptions.getWaitForActiveShards() != null) {
589-
bulkRequest.waitForActiveShards(bulkOptions.getWaitForActiveShards());
590-
}
591-
592-
if (bulkOptions.getPipeline() != null) {
593-
bulkRequest.pipeline(bulkOptions.getPipeline());
594-
}
595-
596-
if (bulkOptions.getRoutingId() != null) {
597-
bulkRequest.routing(bulkOptions.getRoutingId());
598-
}
599-
}
600565

601566
private void checkForBulkUpdateFailure(BulkResponse bulkResponse) {
602567
if (bulkResponse.hasFailures()) {
@@ -690,21 +655,10 @@ private SearchRequest prepareScroll(Query query, long scrollTimeInMillis,
690655
request.indicesOptions(query.getIndicesOptions());
691656
}
692657

693-
if (query instanceof NativeSearchQuery) {
694-
NativeSearchQuery searchQuery = (NativeSearchQuery) query;
658+
HighlightBuilder highlightBuilder = requestFactory.highlightBuilder(query);
695659

696-
if (searchQuery.getHighlightFields() != null || searchQuery.getHighlightBuilder() != null) {
697-
HighlightBuilder highlightBuilder = searchQuery.getHighlightBuilder();
698-
if (highlightBuilder == null) {
699-
highlightBuilder = new HighlightBuilder();
700-
}
701-
if (searchQuery.getHighlightFields() != null) {
702-
for (HighlightBuilder.Field highlightField : searchQuery.getHighlightFields()) {
703-
highlightBuilder.field(highlightField);
704-
}
705-
}
706-
searchSourceBuilder.highlighter(highlightBuilder);
707-
}
660+
if (highlightBuilder != null) {
661+
searchSourceBuilder.highlighter(highlightBuilder);
708662
}
709663

710664
request.source(searchSourceBuilder);

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+6-21
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.elasticsearch.common.unit.TimeValue;
5656
import org.elasticsearch.common.xcontent.XContentBuilder;
5757
import org.elasticsearch.common.xcontent.XContentType;
58-
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
5958
import org.elasticsearch.index.query.QueryBuilder;
6059
import org.elasticsearch.index.query.QueryBuilders;
6160
import org.elasticsearch.search.SearchHit;
@@ -88,7 +87,6 @@
8887
import org.springframework.data.util.CloseableIterator;
8988
import org.springframework.lang.Nullable;
9089
import org.springframework.util.Assert;
91-
import org.springframework.util.StringUtils;
9290

9391
/**
9492
* ElasticsearchTemplate
@@ -430,7 +428,6 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
430428

431429
@Override
432430
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
433-
434431
Assert.notNull(queries, "List of IndexQuery must not be null");
435432
Assert.notNull(bulkOptions, "BulkOptions must not be null");
436433

@@ -444,15 +441,14 @@ public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCo
444441

445442
@Override
446443
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
447-
448444
Assert.notNull(queries, "List of UpdateQuery must not be null");
449445
Assert.notNull(bulkOptions, "BulkOptions must not be null");
450446

451447
BulkRequestBuilder bulkRequest = client.prepareBulk();
452448
setBulkOptions(bulkRequest, bulkOptions);
453-
for (UpdateQuery query : queries) {
454-
bulkRequest.add(requestFactory.updateRequestBuilderFor(client, query, index));
455-
}
449+
queries.stream() //
450+
.map(query -> requestFactory.updateRequestBuilderFor(client, query, index)) //
451+
.forEach(bulkRequest::add);
456452
checkForBulkUpdateFailure(bulkRequest.execute().actionGet());
457453
}
458454

@@ -549,21 +545,10 @@ private SearchRequestBuilder prepareScroll(Query query, long scrollTimeInMillis,
549545
requestBuilder.setIndicesOptions(query.getIndicesOptions());
550546
}
551547

552-
if (query instanceof NativeSearchQuery) {
553-
NativeSearchQuery searchQuery = (NativeSearchQuery) query;
548+
HighlightBuilder highlightBuilder = requestFactory.highlightBuilder(query);
554549

555-
if (searchQuery.getHighlightFields() != null || searchQuery.getHighlightBuilder() != null) {
556-
HighlightBuilder highlightBuilder = searchQuery.getHighlightBuilder();
557-
if (highlightBuilder == null) {
558-
highlightBuilder = new HighlightBuilder();
559-
}
560-
if (searchQuery.getHighlightFields() != null) {
561-
for (HighlightBuilder.Field highlightField : searchQuery.getHighlightFields()) {
562-
highlightBuilder.field(highlightField);
563-
}
564-
}
565-
requestBuilder.highlighter(highlightBuilder);
566-
}
550+
if (highlightBuilder != null) {
551+
requestBuilder.highlighter(highlightBuilder);
567552
}
568553

569554
return requestBuilder;

0 commit comments

Comments
 (0)