|
18 | 18 | import reactor.core.publisher.Flux;
|
19 | 19 | import reactor.core.publisher.Mono;
|
20 | 20 |
|
| 21 | +import java.util.Collections; |
21 | 22 | import java.util.List;
|
| 23 | +import java.util.function.Function; |
| 24 | +import java.util.function.UnaryOperator; |
22 | 25 |
|
23 | 26 | import org.reactivestreams.Publisher;
|
24 | 27 | import org.springframework.data.domain.Example;
|
| 28 | +import org.springframework.data.domain.Page; |
| 29 | +import org.springframework.data.domain.Pageable; |
25 | 30 | import org.springframework.data.domain.Sort;
|
26 | 31 | import org.springframework.data.r2dbc.convert.R2dbcConverter;
|
27 | 32 | import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
|
28 | 33 | import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
|
29 | 34 | import org.springframework.data.r2dbc.core.ReactiveDataAccessStrategy;
|
| 35 | +import org.springframework.data.r2dbc.core.ReactiveSelectOperation; |
30 | 36 | import org.springframework.data.r2dbc.repository.R2dbcRepository;
|
31 | 37 | import org.springframework.data.relational.core.mapping.RelationalPersistentProperty;
|
32 | 38 | import org.springframework.data.relational.core.query.Criteria;
|
33 | 39 | import org.springframework.data.relational.core.query.Query;
|
34 | 40 | import org.springframework.data.relational.repository.query.RelationalEntityInformation;
|
35 | 41 | import org.springframework.data.relational.repository.query.RelationalExampleMapper;
|
| 42 | +import org.springframework.data.repository.query.FluentQuery; |
36 | 43 | import org.springframework.data.repository.reactive.ReactiveSortingRepository;
|
37 | 44 | import org.springframework.data.util.Lazy;
|
38 | 45 | import org.springframework.data.util.Streamable;
|
@@ -432,11 +439,130 @@ public <S extends T> Mono<Boolean> exists(Example<S> example) {
|
432 | 439 | return this.entityOperations.exists(query, example.getProbeType());
|
433 | 440 | }
|
434 | 441 |
|
| 442 | + @Override |
| 443 | + public <S extends T, R, P extends Publisher<R>> P findBy(Example<S> example, |
| 444 | + Function<FluentQuery.ReactiveFluentQuery<S>, P> queryFunction) { |
| 445 | + |
| 446 | + Assert.notNull(example, "Sample must not be null!"); |
| 447 | + Assert.notNull(queryFunction, "Query function must not be null!"); |
| 448 | + |
| 449 | + return queryFunction.apply(new ReactiveFluentQueryByExample<>(example, example.getProbeType())); |
| 450 | + } |
| 451 | + |
435 | 452 | private RelationalPersistentProperty getIdProperty() {
|
436 | 453 | return this.idProperty.get();
|
437 | 454 | }
|
438 | 455 |
|
439 | 456 | private Query getIdQuery(Object id) {
|
440 | 457 | return Query.query(Criteria.where(getIdProperty().getName()).is(id));
|
441 | 458 | }
|
| 459 | + |
| 460 | + /** |
| 461 | + * {@link org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery} using {@link Example}. |
| 462 | + * |
| 463 | + * @author Mark Paluch |
| 464 | + * @since 1.4 |
| 465 | + */ |
| 466 | + class ReactiveFluentQueryByExample<S, T> extends ReactiveFluentQuerySupport<Example<S>, T> { |
| 467 | + |
| 468 | + ReactiveFluentQueryByExample(Example<S> example, Class<T> resultType) { |
| 469 | + this(example, Sort.unsorted(), resultType, Collections.emptyList()); |
| 470 | + } |
| 471 | + |
| 472 | + ReactiveFluentQueryByExample(Example<S> example, Sort sort, Class<T> resultType, List<String> fieldsToInclude) { |
| 473 | + super(example, sort, resultType, fieldsToInclude); |
| 474 | + } |
| 475 | + |
| 476 | + @Override |
| 477 | + protected <R> ReactiveFluentQueryByExample<S, R> create(Example<S> predicate, Sort sort, Class<R> resultType, |
| 478 | + List<String> fieldsToInclude) { |
| 479 | + return new ReactiveFluentQueryByExample<>(predicate, sort, resultType, fieldsToInclude); |
| 480 | + } |
| 481 | + |
| 482 | + /* |
| 483 | + * (non-Javadoc) |
| 484 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#one() |
| 485 | + */ |
| 486 | + @Override |
| 487 | + public Mono<T> one() { |
| 488 | + return createQuery().one(); |
| 489 | + } |
| 490 | + |
| 491 | + /* |
| 492 | + * (non-Javadoc) |
| 493 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#first() |
| 494 | + */ |
| 495 | + @Override |
| 496 | + public Mono<T> first() { |
| 497 | + return createQuery().first(); |
| 498 | + } |
| 499 | + |
| 500 | + /* |
| 501 | + * (non-Javadoc) |
| 502 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#all() |
| 503 | + */ |
| 504 | + @Override |
| 505 | + public Flux<T> all() { |
| 506 | + return createQuery().all(); |
| 507 | + } |
| 508 | + |
| 509 | + /* |
| 510 | + * (non-Javadoc) |
| 511 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#page(org.springframework.data.domain.Pageable) |
| 512 | + */ |
| 513 | + @Override |
| 514 | + public Mono<Page<T>> page(Pageable pageable) { |
| 515 | + |
| 516 | + Assert.notNull(pageable, "Pageable must not be null!"); |
| 517 | + |
| 518 | + Mono<List<T>> items = createQuery(q -> q.with(pageable)).all().collectList(); |
| 519 | + |
| 520 | + return items.flatMap(content -> ReactivePageableExecutionUtils.getPage(content, pageable, this.count())); |
| 521 | + } |
| 522 | + |
| 523 | + /* |
| 524 | + * (non-Javadoc) |
| 525 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#count() |
| 526 | + */ |
| 527 | + @Override |
| 528 | + public Mono<Long> count() { |
| 529 | + return createQuery().count(); |
| 530 | + } |
| 531 | + |
| 532 | + /* |
| 533 | + * (non-Javadoc) |
| 534 | + * @see org.springframework.data.repository.query.FluentQuery.ReactiveFluentQuery#exists() |
| 535 | + */ |
| 536 | + @Override |
| 537 | + public Mono<Boolean> exists() { |
| 538 | + return createQuery().exists(); |
| 539 | + } |
| 540 | + |
| 541 | + private ReactiveSelectOperation.TerminatingSelect<T> createQuery() { |
| 542 | + return createQuery(UnaryOperator.identity()); |
| 543 | + } |
| 544 | + |
| 545 | + @SuppressWarnings("unchecked") |
| 546 | + private ReactiveSelectOperation.TerminatingSelect<T> createQuery(UnaryOperator<Query> queryCustomizer) { |
| 547 | + |
| 548 | + Query query = exampleMapper.getMappedExample(getPredicate()); |
| 549 | + |
| 550 | + if (getSort().isSorted()) { |
| 551 | + query = query.sort(getSort()); |
| 552 | + } |
| 553 | + |
| 554 | + if (!getFieldsToInclude().isEmpty()) { |
| 555 | + query = query.columns(getFieldsToInclude().toArray(new String[0])); |
| 556 | + } |
| 557 | + |
| 558 | + query = queryCustomizer.apply(query); |
| 559 | + |
| 560 | + ReactiveSelectOperation.ReactiveSelect<S> select = entityOperations.select(getPredicate().getProbeType()); |
| 561 | + |
| 562 | + if (getResultType() != getPredicate().getProbeType()) { |
| 563 | + return select.as(getResultType()).matching(query); |
| 564 | + } |
| 565 | + return (ReactiveSelectOperation.TerminatingSelect<T>) select.matching(query); |
| 566 | + } |
| 567 | + } |
442 | 568 | }
|
0 commit comments