Skip to content

Commit 7fe6f54

Browse files
committed
DATAES-631 - bulk ops refactoring.
1 parent 89de31b commit 7fe6f54

File tree

4 files changed

+66
-80
lines changed

4 files changed

+66
-80
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

+15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import java.util.List;
77
import java.util.Map;
88

9+
import org.elasticsearch.action.bulk.BulkItemResponse;
10+
import org.elasticsearch.action.bulk.BulkResponse;
911
import org.elasticsearch.common.collect.MapBuilder;
1012
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder;
1113
import org.elasticsearch.index.query.QueryBuilder;
@@ -164,4 +166,17 @@ private <T> Map getDefaultSettings(ElasticsearchPersistentEntity<T> persistentEn
164166
.put("index.store.type", persistentEntity.getIndexStoreType()).map();
165167
}
166168

169+
protected void checkForBulkOperationFailure(BulkResponse bulkResponse) {
170+
if (bulkResponse.hasFailures()) {
171+
Map<String, String> failedDocuments = new HashMap<>();
172+
for (BulkItemResponse item : bulkResponse.getItems()) {
173+
if (item.isFailed())
174+
failedDocuments.put(item.getId(), item.getFailureMessage());
175+
}
176+
throw new ElasticsearchException(
177+
"Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
178+
+ failedDocuments + "]",
179+
failedDocuments);
180+
}
181+
}
167182
}

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+1-18
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@
3737
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
3838
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
3939
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
40-
import org.elasticsearch.action.bulk.BulkItemResponse;
4140
import org.elasticsearch.action.bulk.BulkRequest;
42-
import org.elasticsearch.action.bulk.BulkResponse;
4341
import org.elasticsearch.action.delete.DeleteRequest;
4442
import org.elasticsearch.action.get.GetRequest;
4543
import org.elasticsearch.action.get.GetResponse;
@@ -556,27 +554,12 @@ public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, Index
556554
private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
557555
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
558556
try {
559-
checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
557+
checkForBulkOperationFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
560558
} catch (IOException e) {
561559
throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
562560
}
563561
}
564562

565-
566-
private void checkForBulkUpdateFailure(BulkResponse bulkResponse) {
567-
if (bulkResponse.hasFailures()) {
568-
Map<String, String> failedDocuments = new HashMap<>();
569-
for (BulkItemResponse item : bulkResponse.getItems()) {
570-
if (item.isFailed())
571-
failedDocuments.put(item.getId(), item.getFailureMessage());
572-
}
573-
throw new ElasticsearchException(
574-
"Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
575-
+ failedDocuments + "]",
576-
failedDocuments);
577-
}
578-
}
579-
580563
@Override
581564
public boolean indexExists(String indexName) {
582565
GetIndexRequest request = new GetIndexRequest(indexName);

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+5-50
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.springframework.util.CollectionUtils.*;
2121

2222
import java.util.ArrayList;
23-
import java.util.HashMap;
2423
import java.util.Iterator;
2524
import java.util.List;
2625
import java.util.Map;
@@ -34,9 +33,7 @@
3433
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
3534
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
3635
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
37-
import org.elasticsearch.action.bulk.BulkItemResponse;
3836
import org.elasticsearch.action.bulk.BulkRequestBuilder;
39-
import org.elasticsearch.action.bulk.BulkResponse;
4037
import org.elasticsearch.action.get.GetRequestBuilder;
4138
import org.elasticsearch.action.get.GetResponse;
4239
import org.elasticsearch.action.get.MultiGetRequest;
@@ -431,62 +428,20 @@ public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCo
431428
Assert.notNull(queries, "List of IndexQuery must not be null");
432429
Assert.notNull(bulkOptions, "BulkOptions must not be null");
433430

434-
BulkRequestBuilder bulkRequest = client.prepareBulk();
435-
setBulkOptions(bulkRequest, bulkOptions);
436-
queries.stream() //
437-
.map(query -> requestFactory.indexRequestBuilder(client, query, index)) //
438-
.forEach(bulkRequest::add);
439-
checkForBulkUpdateFailure(bulkRequest.execute().actionGet());
431+
doBulkOperation(queries, bulkOptions, index);
440432
}
441433

442434
@Override
443435
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
444436
Assert.notNull(queries, "List of UpdateQuery must not be null");
445437
Assert.notNull(bulkOptions, "BulkOptions must not be null");
446438

447-
BulkRequestBuilder bulkRequest = client.prepareBulk();
448-
setBulkOptions(bulkRequest, bulkOptions);
449-
queries.stream() //
450-
.map(query -> requestFactory.updateRequestBuilderFor(client, query, index)) //
451-
.forEach(bulkRequest::add);
452-
checkForBulkUpdateFailure(bulkRequest.execute().actionGet());
439+
doBulkOperation(queries, bulkOptions, index);
453440
}
454441

