Skip to content

Commit 3b93b6a

Browse files
authored
Use correct overload of Flux.bufferTimeout().
Original Pull Request #2753 Closes #2607
1 parent d281df7 commit 3b93b6a

File tree

2 files changed

+82
-78
lines changed

2 files changed

+82
-78
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

+78-76
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18-
import org.apache.commons.logging.Log;
19-
import org.apache.commons.logging.LogFactory;
2018
import reactor.core.publisher.Flux;
2119
import reactor.core.publisher.Mono;
2220
import reactor.core.publisher.Sinks;
@@ -73,9 +71,6 @@
7371
abstract public class AbstractReactiveElasticsearchTemplate
7472
implements ReactiveElasticsearchOperations, ApplicationContextAware {
7573

76-
protected static final Log QUERY_LOGGER = LogFactory
77-
.getLog("org.springframework.data.elasticsearch.core.QUERY");
78-
7974
protected final ElasticsearchConverter converter;
8075
protected final SimpleElasticsearchMappingContext mappingContext;
8176
protected final EntityOperations entityOperations;
@@ -175,11 +170,12 @@ public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {
175170
* @return a Mono signalling finished execution
176171
* @since 4.3
177172
*/
173+
@SuppressWarnings("unused")
178174
public Mono<Void> logVersions() {
179175

180-
return getVendor() //
181-
.zipWith(getRuntimeLibraryVersion()) //
182-
.zipWith(getClusterVersion()) //
176+
return getVendor()
177+
.zipWith(getRuntimeLibraryVersion())
178+
.zipWith(getClusterVersion())
183179
.doOnNext(objects -> VersionInfo.logVersions(objects.getT1().getT1(), objects.getT1().getT2(), objects.getT2()))
184180
.then();
185181
}
@@ -233,42 +229,48 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)
233229

234230
return Flux.defer(() -> {
235231
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
236-
entities.window(bulkSize) //
237-
.concatMap(flux -> flux.collectList()) //
238-
.subscribe(new Subscriber<List<T>>() {
239-
private Subscription subscription;
240-
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
241-
242-
@Override
243-
public void onSubscribe(Subscription subscription) {
244-
this.subscription = subscription;
245-
subscription.request(1);
246-
}
247-
248-
@Override
249-
public void onNext(List<T> entityList) {
250-
saveAll(entityList, index) //
251-
.map(sink::tryEmitNext) //
252-
.doOnComplete(() -> {
253-
if (!upstreamComplete.get()) {
254-
subscription.request(1);
255-
} else {
256-
sink.tryEmitComplete();
257-
}
258-
}).subscribe();
259-
}
260-
261-
@Override
262-
public void onError(Throwable throwable) {
263-
subscription.cancel();
264-
sink.tryEmitError(throwable);
265-
}
266-
267-
@Override
268-
public void onComplete() {
269-
upstreamComplete.set(true);
270-
}
271-
});
232+
// noinspection ReactiveStreamsSubscriberImplementation
233+
entities
234+
.bufferTimeout(bulkSize, Duration.ofMillis(200), true)
235+
.subscribe(new Subscriber<>() {
236+
@Nullable private Subscription subscription = null;
237+
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);
238+
239+
@Override
240+
public void onSubscribe(Subscription subscription) {
241+
this.subscription = subscription;
242+
subscription.request(1);
243+
}
244+
245+
@Override
246+
public void onNext(List<T> entityList) {
247+
saveAll(entityList, index)
248+
.map(sink::tryEmitNext)
249+
.doOnComplete(() -> {
250+
if (!upstreamComplete.get()) {
251+
if (subscription == null) {
252+
throw new IllegalStateException("no subscription");
253+
}
254+
subscription.request(1);
255+
} else {
256+
sink.tryEmitComplete();
257+
}
258+
}).subscribe();
259+
}
260+
261+
@Override
262+
public void onError(Throwable throwable) {
263+
if (subscription != null) {
264+
subscription.cancel();
265+
}
266+
sink.tryEmitError(throwable);
267+
}
268+
269+
@Override
270+
public void onComplete() {
271+
upstreamComplete.set(true);
272+
}
273+
});
272274
return sink.asFlux();
273275
});
274276

@@ -324,6 +326,7 @@ protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedOb
324326
.getPersistentEntity(entity.getClass());
325327

326328
if (persistentEntity != null) {
329+
// noinspection DuplicatedCode
327330
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
328331
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();
329332

@@ -353,8 +356,7 @@ protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedOb
353356
}
354357

