Improve save(Flux<T>) implementations. #2497

Merged: 2 commits, Mar 19, 2023
@@ -17,13 +17,17 @@

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -207,6 +211,61 @@ public <T> Mono<T> save(T entity) {
return save(entity, getIndexCoordinatesFor(entity.getClass()));
}

@Override
public <T> Flux<T> save(Flux<T> entities, Class<?> clazz, int bulkSize) {
return save(entities, getIndexCoordinatesFor(clazz), bulkSize);
}

@Override
public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize) {

Assert.notNull(entities, "entities must not be null");
Assert.notNull(index, "index must not be null");
Assert.isTrue(bulkSize > 0, "bulkSize must be greater than 0");

return Flux.defer(() -> {
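// the unicast sink re-publishes the saved entities to the Flux returned to the caller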
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
entities //
.bufferTimeout(bulkSize, Duration.ofMillis(200)) //
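// the custom Subscriber implements flow control: at most one buffered batch is in flight at a time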
.subscribe(new Subscriber<List<T>>() {
private Subscription subscription;
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<T> entityList) {
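// bulk-save this batch, re-emit each saved entity through the sink, and request the
// next buffer only once the save has completed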
saveAll(entityList, index) //
.map(sink::tryEmitNext) //
.doOnComplete(() -> {
if (!upstreamComplete.get()) {
subscription.request(1);
} else {
sink.tryEmitComplete();
}
}).subscribe();
}

@Override
public void onError(Throwable throwable) {
subscription.cancel();
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
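// only record upstream completion here; the sink completes once the in-flight bulk save finishes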
upstreamComplete.set(true);
}
});
return sink.asFlux();
});

}

@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, Class<T> clazz) {
return saveAll(entities, getIndexCoordinatesFor(clazz));
@@ -44,6 +44,9 @@
* @since 4.0
*/
public interface ReactiveDocumentOperations {

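/**
 * Default number of entities collected into a single bulk request by the {@code save(Flux, ...)} default methods.
 */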
int FLUX_SAVE_BULK_SIZE = 500;

/**
* Index the given entity, once available, extracting index from entity metadata.
*
@@ -93,6 +96,61 @@ default <T> Mono<T> save(Mono<? extends T> entityPublisher, IndexCoordinates ind
*/
<T> Mono<T> save(T entity, IndexCoordinates index);

/**
* Indexes the entities into the index extracted from entity metadata.
*
* @param entities the entities to save
* @param clazz the class to get the index name from
* @param <T> entity type
* @return a Flux emitting the saved entities
* @since 5.1
*/
default <T> Flux<T> save(Flux<T> entities, Class<?> clazz) {
return save(entities, clazz, FLUX_SAVE_BULK_SIZE);
}

/**
* Indexes the entities into the index extracted from entity metadata. The entities are collected into batches of
* {@code bulkSize} with a maximum timeout of 200 ms, see
* {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time.Duration)}, and then sent in a bulk operation to
* Elasticsearch.
*
* @param entities the entities to save
* @param clazz the class to get the index name from
* @param bulkSize number of entities to put in a bulk request
* @param <T> entity type
* @return a Flux emitting the saved entities
* @since 5.1
*/
<T> Flux<T> save(Flux<T> entities, Class<?> clazz, int bulkSize);

/**
* Indexes the entities into the given index.
*
* @param entities the entities to save
* @param index the index to save to
* @param <T> entity type
* @return a Flux emitting the saved entities
* @since 5.1
*/
default <T> Flux<T> save(Flux<T> entities, IndexCoordinates index) {
return save(entities, index, FLUX_SAVE_BULK_SIZE);
}

/**
* Indexes the entities into the given index. The entities are collected into batches of {@code bulkSize} with a
* maximum timeout of 200 ms, see {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time.Duration)}, and
* then sent in a bulk operation to Elasticsearch.
*
* @param entities the entities to save
* @param index the index to save to
* @param bulkSize number of entities to put in a bulk request
* @param <T> entity type
* @return a Flux emitting the saved entities
* @since 5.1
*/
<T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize);

/**
* Index entities into the index extracted from entity metadata.
*
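As an aside (not part of the diff), a minimal usage sketch of the new bulk-size overload; the Product entity and the ProductImporter class are illustrative assumptions, and ReactiveElasticsearchOperations is used because it extends ReactiveDocumentOperations:

import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;

import reactor.core.publisher.Flux;

class ProductImporter {

    private final ReactiveElasticsearchOperations operations;

    ProductImporter(ReactiveElasticsearchOperations operations) {
        this.operations = operations;
    }

    // streams the incoming entities to Elasticsearch in bulk requests of up to
    // 1000 documents each, instead of collecting the whole Flux into memory first;
    // Product is assumed to be a mapped @Document entity
    Flux<Product> importAll(Flux<Product> products) {
        return operations.save(products, Product.class, 1000);
    }
}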
@@ -15,7 +15,6 @@
*/
package org.springframework.data.elasticsearch.repository.support;

import org.springframework.data.elasticsearch.core.query.BaseQuery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@@ -30,6 +29,7 @@
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.util.Assert;
@@ -97,7 +97,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {

Assert.notNull(entityStream, "EntityStream must not be null!");

return operations.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates())
return operations.save(Flux.from(entityStream), entityInformation.getIndexCoordinates())
.concatWith(doRefresh().then(Mono.empty()));
}

@@ -20,7 +20,7 @@
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;

import org.springframework.data.elasticsearch.annotations.IndexedIndexName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@@ -61,6 +61,7 @@
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.IndexedIndexName;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.annotations.WriteOnlyProperty;
@@ -170,7 +171,6 @@ void shouldSetIndexedIndexNameProperty() {
assertThat(saved.getIndexedIndexName()).isEqualTo(indexNameProvider.indexName() + "-indexedindexname");
}


private Mono<Boolean> documentWithIdExistsInIndex(String id, String index) {
return operations.exists(id, IndexCoordinates.of(index));
}
@@ -1180,6 +1180,25 @@ void shouldWorkWithReadonlyId() {
assertThat(readEntity.getPart2()).isEqualTo(entity.getPart2()); //
}).verifyComplete();
}

@Test // #2496
@DisplayName("should save data from Flux and return saved data in a flux")
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {

var count = 12_345;
var entityList = IntStream.rangeClosed(1, count)//
.mapToObj(SampleEntity::of) //
.collect(Collectors.toList());

var entityFlux = Flux.fromIterable(entityList);

operations.save(entityFlux, SampleEntity.class).collectList() //
.as(StepVerifier::create) //
.consumeNextWith(savedEntities -> {
assertThat(savedEntities).isEqualTo(entityList);
}) //
.verifyComplete();
}
// endregion

// region Helper functions
@@ -1262,6 +1281,13 @@ static class SampleEntity {
@Nullable
@Version private Long version;

static SampleEntity of(int id) {
var entity = new SampleEntity();
entity.setId("" + id);
entity.setMessage(" message " + id);
return entity;
}

@Nullable
public String getId() {
return id;
@@ -1543,15 +1569,15 @@ public void setPart2(String part2) {
this.part2 = part2;
}
}

@Document(indexName = "#{@indexNameProvider.indexName()}-indexedindexname")
private static class IndexedIndexNameEntity {
@Nullable
@Id private String id;
@Nullable
@Field(type = Text) private String someText;
@Nullable
@IndexedIndexName
private String indexedIndexName;
@IndexedIndexName private String indexedIndexName;

@Nullable
public String getId() {
@@ -1579,5 +1605,6 @@ public String getIndexedIndexName() {
public void setIndexedIndexName(@Nullable String indexedIndexName) {
this.indexedIndexName = indexedIndexName;
}
} // endregion
}
// endregion
}
@@ -112,7 +112,6 @@ private void loadAnnotatedCompletionObjectEntitiesWithWeights() {
operations.bulkIndex(indexQueries, AnnotatedCompletionEntity.class);
}

// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingCompletionEntity() {

@@ -144,7 +143,6 @@ void shouldRetrieveEntityWithCompletion() {
operations.get("1", CompletionEntity.class);
}

// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {

@@ -168,7 +166,6 @@ public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEn
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
}

// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES 1issue 150")
@Test
public void shouldFindSuggestionsWithWeightsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {

@@ -66,7 +66,6 @@ void cleanup() {
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete().block();
}

// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
@Test // #1302
@DisplayName("should find suggestions for given prefix completion")
void shouldFindSuggestionsForGivenPrefixCompletion() {
@@ -739,6 +739,24 @@ void shouldUseSourceExcludesFromParameter() {
.verifyComplete();
}

@Test // #2496
@DisplayName("should save data from Flux and return saved data in a flux")
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
var count = 12_345;
var entityList = IntStream.rangeClosed(1, count)//
.mapToObj(SampleEntity::of) //
.collect(Collectors.toList());

var entityFlux = Flux.fromIterable(entityList);

repository.saveAll(entityFlux).collectList() //
.as(StepVerifier::create) //
.consumeNextWith(savedEntities -> {
assertThat(savedEntities).isEqualTo(entityList);
}) //
.verifyComplete();
}

Mono<Void> bulkIndex(SampleEntity... entities) {
return operations.saveAll(Arrays.asList(entities), IndexCoordinates.of(indexNameProvider.indexName())).then();
}
@@ -829,6 +847,13 @@ static class SampleEntity {
@Field(name = "custom_field_name", type = FieldType.Text)
@Nullable private String customFieldNameMessage;

static SampleEntity of(int id) {
var entity = new SampleEntity();
entity.setId("" + id);
entity.setMessage(" message " + id);
return entity;
}

public SampleEntity() {}

public SampleEntity(@Nullable String id) {