Skip to content

Commit 8fe0417

Browse files
committed
Support Kotlin Flow as possible return type in repository functions.
Original Pull Request #2387 Closes #2386 (cherry picked from commit 1fa6c9f)
1 parent 03b522d commit 8fe0417

7 files changed

+61
-30
lines changed

src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
115115

116116
return entitiesPublisher //
117117
.flatMapMany(entities -> Flux.fromIterable(entities) //
118-
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
118+
.concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) //
119119
).collectList() //
120120
.map(Entities::new) //
121121
.flatMapMany(entities -> {
@@ -131,7 +131,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
131131
BulkResponseItem response = indexAndResponse.getT2();
132132
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
133133
response.primaryTerm(), response.version()));
134-
return maybeCallAfterSave(savedEntity, index);
134+
return maybeCallbackAfterSave(savedEntity, index);
135135
});
136136
});
137137
}
@@ -329,8 +329,8 @@ private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {
329329

330330
Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), //
331331
state -> Mono
332-
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client1 -> client1
333-
.search(searchRequest, EntityAsMap.class))) //
332+
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
333+
EntityAsMap.class))) //
334334
.expand(entityAsMapSearchResponse -> {
335335

336336
state.updateScrollId(entityAsMapSearchResponse.scrollId());
@@ -354,6 +354,10 @@ private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {
354354

355355
private Publisher<?> cleanupScroll(ScrollState state) {
356356

357+
if (state.getScrollIds().isEmpty()) {
358+
return Mono.empty();
359+
}
360+
357361
return execute((ClientCallback<Publisher<ClearScrollResponse>>) client -> client
358362
.clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds()))));
359363
}

src/main/java/org/springframework/data/elasticsearch/client/erhlc/ReactiveElasticsearchTemplate.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
139139

140140
return entitiesPublisher //
141141
.flatMapMany(entities -> Flux.fromIterable(entities) //
142-
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
142+
.concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) //
143143
).collectList() //
144144
.map(Entities::new) //
145145
.flatMapMany(entities -> {
@@ -158,7 +158,7 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
158158
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.getId(), response.getSeqNo(),
159159
response.getPrimaryTerm(), response.getVersion()));
160160

161-
return maybeCallAfterSave(savedEntity, index);
161+
return maybeCallbackAfterSave(savedEntity, index);
162162
});
163163
});
164164
}

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
311311
Assert.notNull(entity, "Entity must not be null!");
312312
Assert.notNull(index, "index must not be null");
313313

314-
return maybeCallBeforeConvert(entity, index)
314+
return maybeCallbackBeforeConvert(entity, index)
315315
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
316316
.map(it -> {
317317
T savedEntity = it.getT1();
@@ -321,7 +321,7 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
321321
indexResponseMetaData.seqNo(), //
322322
indexResponseMetaData.primaryTerm(), //
323323
indexResponseMetaData.version()));
324-
}).flatMap(saved -> maybeCallAfterSave(saved, index));
324+
}).flatMap(saved -> maybeCallbackAfterSave(saved, index));
325325
}
326326

