Skip to content

Commit f82dd22

Browse files
henriqamaralsothawo
authored andcommitted
DATAES-684 Implement bulk request from reactive client
Original PR: #342 * DATAES-684 Implement bulk request from reactive client * Update src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java Co-Authored-By: Peter-Josef Meisch <[email protected]> * DATAES-684 Implement bulk request from reactive client Added author (cherry picked from commit 6ae4244)
1 parent 3b833f6 commit f82dd22

File tree

6 files changed

+152
-2
lines changed

6 files changed

+152
-2
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+25
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
5959
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
6060
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
61+
import org.elasticsearch.action.bulk.BulkRequest;
62+
import org.elasticsearch.action.bulk.BulkResponse;
6163
import org.elasticsearch.action.delete.DeleteRequest;
6264
import org.elasticsearch.action.delete.DeleteResponse;
6365
import org.elasticsearch.action.get.GetRequest;
@@ -122,6 +124,7 @@
122124
*
123125
* @author Christoph Strobl
124126
* @author Mark Paluch
127+
* @author Henrique Amaral
125128
* @since 3.2
126129
* @see ClientConfiguration
127130
* @see ReactiveRestClients
@@ -423,6 +426,16 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
423426
.publishNext();
424427
}
425428

429+
/*
430+
* (non-Javadoc)
431+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
432+
*/
433+
@Override
434+
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
435+
return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) //
436+
.publishNext();
437+
}
438+
426439
// --> INDICES
427440

428441
/*
@@ -742,6 +755,18 @@ static Function<DeleteByQueryRequest, Request> deleteByQuery() {
742755
};
743756
}
744757

758+
static Function<BulkRequest, Request> bulk() {
759+
760+
return request -> {
761+
762+
try {
763+
return RequestConverters.bulk(request);
764+
} catch (IOException e) {
765+
throw new ElasticsearchException("Could not parse request", e);
766+
}
767+
};
768+
}
769+
745770
// --> INDICES
746771

747772
static Function<GetIndexRequest, Request> indexExists() {

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+41
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18+
import org.elasticsearch.action.bulk.BulkRequest;
19+
import org.elasticsearch.action.bulk.BulkResponse;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022

@@ -58,6 +60,7 @@
5860
*
5961
* @author Christoph Strobl
6062
* @author Mark Paluch
63+
* @author Henrique Amaral
6164
* @since 3.2
6265
* @see ClientConfiguration
6366
* @see ReactiveRestClients
@@ -430,6 +433,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
430433
*/
431434
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
432435

436+
/**
437+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
438+
*
439+
* @param consumer never {@literal null}.
440+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
441+
* API on elastic.co</a>
442+
* @return a {@link Mono} emitting the emitting operation response.
443+
*/
444+
default Mono<BulkResponse> bulk(Consumer<BulkRequest> consumer) {
445+
446+
BulkRequest request = new BulkRequest();
447+
consumer.accept(request);
448+
return bulk(request);
449+
}
450+
451+
/**
452+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
453+
*
454+
* @param bulkRequest must not be {@literal null}.
455+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
456+
* API on elastic.co</a>
457+
* @return a {@link Mono} emitting the emitting operation response.
458+
*/
459+
default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
460+
return bulk(HttpHeaders.EMPTY, bulkRequest);
461+
}
462+
463+
/**
464+
* Execute a {@link BulkRequest} against the {@literal bulk} API.
465+
*
466+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
467+
* @param bulkRequest must not be {@literal null}.
468+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
469+
* API on elastic.co</a>
470+
* @return a {@link Mono} emitting operation response.
471+
*/
472+
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);
473+
433474
/**
434475
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
435476
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import static org.assertj.core.api.Assertions.*;
1919

2020
import lombok.SneakyThrows;
21+
import org.elasticsearch.action.bulk.BulkRequest;
2122
import reactor.test.StepVerifier;
2223

2324
import java.io.IOException;
2425
import java.time.Duration;
26+
import java.util.Arrays;
2527
import java.util.Collections;
2628
import java.util.LinkedHashMap;
2729
import java.util.Map;
@@ -65,7 +67,7 @@
6567
/**
6668
* @author Christoph Strobl
6769
* @author Mark Paluch
68-
* @currentRead Fool's Fate - Robin Hobb
70+
* @author Henrique Amaral
6971
*/
7072
@RunWith(SpringRunner.class)
7173
@ContextConfiguration("classpath:infrastructure.xml")
@@ -653,6 +655,34 @@ public void flushNonExistingIndex() {
653655
.verifyError(ElasticsearchStatusException.class);
654656
}
655657