455-
private static void setBulkOptions(BulkRequestBuilder bulkRequest, BulkOptions bulkOptions) {
456-
457-
if (bulkOptions.getTimeout() != null) {
458-
bulkRequest.setTimeout(bulkOptions.getTimeout());
459-
}
460-
461-
if (bulkOptions.getRefreshPolicy() != null) {
462-
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
463-
}
464-
465-
if (bulkOptions.getWaitForActiveShards() != null) {
466-
bulkRequest.setWaitForActiveShards(bulkOptions.getWaitForActiveShards());
467-
}
468-
469-
if (bulkOptions.getPipeline() != null) {
470-
bulkRequest.pipeline(bulkOptions.getPipeline());
471-
}
472-
473-
if (bulkOptions.getRoutingId() != null) {
474-
bulkRequest.routing(bulkOptions.getRoutingId());
475-
}
476-
}
477-
478-
private void checkForBulkUpdateFailure(BulkResponse bulkResponse) {
479-
if (bulkResponse.hasFailures()) {
480-
Map<String, String> failedDocuments = new HashMap<>();
481-
for (BulkItemResponse item : bulkResponse.getItems()) {
482-
if (item.isFailed())
483-
failedDocuments.put(item.getId(), item.getFailureMessage());
484-
}
485-
throw new ElasticsearchException(
486-
"Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
487-
+ failedDocuments + "]",
488-
failedDocuments);
489-
}
442+
private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
443+
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client,queries, bulkOptions, index);
444+
checkForBulkOperationFailure(bulkRequest.execute().actionGet());
490445
}
491446

492447
@Override

src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java

+45-12
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
import java.util.List;
2121
import java.util.Map;
22-
import java.util.Objects;
2322
import java.util.Optional;
2423

2524
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
2625
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2726
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
2827
import org.elasticsearch.action.bulk.BulkRequest;
28+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
2929
import org.elasticsearch.action.get.GetRequest;
3030
import org.elasticsearch.action.get.GetRequestBuilder;
3131
import org.elasticsearch.action.index.IndexRequest;
@@ -119,20 +119,53 @@ public BulkRequest bulkRequest(List<?> queries, BulkOptions bulkOptions, IndexCo
119119
bulkRequest.routing(bulkOptions.getRoutingId());
120120
}
121121

122-
queries.stream() //
123-
.map(query -> {
124-
if (query instanceof IndexQuery) {
125-
return indexRequest((IndexQuery) query, index);
126-
} else if (query instanceof UpdateQuery) {
127-
return updateRequest((UpdateQuery) query, index);
128-
}
129-
return null;
130-
}) //
131-
.filter(Objects::nonNull) //
132-
.forEach(request -> bulkRequest.add(request));
122+
queries.forEach(query -> {
123+
124+
if (query instanceof IndexQuery) {
125+
bulkRequest.add(indexRequest((IndexQuery) query, index));
126+
} else if (query instanceof UpdateQuery) {
127+
bulkRequest.add(updateRequest((UpdateQuery) query, index));
128+
}
129+
});
133130
return bulkRequest;
134131
}
135132

133+
public BulkRequestBuilder bulkRequestBuilder(Client client, List<?> queries, BulkOptions bulkOptions,
134+
IndexCoordinates index) {
135+
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
136+
137+
if (bulkOptions.getTimeout() != null) {
138+
bulkRequestBuilder.setTimeout(bulkOptions.getTimeout());
139+
}
140+
141+
if (bulkOptions.getRefreshPolicy() != null) {
142+
bulkRequestBuilder.setRefreshPolicy(bulkOptions.getRefreshPolicy());
143+
}
144+
145+
if (bulkOptions.getWaitForActiveShards() != null) {
146+
bulkRequestBuilder.setWaitForActiveShards(bulkOptions.getWaitForActiveShards());
147+
}
148+
149+
if (bulkOptions.getPipeline() != null) {
150+
bulkRequestBuilder.pipeline(bulkOptions.getPipeline());
151+
}
152+
153+
if (bulkOptions.getRoutingId() != null) {
154+
bulkRequestBuilder.routing(bulkOptions.getRoutingId());
155+
}
156+
157+
queries.forEach(query -> {
158+
159+
if (query instanceof IndexQuery) {
160+
bulkRequestBuilder.add(indexRequestBuilder(client, (IndexQuery) query, index));
161+
} else if (query instanceof UpdateQuery) {
162+
bulkRequestBuilder.add(updateRequestBuilderFor(client, (UpdateQuery) query, index));
163+
}
164+
});
165+
166+
return bulkRequestBuilder;
167+
}
168+
136169
public CreateIndexRequest createIndexRequest(String indexName, Object settings) {
137170
CreateIndexRequest request = new CreateIndexRequest(indexName);
138171
if (settings instanceof String) {

0 commit comments

Comments
 (0)