Skip to content

Support Kotlin Flow as possible return type in repository functions #2387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli

return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
.concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
Expand All @@ -131,7 +131,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
response.primaryTerm(), response.version()));
return maybeCallAfterSave(savedEntity, index);
return maybeCallbackAfterSave(savedEntity, index);
});
});
}
Expand Down Expand Up @@ -329,8 +329,8 @@ private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {

Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), //
state -> Mono
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client1 -> client1
.search(searchRequest, EntityAsMap.class))) //
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
EntityAsMap.class))) //
.expand(entityAsMapSearchResponse -> {

state.updateScrollId(entityAsMapSearchResponse.scrollId());
Expand All @@ -354,6 +354,10 @@ private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {

private Publisher<?> cleanupScroll(ScrollState state) {

if (state.getScrollIds().isEmpty()) {
return Mono.empty();
}

return execute((ClientCallback<Publisher<ClearScrollResponse>>) client -> client
.clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds()))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli

return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
.concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
Expand All @@ -158,7 +158,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.getId(), response.getSeqNo(),
response.getPrimaryTerm(), response.getVersion()));

return maybeCallAfterSave(savedEntity, index);
return maybeCallbackAfterSave(savedEntity, index);
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
Assert.notNull(entity, "Entity must not be null!");
Assert.notNull(index, "index must not be null");

return maybeCallBeforeConvert(entity, index)
return maybeCallbackBeforeConvert(entity, index)
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
.map(it -> {
T savedEntity = it.getT1();
Expand All @@ -321,7 +321,7 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
indexResponseMetaData.seqNo(), //
indexResponseMetaData.primaryTerm(), //
indexResponseMetaData.version()));
}).flatMap(saved -> maybeCallAfterSave(saved, index));
}).flatMap(saved -> maybeCallbackAfterSave(saved, index));
}

abstract protected <T> Mono<Tuple2<T, IndexResponseMetaData>> doIndex(T entity, IndexCoordinates index);
Expand Down Expand Up @@ -493,7 +493,7 @@ public Mono<Boolean> closePointInTime(String pit) {

// region callbacks

protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
protected <T> Mono<T> maybeCallbackBeforeConvert(T entity, IndexCoordinates index) {

if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity, index);
Expand All @@ -502,7 +502,7 @@ protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
return Mono.just(entity);
}

protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
protected <T> Mono<T> maybeCallbackAfterSave(T entity, IndexCoordinates index) {

if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity, index);
Expand All @@ -511,7 +511,7 @@ protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
return Mono.just(entity);
}

protected <T> Mono<T> maybeCallAfterConvert(T entity, Document document, IndexCoordinates index) {
protected <T> Mono<T> maybeCallbackAfterConvert(T entity, Document document, IndexCoordinates index) {

if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveAfterConvertCallback.class, entity, document, index);
Expand All @@ -528,8 +528,19 @@ protected <T> Mono<Document> maybeCallbackAfterLoad(Document document, Class<T>
return Mono.just(document);
}

/**
* Callback to convert {@link Document} into an entity of type T
*
* @param <T> the entity type
*/
protected interface DocumentCallback<T> {

/**
* Convert a document into an entity
*
* @param document the document to convert
* @return a Mono of the entity
*/
@NonNull
Mono<T> toEntity(@Nullable Document document);
}
Expand Down Expand Up @@ -566,16 +577,30 @@ public Mono<T> toEntity(@Nullable Document document) {
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
entity = updateIndexedObject(entity, indexedObjectInformation);

return maybeCallAfterConvert(entity, documentAfterLoad, index);
return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
});
}
}

/**
* Callback to convert a {@link SearchDocument} into different other classes
* @param <T> the entity type
*/
protected interface SearchDocumentCallback<T> {

Mono<T> toEntity(SearchDocument response);

Mono<SearchHit<T>> toSearchHit(SearchDocument response);
/**
* converts a {@link SearchDocument} to an entity
* @param searchDocument
* @return the entity in a MOno
*/
Mono<T> toEntity(SearchDocument searchDocument);

/**
* converts a {@link SearchDocument} into a SearchHit
* @param searchDocument
* @return
*/
Mono<SearchHit<T>> toSearchHit(SearchDocument searchDocument);
}

protected class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ private Object execute(ElasticsearchParameterAccessor parameterAccessor) {
query.addSourceFilter(sourceFilter);
}

