From 7df43fdbc441488e67fa92b266a03e2a83861ae8 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 3 Dec 2022 22:25:07 +0100 Subject: [PATCH] Support Kotlin Flow as possible return type in repository functions --- .../elc/ReactiveElasticsearchTemplate.java | 12 ++++-- .../erhlc/ReactiveElasticsearchTemplate.java | 4 +- ...AbstractReactiveElasticsearchTemplate.java | 43 +++++++++++++++---- ...tReactiveElasticsearchRepositoryQuery.java | 4 +- .../query/ElasticsearchQueryMethod.java | 7 +-- .../ReactiveElasticsearchQueryMethod.java | 15 ++++--- ...activeElasticsearchRepositoryMetadata.java | 6 +-- 7 files changed, 61 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index d8be499d4..fe2e782b8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -115,7 +115,7 @@ public Flux saveAll(Mono> entitiesPubli return entitiesPublisher // .flatMapMany(entities -> Flux.fromIterable(entities) // - .concatMap(entity -> maybeCallBeforeConvert(entity, index)) // + .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // ).collectList() // .map(Entities::new) // .flatMapMany(entities -> { @@ -131,7 +131,7 @@ public Flux saveAll(Mono> entitiesPubli BulkResponseItem response = indexAndResponse.getT2(); updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(), response.primaryTerm(), response.version())); - return maybeCallAfterSave(savedEntity, index); + return maybeCallbackAfterSave(savedEntity, index); }); }); } @@ -329,8 +329,8 @@ private Flux doScroll(SearchRequest searchRequest) { Flux> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), // state -> Mono - .from(execute((ClientCallback>>) client1 -> client1 - .search(searchRequest, EntityAsMap.class))) // + .from(execute((ClientCallback>>) client -> client.search(searchRequest, + EntityAsMap.class))) // .expand(entityAsMapSearchResponse -> { state.updateScrollId(entityAsMapSearchResponse.scrollId()); @@ -354,6 +354,10 @@ private Flux doScroll(SearchRequest searchRequest) { private Publisher cleanupScroll(ScrollState state) { + if (state.getScrollIds().isEmpty()) { + return Mono.empty(); + } + return execute((ClientCallback>) client -> client .clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds())))); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java index 95ffc58a9..6426ccb96 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java @@ -139,7 +139,7 @@ public Flux saveAll(Mono> entitiesPubli return entitiesPublisher // .flatMapMany(entities -> Flux.fromIterable(entities) // - .concatMap(entity -> maybeCallBeforeConvert(entity, index)) // + .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // ).collectList() // .map(Entities::new) // .flatMapMany(entities -> { @@ -158,7 +158,7 @@ public Flux saveAll(Mono> entitiesPubli updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion())); - return maybeCallAfterSave(savedEntity, index); + return maybeCallbackAfterSave(savedEntity, index); }); }); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 6c5f399f6..755398a9c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -311,7 +311,7 @@ public Mono save(T entity, IndexCoordinates index) { Assert.notNull(entity, "Entity must not be null!"); Assert.notNull(index, "index must not be null"); - return maybeCallBeforeConvert(entity, index) + return maybeCallbackBeforeConvert(entity, index) .flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) // .map(it -> { T savedEntity = it.getT1(); @@ -321,7 +321,7 @@ public Mono save(T entity, IndexCoordinates index) { indexResponseMetaData.seqNo(), // indexResponseMetaData.primaryTerm(), // indexResponseMetaData.version())); - }).flatMap(saved -> maybeCallAfterSave(saved, index)); + }).flatMap(saved -> maybeCallbackAfterSave(saved, index)); } abstract protected Mono> doIndex(T entity, IndexCoordinates index); @@ -493,7 +493,7 @@ public Mono closePointInTime(String pit) { // region callbacks - protected Mono maybeCallBeforeConvert(T entity, IndexCoordinates index) { + protected Mono maybeCallbackBeforeConvert(T entity, IndexCoordinates index) { if (null != entityCallbacks) { return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity, index); @@ -502,7 +502,7 @@ protected Mono maybeCallBeforeConvert(T entity, IndexCoordinates index) { return Mono.just(entity); } - protected Mono maybeCallAfterSave(T entity, IndexCoordinates index) { + protected Mono maybeCallbackAfterSave(T entity, IndexCoordinates index) { if (null != entityCallbacks) { return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity, index); @@ -511,7 +511,7 @@ protected Mono maybeCallAfterSave(T entity, IndexCoordinates index) { return Mono.just(entity); } - protected Mono maybeCallAfterConvert(T entity, Document document, IndexCoordinates index) { + protected Mono maybeCallbackAfterConvert(T entity, Document document, IndexCoordinates index) { if (null != entityCallbacks) { return entityCallbacks.callback(ReactiveAfterConvertCallback.class, entity, document, index); @@ -528,8 +528,19 @@ protected Mono maybeCallbackAfterLoad(Document document, Class return Mono.just(document); } + /** + * Callback to convert {@link Document} into an entity of type T + * + * @param the entity type + */ protected interface DocumentCallback { + /** + * Convert a document into an entity + * + * @param document the document to convert + * @return a Mono of the entity + */ @NonNull Mono toEntity(@Nullable Document document); } @@ -566,16 +577,30 @@ public Mono toEntity(@Nullable Document document) { documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); // entity = updateIndexedObject(entity, indexedObjectInformation); - return maybeCallAfterConvert(entity, documentAfterLoad, index); + return maybeCallbackAfterConvert(entity, documentAfterLoad, index); }); } } + /** + * Callback to convert a {@link SearchDocument} into different other classes + * @param the entity type + */ protected interface SearchDocumentCallback { - Mono toEntity(SearchDocument response); - - Mono> toSearchHit(SearchDocument response); + /** + * converts a {@link SearchDocument} to an entity + * @param searchDocument + * @return the entity in a MOno + */ + Mono toEntity(SearchDocument searchDocument); + + /** + * converts a {@link SearchDocument} into a SearchHit + * @param searchDocument + * @return + */ + Mono> toSearchHit(SearchDocument searchDocument); } protected class ReadSearchDocumentCallback implements SearchDocumentCallback { diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java index d66fe9d99..d7d406823 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java @@ -94,14 +94,14 @@ private Object execute(ElasticsearchParameterAccessor parameterAccessor) { query.addSourceFilter(sourceFilter); } - Class targetType = processor.getReturnedType().getTypeToRead(); String indexName = queryMethod.getEntityInformation().getIndexName(); IndexCoordinates index = IndexCoordinates.of(indexName); ReactiveElasticsearchQueryExecution execution = getExecution(parameterAccessor, new ResultProcessingConverter(processor)); - return execution.execute(query, processor.getReturnedType().getDomainType(), targetType, index); + var returnedType = processor.getReturnedType(); + return execution.execute(query, returnedType.getDomainType(), returnedType.getTypeToRead(), index); } private ReactiveElasticsearchQueryExecution getExecution(ElasticsearchParameterAccessor accessor, diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchQueryMethod.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchQueryMethod.java index f5268ecb2..d88769d6e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchQueryMethod.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchQueryMethod.java @@ -184,7 +184,7 @@ protected MappingContext, Elasticsear * {@link org.springframework.data.elasticsearch.core.SearchHits} or a collection of * {@link org.springframework.data.elasticsearch.core.SearchHit}. * - * @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit}t related return type + * @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit} related return type * @since 4.0 */ public boolean isSearchHitMethod() { @@ -298,7 +298,8 @@ SourceFilter getSourceFilter(ParameterAccessor parameterAccessor, ElasticsearchC return fetchSourceFilterBuilder.build(); } - private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor, StringQueryUtil stringQueryUtil) { + private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor, + StringQueryUtil stringQueryUtil) { List fieldNames = new ArrayList<>(); @@ -308,7 +309,7 @@ private String[] mapParameters(String[] source, ParameterAccessor parameterAcces String fieldName = stringQueryUtil.replacePlaceholders(s, parameterAccessor); // this could be "[\"foo\",\"bar\"]", must be split if (fieldName.startsWith("[") && fieldName.endsWith("]")) { - //noinspection RegExpRedundantEscape + // noinspection RegExpRedundantEscape fieldNames.addAll( // Arrays.asList(fieldName.substring(1, fieldName.length() - 2) // .replaceAll("\\\"", "") // diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchQueryMethod.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchQueryMethod.java index af9696202..7ede3e7b5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchQueryMethod.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchQueryMethod.java @@ -17,7 +17,6 @@ import static org.springframework.data.repository.util.ClassUtils.*; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.lang.reflect.Method; @@ -36,8 +35,8 @@ import org.springframework.data.projection.ProjectionFactory; import org.springframework.data.repository.core.RepositoryMetadata; import org.springframework.data.repository.util.ReactiveWrapperConverters; -import org.springframework.data.repository.util.ReactiveWrappers; import org.springframework.data.util.Lazy; +import org.springframework.data.util.ReactiveWrappers; import org.springframework.data.util.TypeInformation; import org.springframework.util.ClassUtils; @@ -74,17 +73,19 @@ public ReactiveElasticsearchQueryMethod(Method method, RepositoryMetadata metada if (!multiWrapper) { throw new IllegalStateException(String.format( "Method has to use a either multi-item reactive wrapper return type or a wrapped Page/Slice type. Offending method: %s", - method)); + method)); } if (hasParameterOfType(method, Sort.class)) { throw new IllegalStateException(String.format("Method must not have Pageable *and* Sort parameter. " - + "Use sorting capabilities on Pageble instead! Offending method: %s", method)); + + "Use sorting capabilities on Pageable instead! Offending method: %s", method)); } } - this.isCollectionQuery = Lazy.of(() -> (!(isPageQuery() || isSliceQuery()) - && ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery())); + this.isCollectionQuery = Lazy.of(() -> { + return (!(isPageQuery() || isSliceQuery()) + && ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery()); + }); } @Override @@ -150,7 +151,7 @@ public ElasticsearchParameters getParameters() { @Override protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) { return super.isAllowedGenericType(methodGenericReturnType) - || Flux.class.isAssignableFrom((Class) methodGenericReturnType.getRawType()); + || ReactiveWrappers.supports((Class) methodGenericReturnType.getRawType()); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/ReactiveElasticsearchRepositoryMetadata.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/ReactiveElasticsearchRepositoryMetadata.java index 696f58474..d2bd76d45 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/ReactiveElasticsearchRepositoryMetadata.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/ReactiveElasticsearchRepositoryMetadata.java @@ -15,10 +15,10 @@ */ package org.springframework.data.elasticsearch.repository.support; -import reactor.core.publisher.Flux; - import java.lang.reflect.ParameterizedType; +import org.springframework.data.util.ReactiveWrappers; + /** * @author Peter-Josef Meisch * @since 4.0 @@ -32,6 +32,6 @@ public ReactiveElasticsearchRepositoryMetadata(Class repositoryInterface) { @Override protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) { return super.isAllowedGenericType(methodGenericReturnType) - || Flux.class.isAssignableFrom((Class) methodGenericReturnType.getRawType()); + || ReactiveWrappers.supports((Class) methodGenericReturnType.getRawType()); } }