658+
@Test // DATAES-684
659+
public void bulkShouldUpdateExistingDocument() {
660+
String idFirstDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
661+
String idSecondDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
662+
663+
UpdateRequest requestFirstDoc = new UpdateRequest(INDEX_I, TYPE_I, idFirstDoc) //
664+
.doc(Collections.singletonMap("dutiful", "farseer"));
665+
UpdateRequest requestSecondDoc = new UpdateRequest(INDEX_I, TYPE_I, idSecondDoc) //
666+
.doc(Collections.singletonMap("secondDocUpdate", "secondDocUpdatePartTwo"));
667+
668+
BulkRequest bulkRequest = new BulkRequest();
669+
bulkRequest.add(requestFirstDoc);
670+
bulkRequest.add(requestSecondDoc);
671+
672+
client.bulk(bulkRequest)
673+
.as(StepVerifier::create) //
674+
.consumeNextWith(it -> {
675+
assertThat(it.status()).isEqualTo(RestStatus.OK);
676+
assertThat(it.hasFailures()).isFalse();
677+
678+
Arrays.stream(it.getItems()).forEach(itemResponse-> {
679+
assertThat(itemResponse.status()).isEqualTo(RestStatus.OK);
680+
assertThat(itemResponse.getVersion()).isEqualTo(2);
681+
});
682+
})
683+
.verifyComplete();
684+
}
685+
656686
private AddToIndexOfType addSourceDocument() {
657687
return add(DOC_SOURCE);
658688
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.mockito.Mockito.*;
2121
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
2222

23+
import org.elasticsearch.action.bulk.BulkRequest;
24+
import org.elasticsearch.rest.RestStatus;
2325
import reactor.core.publisher.Mono;
2426
import reactor.test.StepVerifier;
2527

@@ -51,7 +53,7 @@
5153

5254
/**
5355
* @author Christoph Strobl
54-
* @currentRead Golden Fool - Robin Hobb
56+
* @author Henrique Amaral
5557
*/
5658
public class ReactiveElasticsearchClientUnitTests {
5759

@@ -623,4 +625,26 @@ public void scrollShouldCleanUpResourcesOnError() throws IOException {
623625
});
624626
}
625627

628+
@Test // DATAES-684
629+
public void bulkShouldEmitResponseCorrectly() {
630+
631+
hostProvider.when(HOST) //
632+
.receiveBulkOk();
633+
634+
final UpdateRequest updateRequest = new UpdateRequest("twitter", "doc", "1")
635+
.doc(Collections.singletonMap("user", "cstrobl"));
636+
final BulkRequest bulkRequest = new BulkRequest();
637+
bulkRequest.add(updateRequest);
638+
639+
client.bulk(bulkRequest)
640+
.as(StepVerifier::create) //
641+
.consumeNextWith(bulkResponse -> {
642+
643+
assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK);
644+
assertThat(bulkResponse.hasFailures()).isFalse();
645+
646+
}) //
647+
.verifyComplete();
648+
}
649+
626650
}

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java

+7
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060

6161
/**
6262
* @author Christoph Strobl
63+
* @author Henrique Amaral
6364
*/
6465
public class ReactiveMockClientTestsUtils {
6566

@@ -361,6 +362,12 @@ default Receive updateFail() {
361362
});
362363
}
363364

365+
default Receive receiveBulkOk() {
366+
367+
return receiveJsonFromFile("bulk-ok") //
368+
.receive(Receive::ok);
369+
}
370+
364371
}
365372

366373
public interface Receive {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"took": 30,
3+
"errors": false,
4+
"items": [
5+
{
6+
"update": {
7+
"_index": "twitter",
8+
"_type": "doc",
9+
"_id": "1",
10+
"_version": 2,
11+
"result": "updated",
12+
"_shards": {
13+
"total": 2,
14+
"successful": 1,
15+
"failed": 0
16+
},
17+
"_seq_no": 2,
18+
"_primary_term": 4
19+
}
20+
}
21+
]
22+
23+
}

0 commit comments

Comments
 (0)