Skip to content

DATAES-684 Implement bulk request from reactive client #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
Expand Down Expand Up @@ -124,6 +126,7 @@
* @author Mark Paluch
* @author Peter-Josef Meisch
* @author Huw Ayling-Miller
* @author Henrique Amaral
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -429,6 +432,16 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
.publishNext();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
*/
@Override
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
return sendRequest(bulkRequest, RequestCreator.bulk(), BulkResponse.class, headers) //
.publishNext();
}

// --> INDICES

/*
Expand Down Expand Up @@ -748,6 +761,18 @@ static Function<DeleteByQueryRequest, Request> deleteByQuery() {
};
}

static Function<BulkRequest, Request> bulk() {

return request -> {

try {
return RequestConverters.bulk(request);
} catch (IOException e) {
throw new ElasticsearchException("Could not parse request", e);
}
};
}

// --> INDICES

static Function<GetIndexRequest, Request> indexExists() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.client.reactive;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -57,6 +59,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Henrique Amaral
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -429,6 +432,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
*/
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);

/**
* Execute a {@link BulkRequest} against the {@literal bulk} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
* API on elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response.
*/
default Mono<BulkResponse> bulk(Consumer<BulkRequest> consumer) {

BulkRequest request = new BulkRequest();
consumer.accept(request);
return bulk(request);
}

/**
* Execute a {@link BulkRequest} against the {@literal bulk} API.
*
* @param bulkRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
* API on elastic.co</a>
* @return a {@link Mono} emitting the emitting operation response.
*/
default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
return bulk(HttpHeaders.EMPTY, bulkRequest);
}

/**
* Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param bulkRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk
* API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);

/**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import static org.assertj.core.api.Assertions.*;

import lombok.SneakyThrows;
import org.elasticsearch.action.bulk.BulkRequest;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -650,6 +652,34 @@ public void flushNonExistingIndex() {
.verifyError(ElasticsearchStatusException.class);
}

@Test // DATAES-684
public void bulkShouldUpdateExistingDocument() {
String idFirstDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String idSecondDoc = addSourceDocument().ofType(TYPE_I).to(INDEX_I);

UpdateRequest requestFirstDoc = new UpdateRequest(INDEX_I, TYPE_I, idFirstDoc) //
.doc(Collections.singletonMap("dutiful", "farseer"));
UpdateRequest requestSecondDoc = new UpdateRequest(INDEX_I, TYPE_I, idSecondDoc) //
.doc(Collections.singletonMap("secondDocUpdate", "secondDocUpdatePartTwo"));

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(requestFirstDoc);
bulkRequest.add(requestSecondDoc);

client.bulk(bulkRequest)
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.OK);
assertThat(it.hasFailures()).isFalse();

Arrays.stream(it.getItems()).forEach(itemResponse-> {
assertThat(itemResponse.status()).isEqualTo(RestStatus.OK);
assertThat(itemResponse.getVersion()).isEqualTo(2);
});
})
.verifyComplete();
}

private AddToIndexOfType addSourceDocument() {
return add(DOC_SOURCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.rest.RestStatus;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -622,4 +624,26 @@ public void scrollShouldCleanUpResourcesOnError() throws IOException {
});
}

@Test // DATAES-684
public void bulkShouldEmitResponseCorrectly() {

hostProvider.when(HOST) //
.receiveBulkOk();

final UpdateRequest updateRequest = new UpdateRequest("twitter", "doc", "1")
.doc(Collections.singletonMap("user", "cstrobl"));
final BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(updateRequest);

client.bulk(bulkRequest)
.as(StepVerifier::create) //
.consumeNextWith(bulkResponse -> {

assertThat(bulkResponse.status()).isEqualTo(RestStatus.OK);
assertThat(bulkResponse.hasFailures()).isFalse();

}) //
.verifyComplete();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ default Receive updateFail() {
});
}

default Receive receiveBulkOk() {

return receiveJsonFromFile("bulk-ok") //
.receive(Receive::ok);
}

}

public interface Receive {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"took": 30,
"errors": false,
"items": [
{
"update": {
"_index": "twitter",
"_type": "doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 4
}
}
]

}