Skip to content

Commit 6ae4244

Browse files
henriqamaralsothawo
authored andcommitted
DATAES-684 Implement bulk request from reactive client
Original PR: #342 * DATAES-684 Implement bulk request from reactive client * Update src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java Co-Authored-By: Peter-Josef Meisch <[email protected]> * DATAES-684 Implement bulk request from reactive client Added author
1 parent a4b9a76 commit 6ae4244

File tree

6 files changed

+152
-0
lines changed

6 files changed

+152
-0
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+25
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
5959
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
6060
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
61+
import org.elasticsearch.action.bulk.BulkRequest;
62+
import org.elasticsearch.action.bulk.BulkResponse;
6163
import org.elasticsearch.action.delete.DeleteRequest;
6264
import org.elasticsearch.action.delete.DeleteResponse;
6365
import org.elasticsearch.action.get.GetRequest;
@@ -124,6 +126,7 @@
124126
* @author Mark Paluch
125127
* @author Peter-Josef Meisch
126128
* @author Huw Ayling-Miller
129+
* @author Henrique Amaral
127130
* @since 3.2
128131
* @see ClientConfiguration
129132
* @see ReactiveRestClients
@@ -429,6 +432,16 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
429432
.publishNext();
430433
}
431434

435+
/*
436+
* (non-Javadoc)
437+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
438+
*/
439+
@Override
440+
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
441+
return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) //
442+
.publishNext();
443+
}
444+
432445
// --> INDICES
433446

434447
/*
@@ -748,6 +761,18 @@ static Function<DeleteByQueryRequest, Request> deleteByQuery() {
748761
};
749762
}
750763

764+
static Function<BulkRequest, Request> bulk() {
765+
766+
return request -> {
767+
768+
try {
769+
return RequestConverters.bulk(request);
770+
} catch (IOException e) {
771+
throw new ElasticsearchException("Could not parse request", e);
772+
}
773+
};
774+
}
775+
751776
// --> INDICES
752777

753778
static Function<GetIndexRequest, Request> indexExists() {

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+41
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18+
import org.elasticsearch.action.bulk.BulkRequest;
19+
import org.elasticsearch.action.bulk.BulkResponse;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022

@@ -57,6 +59,7 @@
5759
*
5860
* @author Christoph Strobl
5961
* @author Mark Paluch
62+
* @author Henrique Amaral
6063
* @since 3.2
6164
* @see ClientConfiguration
6265
* @see ReactiveRestClients
@@ -429,6 +432,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
429432
*/
430433
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
431434

435+
/**
436+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
437+
*
438+
* @param consumer never {@literal null}.
439+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
440+
* API on elastic.co</a>
441+
* @return a {@link Mono} emitting the emitting operation response.
442+
*/
443+
default Mono<BulkResponse> bulk(Consumer<BulkRequest> consumer) {
444+
445+
BulkRequest request = new BulkRequest();
446+
consumer.accept(request);
447+
return bulk(request);
448+
}
449+
450+
/**
451+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
452+
*
453+
* @param bulkRequest must not be {@literal null}.
454+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
455+
* API on elastic.co</a>
456+
* @return a {@link Mono} emitting the emitting operation response.
457+
*/
458+
default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
459+
return bulk(HttpHeaders.EMPTY, bulkRequest);
460+
}
461+
462+
/**
463+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
464+
*
465+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
466+
* @param bulkRequest must not be {@literal null}.
467+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
468+
* API on elastic.co</a>
469+
* @return a {@link Mono} emitting operation response.
470+
*/
471+
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);
472+
432473
/**
433474
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
434475
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java

+31
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import static org.assertj.core.api.Assertions.*;
1919

2020
import lombok.SneakyThrows;
21+
import org.elasticsearch.action.bulk.BulkRequest;
2122
import reactor.test.StepVerifier;
2223

2324
import java.io.IOException;
2425
import java.time.Duration;
26+
import java.util.Arrays;
2527
import java.util.Collections;
2628
import java.util.LinkedHashMap;
2729
import java.util.Map;
@@ -64,6 +66,7 @@
6466
* @author Christoph Strobl
6567
* @author Mark Paluch
6668
* @author Peter-Josef Meisch
69+
* @author Henrique Amaral
6770
*/
6871
@SpringIntegrationTest
6972
@ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class })
@@ -650,6 +653,34 @@ public void flushNonExistingIndex() {
650653
.verifyError(ElasticsearchStatusException.class);
651654
}
652655