Class<?> targetType = processor.getReturnedType().getTypeToRead();
String indexName = queryMethod.getEntityInformation().getIndexName();
IndexCoordinates index = IndexCoordinates.of(indexName);

ReactiveElasticsearchQueryExecution execution = getExecution(parameterAccessor,
new ResultProcessingConverter(processor));

return execution.execute(query, processor.getReturnedType().getDomainType(), targetType, index);
var returnedType = processor.getReturnedType();
return execution.execute(query, returnedType.getDomainType(), returnedType.getTypeToRead(), index);
}

private ReactiveElasticsearchQueryExecution getExecution(ElasticsearchParameterAccessor accessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected MappingContext<? extends ElasticsearchPersistentEntity<?>, Elasticsear
* {@link org.springframework.data.elasticsearch.core.SearchHits} or a collection of
* {@link org.springframework.data.elasticsearch.core.SearchHit}.
*
* @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit}t related return type
* @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit} related return type
* @since 4.0
*/
public boolean isSearchHitMethod() {
Expand Down Expand Up @@ -298,7 +298,8 @@ SourceFilter getSourceFilter(ParameterAccessor parameterAccessor, ElasticsearchC
return fetchSourceFilterBuilder.build();
}

private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor, StringQueryUtil stringQueryUtil) {
private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor,
StringQueryUtil stringQueryUtil) {

List<String> fieldNames = new ArrayList<>();

Expand All @@ -308,7 +309,7 @@ private String[] mapParameters(String[] source, ParameterAccessor parameterAcces
String fieldName = stringQueryUtil.replacePlaceholders(s, parameterAccessor);
// this could be "[\"foo\",\"bar\"]", must be split
if (fieldName.startsWith("[") && fieldName.endsWith("]")) {
//noinspection RegExpRedundantEscape
// noinspection RegExpRedundantEscape
fieldNames.addAll( //
Arrays.asList(fieldName.substring(1, fieldName.length() - 2) //
.replaceAll("\\\"", "") //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static org.springframework.data.repository.util.ClassUtils.*;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.Method;
Expand All @@ -36,8 +35,8 @@
import org.springframework.data.projection.ProjectionFactory;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.util.ReactiveWrapperConverters;
import org.springframework.data.repository.util.ReactiveWrappers;
import org.springframework.data.util.Lazy;
import org.springframework.data.util.ReactiveWrappers;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.ClassUtils;

Expand Down Expand Up @@ -74,17 +73,19 @@ public ReactiveElasticsearchQueryMethod(Method method, RepositoryMetadata metada
if (!multiWrapper) {
throw new IllegalStateException(String.format(
"Method has to use a either multi-item reactive wrapper return type or a wrapped Page/Slice type. Offending method: %s",
method));
method));
}

if (hasParameterOfType(method, Sort.class)) {
throw new IllegalStateException(String.format("Method must not have Pageable *and* Sort parameter. "
+ "Use sorting capabilities on Pageble instead! Offending method: %s", method));
+ "Use sorting capabilities on Pageable instead! Offending method: %s", method));
}
}

this.isCollectionQuery = Lazy.of(() -> (!(isPageQuery() || isSliceQuery())
&& ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery()));
this.isCollectionQuery = Lazy.of(() -> {
return (!(isPageQuery() || isSliceQuery())
&& ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery());
});
}

@Override
Expand Down Expand Up @@ -150,7 +151,7 @@ public ElasticsearchParameters getParameters() {
@Override
protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) {
return super.isAllowedGenericType(methodGenericReturnType)
|| Flux.class.isAssignableFrom((Class<?>) methodGenericReturnType.getRawType());
|| ReactiveWrappers.supports((Class<?>) methodGenericReturnType.getRawType());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package org.springframework.data.elasticsearch.repository.support;

import reactor.core.publisher.Flux;

import java.lang.reflect.ParameterizedType;

import org.springframework.data.util.ReactiveWrappers;

/**
* @author Peter-Josef Meisch
* @since 4.0
Expand All @@ -32,6 +32,6 @@ public ReactiveElasticsearchRepositoryMetadata(Class<?> repositoryInterface) {
@Override
protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) {
return super.isAllowedGenericType(methodGenericReturnType)
|| Flux.class.isAssignableFrom((Class<?>) methodGenericReturnType.getRawType());
|| ReactiveWrappers.supports((Class<?>) methodGenericReturnType.getRawType());
}
}