diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 8978eff23..b4c511b24 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -58,6 +58,8 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -124,6 +126,7 @@ * @author Mark Paluch * @author Peter-Josef Meisch * @author Huw Ayling-Miller + * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -429,6 +432,16 @@ public Mono deleteBy(HttpHeaders headers, DeleteByQueryReq .publishNext(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest) + */ + @Override + public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { + return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) // + .publishNext(); + } + // --> INDICES /* @@ -748,6 +761,18 @@ static Function deleteByQuery() { }; } + static Function bulk() { + + return request -> { + + try { + return RequestConverters.bulk(request); + } catch (IOException e) { + throw new ElasticsearchException("Could not parse request", e); + } + }; + } + // --> INDICES static Function indexExists() { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index cbce99f30..7dd9add69 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.client.reactive; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -57,6 +59,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -429,6 +432,44 @@ default Mono deleteBy(DeleteByQueryRequest deleteRequest) */ Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest); + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param consumer never {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono bulk(Consumer consumer) { + + BulkRequest request = new BulkRequest(); + consumer.accept(request); + return bulk(request); + } + + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param bulkRequest must not be {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting the emitting operation response. + */ + default Mono bulk(BulkRequest bulkRequest) { + return bulk(HttpHeaders.EMPTY, bulkRequest); + } + + /** + * Execute a {@link BulkRequest} against the {@literal bulk} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param bulkRequest must not be {@literal null}. + * @see Bulk + * API on elastic.co + * @return a {@link Mono} emitting operation response. + */ + Mono bulk(HttpHeaders headers, BulkRequest bulkRequest); + /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 599948872..bdb4db25e 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -18,10 +18,12 @@ import static org.assertj.core.api.Assertions.*; import lombok.SneakyThrows; +import org.elasticsearch.action.bulk.BulkRequest; import reactor.test.StepVerifier; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -64,6 +66,7 @@ * @author Christoph Strobl * @author Mark Paluch * @author Peter-Josef Meisch + * @author Henrique Amaral */ @SpringIntegrationTest @ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class }) @@ -650,6 +653,34 @@ public void flushNonExistingIndex() { .verifyError(ElasticsearchStatusException.class); } + @Test // DATAES-684 + public void bulkShouldUpdateExistingDocument() { + String idFirstDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I); + String idSecondDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I); + + UpdateRequest requestFirstDoc = new UpdateRequest(INDEX_I, TYPE_I, idFirstDoc) // + .doc(Collections.singletonMap("dutiful", "farseer")); + UpdateRequest requestSecondDoc = new UpdateRequest(INDEX_I, TYPE_I, idSecondDoc) // + .doc(Collections.singletonMap("secondDocUpdate", "secondDocUpdatePartTwo")); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(requestFirstDoc); + bulkRequest.add(requestSecondDoc); + + client.bulk(bulkRequest) + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it.status()).isEqualTo(RestStatus.OK); + assertThat(it.hasFailures()).isFalse(); + + Arrays.stream(it.getItems()).forEach(itemResponse-> { + assertThat(itemResponse.status()).isEqualTo(RestStatus.OK); + assertThat(itemResponse.getVersion()).isEqualTo(2); + }); + }) + .verifyComplete(); + } + private AddToIndexOfType addSourceDocument() { return add(DOC_SOURCE); } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index c94ea739f..4458d4514 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -20,6 +20,8 @@ import static org.mockito.Mockito.*; import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.rest.RestStatus; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -51,6 +53,7 @@ /** * @author Christoph Strobl + * @author Henrique Amaral */ public class ReactiveElasticsearchClientUnitTests { @@ -622,4 +625,26 @@ public void scrollShouldCleanUpResourcesOnError() throws IOException { }); } + @Test // DATAES-684 + public void bulkShouldEmitResponseCorrectly() { + + hostProvider.when(HOST) // + .receiveBulkOk(); + + final UpdateRequest updateRequest = new UpdateRequest("twitter", "doc", "1") + .doc(Collections.singletonMap("user", "cstrobl")); + final BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(updateRequest); + + client.bulk(bulkRequest) + .as(StepVerifier::create) // + .consumeNextWith(bulkResponse -> { + + assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK); + assertThat(bulkResponse.hasFailures()).isFalse(); + + }) // + .verifyComplete(); + } + } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index 98b849422..aabc8282f 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -60,6 +60,7 @@ /** * @author Christoph Strobl * @author Huw Ayling-Miller + * @author Henrique Amaral */ public class ReactiveMockClientTestsUtils { @@ -371,6 +372,12 @@ default Receive updateFail() { }); } + default Receive receiveBulkOk() { + + return receiveJsonFromFile("bulk-ok") // + .receive(Receive::ok); + } + } public interface Receive { diff --git a/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json b/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json new file mode 100644 index 000000000..d782733c7 --- /dev/null +++ b/src/test/resources/org/springframework/data/elasticsearch/client/bulk-ok.json @@ -0,0 +1,23 @@ +{ + "took": 30, + "errors": false, + "items": [ + { + "update": { + "_index": "twitter", + "_type": "doc", + "_id": "1", + "_version": 2, + "result": "updated", + "_shards": { + "total": 2, + "successful": 1, + "failed": 0 + }, + "_seq_no": 2, + "_primary_term": 4 + } + } + ] + +} \ No newline at end of file