355358
// noinspection unchecked
356-
T updatedEntity = (T) propertyAccessor.getBean();
357-
return updatedEntity;
359+
return (T) propertyAccessor.getBean();
358360
} else {
359361
EntityOperations.AdaptableEntity<T> adaptableEntity = entityOperations.forEntity(entity,
360362
converter.getConversionService(), routingResolver);
@@ -385,15 +387,15 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
385387
Assert.notNull(index, "index must not be null");
386388

387389
return maybeCallbackBeforeConvert(entity, index)
388-
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
390+
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index))
389391
.map(it -> {
390392
T savedEntity = it.getT1();
391393
IndexResponseMetaData indexResponseMetaData = it.getT2();
392-
return updateIndexedObject(savedEntity, new IndexedObjectInformation( //
393-
indexResponseMetaData.id(), //
394-
indexResponseMetaData.index(), //
395-
indexResponseMetaData.seqNo(), //
396-
indexResponseMetaData.primaryTerm(), //
394+
return updateIndexedObject(savedEntity, new IndexedObjectInformation(
395+
indexResponseMetaData.id(),
396+
indexResponseMetaData.index(),
397+
indexResponseMetaData.seqNo(),
398+
indexResponseMetaData.primaryTerm(),
397399
indexResponseMetaData.version()));
398400
}).flatMap(saved -> maybeCallbackAfterSave(saved, index));
399401
}
@@ -478,12 +480,12 @@ public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, C
478480

479481
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
480482

481-
return doFindForResponse(query, entityType, index) //
482-
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
483-
.flatMap(callback::toEntity) //
484-
.collectList() //
485-
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
486-
.mapHits(searchDocumentResponse, entities))) //
483+
return doFindForResponse(query, entityType, index)
484+
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments())
485+
.flatMap(callback::toEntity)
486+
.collectList()
487+
.map(entities -> SearchHitMapping.mappingFor(resultType, converter)
488+
.mapHits(searchDocumentResponse, entities)))
487489
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
488490
}
489491

@@ -503,17 +505,18 @@ public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entit
503505

504506
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
505507

506-
return doFindForResponse(query, entityType, index) //
507-
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
508-
.flatMap(callback::toEntity) //
509-
.collectList() //
510-
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
511-
.mapHits(searchDocumentResponse, entities))) //
508+
return doFindForResponse(query, entityType, index)
509+
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments())
510+
.flatMap(callback::toEntity)
511+
.collectList()
512+
.map(entities -> SearchHitMapping.mappingFor(resultType, converter)
513+
.mapHits(searchDocumentResponse, entities)))
512514
.map(ReactiveSearchHitSupport::searchHitsFor);
513515
}
514516

515517
abstract protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index);
516518

519+
@SuppressWarnings("unused")
517520
abstract protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz,
518521
IndexCoordinates index);
519522

@@ -639,17 +642,16 @@ public Mono<T> toEntity(@Nullable Document document) {
639642
return Mono.empty();
640643
}
641644

642-
return maybeCallbackAfterLoad(document, type, index) //
645+
return maybeCallbackAfterLoad(document, type, index)
643646
.flatMap(documentAfterLoad -> {
644-
647+
// noinspection DuplicatedCode
645648
T entity = reader.read(type, documentAfterLoad);
646-
647-
IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation( //
648-
documentAfterLoad.hasId() ? documentAfterLoad.getId() : null, //
649-
documentAfterLoad.getIndex(), //
650-
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, //
651-
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, //
652-
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
649+
IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation(
650+
documentAfterLoad.hasId() ? documentAfterLoad.getId() : null,
651+
documentAfterLoad.getIndex(),
652+
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null,
653+
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null,
654+
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null);
653655
entity = updateIndexedObject(entity, indexedObjectInformation);
654656

655657
return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
@@ -667,16 +669,16 @@ protected interface SearchDocumentCallback<T> {
667669
/**
668670
* converts a {@link SearchDocument} to an entity
669671
*
670-
* @param searchDocument
672+
* @param searchDocument the document to convert
671673
* @return the entity in a MOno
672674
*/
673675
Mono<T> toEntity(SearchDocument searchDocument);
674676

675677
/**
676678
* converts a {@link SearchDocument} into a SearchHit
677679
*
678-
* @param searchDocument
679-
* @return
680+
* @param searchDocument the document to convert
681+
* @return the converted SearchHit
680682
*/
681683
Mono<SearchHit<T>> toSearchHit(SearchDocument searchDocument);
682684
}

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1189,9 +1189,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
11891189
.mapToObj(SampleEntity::of) //
11901190
.collect(Collectors.toList());
11911191

1192-
// we add a random delay to make suure the underlying implementation handles irregular incoming data
1192+
// we add a random delay to make sure the underlying implementation handles irregular incoming data
11931193
var entities = Flux.fromIterable(entityList).concatMap(
1194-
entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));
1194+
entity -> Mono.just(entity)
1195+
.delay(Duration.ofMillis((long) (Math.random() * 10)))
1196+
.thenReturn(entity));
11951197

11961198
operations.save(entities, SampleEntity.class).collectList() //
11971199
.as(StepVerifier::create) //

0 commit comments

Comments
 (0)