656+
@Test // DATAES-684
657+
public void bulkShouldUpdateExistingDocument() {
658+
String idFirstDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
659+
String idSecondDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
660+
661+
UpdateRequest requestFirstDoc = new UpdateRequest(INDEX_I, TYPE_I, idFirstDoc) //
662+
.doc(Collections.singletonMap("dutiful", "farseer"));
663+
UpdateRequest requestSecondDoc = new UpdateRequest(INDEX_I, TYPE_I, idSecondDoc) //
664+
.doc(Collections.singletonMap("secondDocUpdate", "secondDocUpdatePartTwo"));
665+
666+
BulkRequest bulkRequest = new BulkRequest();
667+
bulkRequest.add(requestFirstDoc);
668+
bulkRequest.add(requestSecondDoc);
669+
670+
client.bulk(bulkRequest)
671+
.as(StepVerifier::create) //
672+
.consumeNextWith(it -> {
673+
assertThat(it.status()).isEqualTo(RestStatus.OK);
674+
assertThat(it.hasFailures()).isFalse();
675+
676+
Arrays.stream(it.getItems()).forEach(itemResponse-> {
677+
assertThat(itemResponse.status()).isEqualTo(RestStatus.OK);
678+
assertThat(itemResponse.getVersion()).isEqualTo(2);
679+
});
680+
})
681+
.verifyComplete();
682+
}
683+
653684
private AddToIndexOfType addSourceDocument() {
654685
return add(DOC_SOURCE);
655686
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

+25
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.mockito.Mockito.*;
2121
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
2222

23+
import org.elasticsearch.action.bulk.BulkRequest;
24+
import org.elasticsearch.rest.RestStatus;
2325
import reactor.core.publisher.Mono;
2426
import reactor.test.StepVerifier;
2527

@@ -51,6 +53,7 @@
5153

5254
/**
5355
* @author Christoph Strobl
56+
* @author Henrique Amaral
5457
*/
5558
public class ReactiveElasticsearchClientUnitTests {
5659

@@ -622,4 +625,26 @@ public void scrollShouldCleanUpResourcesOnError() throws IOException {
622625
});
623626
}
624627

628+
@Test // DATAES-684
629+
public void bulkShouldEmitResponseCorrectly() {
630+
631+
hostProvider.when(HOST) //
632+
.receiveBulkOk();
633+
634+
final UpdateRequest updateRequest = new UpdateRequest("twitter", "doc", "1")
635+
.doc(Collections.singletonMap("user", "cstrobl"));
636+
final BulkRequest bulkRequest = new BulkRequest();
637+
bulkRequest.add(updateRequest);
638+
639+
client.bulk(bulkRequest)
640+
.as(StepVerifier::create) //
641+
.consumeNextWith(bulkResponse -> {
642+
643+
assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK);
644+
assertThat(bulkResponse.hasFailures()).isFalse();
645+
646+
}) //
647+
.verifyComplete();
648+
}
649+
625650
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

+7
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
/**
6161
* @author Christoph Strobl
6262
* @author Huw Ayling-Miller
63+
* @author Henrique Amaral
6364
*/
6465
public class ReactiveMockClientTestsUtils {
6566

@@ -371,6 +372,12 @@ default Receive updateFail() {
371372
});
372373
}
373374

375+
default Receive receiveBulkOk() {
376+
377+
return receiveJsonFromFile("bulk-ok") //
378+
.receive(Receive::ok);
379+
}
380+
374381
}
375382

376383
public interface Receive {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"took": 30,
3+
"errors": false,
4+
"items": [
5+
{
6+
"update": {
7+
"_index": "twitter",
8+
"_type": "doc",
9+
"_id": "1",
10+
"_version": 2,
11+
"result": "updated",
12+
"_shards": {
13+
"total": 2,
14+
"successful": 1,
15+
"failed": 0
16+
},
17+
"_seq_no": 2,
18+
"_primary_term": 4
19+
}
20+
}
21+
]
22+
23+
}

0 commit comments

Comments
 (0)