diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 6c81dfbee..03921beae 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -15,8 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -73,9 +71,6 @@ abstract public class AbstractReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware { - protected static final Log QUERY_LOGGER = LogFactory - .getLog("org.springframework.data.elasticsearch.core.QUERY"); - protected final ElasticsearchConverter converter; protected final SimpleElasticsearchMappingContext mappingContext; protected final EntityOperations entityOperations; @@ -175,11 +170,12 @@ public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) { * @return a Mono signalling finished execution * @since 4.3 */ + @SuppressWarnings("unused") public Mono logVersions() { - return getVendor() // - .zipWith(getRuntimeLibraryVersion()) // - .zipWith(getClusterVersion()) // + return getVendor() + .zipWith(getRuntimeLibraryVersion()) + .zipWith(getClusterVersion()) .doOnNext(objects -> VersionInfo.logVersions(objects.getT1().getT1(), objects.getT1().getT2(), objects.getT2())) .then(); } @@ -233,42 +229,48 @@ public Flux save(Flux entities, IndexCoordinates index, int bulkSize) return Flux.defer(() -> { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - entities.window(bulkSize) // - .concatMap(flux -> flux.collectList()) // - .subscribe(new Subscriber>() { - private Subscription subscription; - private AtomicBoolean upstreamComplete = new AtomicBoolean(false); - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(List entityList) { - saveAll(entityList, index) // - .map(sink::tryEmitNext) // - .doOnComplete(() -> { - if (!upstreamComplete.get()) { - subscription.request(1); - } else { - sink.tryEmitComplete(); - } - }).subscribe(); - } - - @Override - public void onError(Throwable throwable) { - subscription.cancel(); - sink.tryEmitError(throwable); - } - - @Override - public void onComplete() { - upstreamComplete.set(true); - } - }); + // noinspection ReactiveStreamsSubscriberImplementation + entities + .bufferTimeout(bulkSize, Duration.ofMillis(200), true) + .subscribe(new Subscriber<>() { + @Nullable private Subscription subscription = null; + private final AtomicBoolean upstreamComplete = new AtomicBoolean(false); + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(List entityList) { + saveAll(entityList, index) + .map(sink::tryEmitNext) + .doOnComplete(() -> { + if (!upstreamComplete.get()) { + if (subscription == null) { + throw new IllegalStateException("no subscription"); + } + subscription.request(1); + } else { + sink.tryEmitComplete(); + } + }).subscribe(); + } + + @Override + public void onError(Throwable throwable) { + if (subscription != null) { + subscription.cancel(); + } + sink.tryEmitError(throwable); + } + + @Override + public void onComplete() { + upstreamComplete.set(true); + } + }); return sink.asFlux(); }); @@ -324,6 +326,7 @@ protected T updateIndexedObject(T entity, IndexedObjectInformation indexedOb .getPersistentEntity(entity.getClass()); if (persistentEntity != null) { + // noinspection DuplicatedCode PersistentPropertyAccessor propertyAccessor = persistentEntity.getPropertyAccessor(entity); ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty(); @@ -353,8 +356,7 @@ protected T updateIndexedObject(T entity, IndexedObjectInformation indexedOb } // noinspection unchecked - T updatedEntity = (T) propertyAccessor.getBean(); - return updatedEntity; + return (T) propertyAccessor.getBean(); } else { EntityOperations.AdaptableEntity adaptableEntity = entityOperations.forEntity(entity, converter.getConversionService(), routingResolver); @@ -385,15 +387,15 @@ public Mono save(T entity, IndexCoordinates index) { Assert.notNull(index, "index must not be null"); return maybeCallbackBeforeConvert(entity, index) - .flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) // + .flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) .map(it -> { T savedEntity = it.getT1(); IndexResponseMetaData indexResponseMetaData = it.getT2(); - return updateIndexedObject(savedEntity, new IndexedObjectInformation( // - indexResponseMetaData.id(), // - indexResponseMetaData.index(), // - indexResponseMetaData.seqNo(), // - indexResponseMetaData.primaryTerm(), // + return updateIndexedObject(savedEntity, new IndexedObjectInformation( + indexResponseMetaData.id(), + indexResponseMetaData.index(), + indexResponseMetaData.seqNo(), + indexResponseMetaData.primaryTerm(), indexResponseMetaData.version())); }).flatMap(saved -> maybeCallbackAfterSave(saved, index)); } @@ -478,12 +480,12 @@ public Mono> searchForPage(Query query, Class entityType, C SearchDocumentCallback callback = new ReadSearchDocumentCallback<>(resultType, index); - return doFindForResponse(query, entityType, index) // - .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) // - .flatMap(callback::toEntity) // - .collectList() // - .map(entities -> SearchHitMapping.mappingFor(resultType, converter) // - .mapHits(searchDocumentResponse, entities))) // + return doFindForResponse(query, entityType, index) + .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) + .flatMap(callback::toEntity) + .collectList() + .map(entities -> SearchHitMapping.mappingFor(resultType, converter) + .mapHits(searchDocumentResponse, entities))) .map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable())); } @@ -503,17 +505,18 @@ public Mono> searchForHits(Query query, Class entit SearchDocumentCallback callback = new ReadSearchDocumentCallback<>(resultType, index); - return doFindForResponse(query, entityType, index) // - .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) // - .flatMap(callback::toEntity) // - .collectList() // - .map(entities -> SearchHitMapping.mappingFor(resultType, converter) // - .mapHits(searchDocumentResponse, entities))) // + return doFindForResponse(query, entityType, index) + .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) + .flatMap(callback::toEntity) + .collectList() + .map(entities -> SearchHitMapping.mappingFor(resultType, converter) + .mapHits(searchDocumentResponse, entities))) .map(ReactiveSearchHitSupport::searchHitsFor); } abstract protected Flux doFind(Query query, Class clazz, IndexCoordinates index); + @SuppressWarnings("unused") abstract protected Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index); @@ -639,17 +642,16 @@ public Mono toEntity(@Nullable Document document) { return Mono.empty(); } - return maybeCallbackAfterLoad(document, type, index) // + return maybeCallbackAfterLoad(document, type, index) .flatMap(documentAfterLoad -> { - + // noinspection DuplicatedCode T entity = reader.read(type, documentAfterLoad); - - IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation( // - documentAfterLoad.hasId() ? documentAfterLoad.getId() : null, // - documentAfterLoad.getIndex(), // - documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, // - documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, // - documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); // + IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation( + documentAfterLoad.hasId() ? documentAfterLoad.getId() : null, + documentAfterLoad.getIndex(), + documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, + documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, + documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); entity = updateIndexedObject(entity, indexedObjectInformation); return maybeCallbackAfterConvert(entity, documentAfterLoad, index); @@ -667,7 +669,7 @@ protected interface SearchDocumentCallback { /** * converts a {@link SearchDocument} to an entity * - * @param searchDocument + * @param searchDocument the document to convert * @return the entity in a MOno */ Mono toEntity(SearchDocument searchDocument); @@ -675,8 +677,8 @@ protected interface SearchDocumentCallback { /** * converts a {@link SearchDocument} into a SearchHit * - * @param searchDocument - * @return + * @param searchDocument the document to convert + * @return the converted SearchHit */ Mono> toSearchHit(SearchDocument searchDocument); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java index ac5dd6429..0ceefea94 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java @@ -1189,9 +1189,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() { .mapToObj(SampleEntity::of) // .collect(Collectors.toList()); - // we add a random delay to make suure the underlying implementation handles irregular incoming data + // we add a random delay to make sure the underlying implementation handles irregular incoming data var entities = Flux.fromIterable(entityList).concatMap( - entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity)); + entity -> Mono.just(entity) + .delay(Duration.ofMillis((long) (Math.random() * 10))) + .thenReturn(entity)); operations.save(entities, SampleEntity.class).collectList() // .as(StepVerifier::create) //