327327
abstract protected <T> Mono<Tuple2<T, IndexResponseMetaData>> doIndex(T entity, IndexCoordinates index);
@@ -493,7 +493,7 @@ public Mono<Boolean> closePointInTime(String pit) {
493493

494494
// region callbacks
495495

496-
protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
496+
protected <T> Mono<T> maybeCallbackBeforeConvert(T entity, IndexCoordinates index) {
497497

498498
if (null != entityCallbacks) {
499499
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity, index);
@@ -502,7 +502,7 @@ protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
502502
return Mono.just(entity);
503503
}
504504

505-
protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
505+
protected <T> Mono<T> maybeCallbackAfterSave(T entity, IndexCoordinates index) {
506506

507507
if (null != entityCallbacks) {
508508
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity, index);
@@ -511,7 +511,7 @@ protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
511511
return Mono.just(entity);
512512
}
513513

514-
protected <T> Mono<T> maybeCallAfterConvert(T entity, Document document, IndexCoordinates index) {
514+
protected <T> Mono<T> maybeCallbackAfterConvert(T entity, Document document, IndexCoordinates index) {
515515

516516
if (null != entityCallbacks) {
517517
return entityCallbacks.callback(ReactiveAfterConvertCallback.class, entity, document, index);
@@ -528,8 +528,19 @@ protected <T> Mono<Document> maybeCallbackAfterLoad(Document document, Class<T>
528528
return Mono.just(document);
529529
}
530530

531+
/**
532+
* Callback to convert {@link Document} into an entity of type T
533+
*
534+
* @param <T> the entity type
535+
*/
531536
protected interface DocumentCallback<T> {
532537

538+
/**
539+
* Convert a document into an entity
540+
*
541+
* @param document the document to convert
542+
* @return a Mono of the entity
543+
*/
533544
@NonNull
534545
Mono<T> toEntity(@Nullable Document document);
535546
}
@@ -566,16 +577,30 @@ public Mono<T> toEntity(@Nullable Document document) {
566577
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
567578
entity = updateIndexedObject(entity, indexedObjectInformation);
568579

569-
return maybeCallAfterConvert(entity, documentAfterLoad, index);
580+
return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
570581
});
571582
}
572583
}
573584

585+
/**
586+
* Callback to convert a {@link SearchDocument} into different other classes
587+
* @param <T> the entity type
588+
*/
574589
protected interface SearchDocumentCallback<T> {
575590

576-
Mono<T> toEntity(SearchDocument response);
577-
578-
Mono<SearchHit<T>> toSearchHit(SearchDocument response);
591+
/**
592+
* converts a {@link SearchDocument} to an entity
593+
* @param searchDocument
594+
* @return the entity in a MOno
595+
*/
596+
Mono<T> toEntity(SearchDocument searchDocument);
597+
598+
/**
599+
* converts a {@link SearchDocument} into a SearchHit
600+
* @param searchDocument
601+
* @return
602+
*/
603+
Mono<SearchHit<T>> toSearchHit(SearchDocument searchDocument);
579604
}
580605

581606
protected class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {

src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,14 @@ private Object execute(ElasticsearchParameterAccessor parameterAccessor) {
9494
query.addSourceFilter(sourceFilter);
9595
}
9696

97-
Class<?> targetType = processor.getReturnedType().getTypeToRead();
9897
String indexName = queryMethod.getEntityInformation().getIndexName();
9998
IndexCoordinates index = IndexCoordinates.of(indexName);
10099

101100
ReactiveElasticsearchQueryExecution execution = getExecution(parameterAccessor,
102101
new ResultProcessingConverter(processor));
103102

104-
return execution.execute(query, processor.getReturnedType().getDomainType(), targetType, index);
103+
var returnedType = processor.getReturnedType();
104+
return execution.execute(query, returnedType.getDomainType(), returnedType.getTypeToRead(), index);
105105
}
106106

107107
private ReactiveElasticsearchQueryExecution getExecution(ElasticsearchParameterAccessor accessor,

src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchQueryMethod.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ protected MappingContext<? extends ElasticsearchPersistentEntity<?>, Elasticsear
184184
* {@link org.springframework.data.elasticsearch.core.SearchHits} or a collection of
185185
* {@link org.springframework.data.elasticsearch.core.SearchHit}.
186186
*
187-
* @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit}t related return type
187+
* @return true if the method has a {@link org.springframework.data.elasticsearch.core.SearchHit} related return type
188188
* @since 4.0
189189
*/
190190
public boolean isSearchHitMethod() {
@@ -298,7 +298,8 @@ SourceFilter getSourceFilter(ParameterAccessor parameterAccessor, ElasticsearchC
298298
return fetchSourceFilterBuilder.build();
299299
}
300300

301-
private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor, StringQueryUtil stringQueryUtil) {
301+
private String[] mapParameters(String[] source, ParameterAccessor parameterAccessor,
302+
StringQueryUtil stringQueryUtil) {
302303

303304
List<String> fieldNames = new ArrayList<>();
304305

@@ -308,7 +309,7 @@ private String[] mapParameters(String[] source, ParameterAccessor parameterAcces
308309
String fieldName = stringQueryUtil.replacePlaceholders(s, parameterAccessor);
309310
// this could be "[\"foo\",\"bar\"]", must be split
310311
if (fieldName.startsWith("[") && fieldName.endsWith("]")) {
311-
//noinspection RegExpRedundantEscape
312+
// noinspection RegExpRedundantEscape
312313
fieldNames.addAll( //
313314
Arrays.asList(fieldName.substring(1, fieldName.length() - 2) //
314315
.replaceAll("\\\"", "") //

src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchQueryMethod.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static org.springframework.data.repository.util.ClassUtils.*;
1919

20-
import reactor.core.publisher.Flux;
2120
import reactor.core.publisher.Mono;
2221

2322
import java.lang.reflect.Method;
@@ -36,8 +35,8 @@
3635
import org.springframework.data.projection.ProjectionFactory;
3736
import org.springframework.data.repository.core.RepositoryMetadata;
3837
import org.springframework.data.repository.util.ReactiveWrapperConverters;
39-
import org.springframework.data.repository.util.ReactiveWrappers;
4038
import org.springframework.data.util.Lazy;
39+
import org.springframework.data.util.ReactiveWrappers;
4140
import org.springframework.data.util.TypeInformation;
4241
import org.springframework.util.ClassUtils;
4342

@@ -74,17 +73,19 @@ public ReactiveElasticsearchQueryMethod(Method method, RepositoryMetadata metada
7473
if (!multiWrapper) {
7574
throw new IllegalStateException(String.format(
7675
"Method has to use a either multi-item reactive wrapper return type or a wrapped Page/Slice type. Offending method: %s",
77-
method));
76+
method));
7877
}
7978

8079
if (hasParameterOfType(method, Sort.class)) {
8180
throw new IllegalStateException(String.format("Method must not have Pageable *and* Sort parameter. "
82-
+ "Use sorting capabilities on Pageble instead! Offending method: %s", method));
81+
+ "Use sorting capabilities on Pageable instead! Offending method: %s", method));
8382
}
8483
}
8584

86-
this.isCollectionQuery = Lazy.of(() -> (!(isPageQuery() || isSliceQuery())
87-
&& ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery()));
85+
this.isCollectionQuery = Lazy.of(() -> {
86+
return (!(isPageQuery() || isSliceQuery())
87+
&& ReactiveWrappers.isMultiValueType(metadata.getReturnType(method).getType()) || super.isCollectionQuery());
88+
});
8889
}
8990

9091
@Override
@@ -150,7 +151,7 @@ public ElasticsearchParameters getParameters() {
150151
@Override
151152
protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) {
152153
return super.isAllowedGenericType(methodGenericReturnType)
153-
|| Flux.class.isAssignableFrom((Class<?>) methodGenericReturnType.getRawType());
154+
|| ReactiveWrappers.supports((Class<?>) methodGenericReturnType.getRawType());
154155
}
155156

156157
}

src/main/java/org/springframework/data/elasticsearch/repository/support/ReactiveElasticsearchRepositoryMetadata.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
*/
1616
package org.springframework.data.elasticsearch.repository.support;
1717

18-
import reactor.core.publisher.Flux;
19-
2018
import java.lang.reflect.ParameterizedType;
2119

20+
import org.springframework.data.util.ReactiveWrappers;
21+
2222
/**
2323
* @author Peter-Josef Meisch
2424
* @since 4.0
@@ -32,6 +32,6 @@ public ReactiveElasticsearchRepositoryMetadata(Class<?> repositoryInterface) {
3232
@Override
3333
protected boolean isAllowedGenericType(ParameterizedType methodGenericReturnType) {
3434
return super.isAllowedGenericType(methodGenericReturnType)
35-
|| Flux.class.isAssignableFrom((Class<?>) methodGenericReturnType.getRawType());
35+
|| ReactiveWrappers.supports((Class<?>) methodGenericReturnType.getRawType());
3636
}
3737
}

0 commit comments

Comments
 (0)