Skip to content

Commit 18be558

Browse files
authored
Reactive implementation of the point in time API.
This PR adds the reactive implementation for the point in time API that was missing in #2273. Original Pull Request #2275 Closes #2274
1 parent 46cd4cd commit 18be558

9 files changed

+332
-0
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient.java

+42
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,48 @@ public Mono<ClearScrollResponse> clearScroll(
276276

277277
return clearScroll(fn.apply(new ClearScrollRequest.Builder()).build());
278278
}
279+
280+
/**
281+
* @since 5.0
282+
*/
283+
public Mono<OpenPointInTimeResponse> openPointInTime(OpenPointInTimeRequest request) {
284+
285+
Assert.notNull(request, "request must not be null");
286+
287+
return Mono.fromFuture(transport.performRequestAsync(request, OpenPointInTimeRequest._ENDPOINT, transportOptions));
288+
}
289+
290+
/**
291+
* @since 5.0
292+
*/
293+
public Mono<OpenPointInTimeResponse> openPointInTime(
294+
Function<OpenPointInTimeRequest.Builder, ObjectBuilder<OpenPointInTimeRequest>> fn) {
295+
296+
Assert.notNull(fn, "fn must not be null");
297+
298+
return openPointInTime(fn.apply(new OpenPointInTimeRequest.Builder()).build());
299+
}
300+
301+
/**
302+
* @since 5.0
303+
*/
304+
public Mono<ClosePointInTimeResponse> closePointInTime(ClosePointInTimeRequest request) {
305+
306+
Assert.notNull(request, "request must not be null");
307+
308+
return Mono.fromFuture(transport.performRequestAsync(request, ClosePointInTimeRequest._ENDPOINT, transportOptions));
309+
}
310+
311+
/**
312+
* @since 5.0
313+
*/
314+
public Mono<ClosePointInTimeResponse> closePointInTime(
315+
Function<ClosePointInTimeRequest.Builder, ObjectBuilder<ClosePointInTimeRequest>> fn) {
316+
317+
Assert.notNull(fn, "fn must not be null");
318+
319+
return closePointInTime(fn.apply(new ClosePointInTimeRequest.Builder()).build());
320+
}
279321
// endregion
280322

281323
}

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java

+25
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import reactor.core.publisher.Mono;
3131
import reactor.util.function.Tuple2;
3232

33+
import java.time.Duration;
3334
import java.util.Collection;
3435
import java.util.HashMap;
3536
import java.util.List;
@@ -408,6 +409,30 @@ public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> e
408409
});
409410
}
410411

412+
@Override
413+
public Mono<String> openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
414+
415+
Assert.notNull(index, "index must not be null");
416+
Assert.notNull(keepAlive, "keepAlive must not be null");
417+
Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null");
418+
419+
var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable);
420+
return Mono
421+
.from(execute((ClientCallback<Publisher<OpenPointInTimeResponse>>) client -> client.openPointInTime(request)))
422+
.map(OpenPointInTimeResponse::id);
423+
}
424+
425+
@Override
426+
public Mono<Boolean> closePointInTime(String pit) {
427+
428+
Assert.notNull(pit, "pit must not be null");
429+
430+
ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit);
431+
return Mono
432+
.from(execute((ClientCallback<Publisher<ClosePointInTimeResponse>>) client -> client.closePointInTime(request)))
433+
.map(ClosePointInTimeResponse::succeeded);
434+
}
435+
411436
// endregion
412437

413438
@Override

Diff for: src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import reactor.core.publisher.Mono;
2020
import reactor.util.function.Tuple2;
2121

22+
import java.time.Duration;
2223
import java.util.Collection;
2324
import java.util.List;
2425
import java.util.stream.Collectors;
@@ -29,6 +30,7 @@
2930
import org.springframework.context.ApplicationContext;
3031
import org.springframework.context.ApplicationContextAware;
3132
import org.springframework.data.convert.EntityReader;
33+
import org.springframework.data.elasticsearch.client.UnsupportedClientOperationException;
3234
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
3335
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
3436
import org.springframework.data.elasticsearch.core.document.Document;
@@ -476,6 +478,17 @@ public Mono<Long> count(Query query, Class<?> entityType, IndexCoordinates index
476478
}
477479

478480
abstract protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates index);
481+
482+
@Override
483+
public Mono<String> openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
484+
throw new UnsupportedClientOperationException(getClass(), "openPointInTime");
485+
}
486+
487+
@Override
488+
public Mono<Boolean> closePointInTime(String pit) {
489+
throw new UnsupportedClientOperationException(getClass(), "closePointInTime");
490+
}
491+
479492
// endregion
480493

481494
// region callbacks

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java

+10
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,14 @@ public interface ReactiveSearchHits<T> {
7272
* @return wether the {@link SearchHits} has a suggest response.
7373
*/
7474
boolean hasSuggest();
75+
76+
/**
77+
* When doing a search with a point in time, the response contains a new point in time id value.
78+
*
79+
* @return the new point in time id, if one was returned from Elasticsearch
80+
* @since 5.0
81+
*/
82+
@Nullable
83+
String getPointInTimeId();
84+
7585
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java

+9
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,13 @@ public Suggest getSuggest() {
7878
public boolean hasSuggest() {
7979
return delegate.hasSuggest();
8080
}
81+
82+
/**
83+
* @since 5.0
84+
*/
85+
@Nullable
86+
@Override
87+
public String getPointInTimeId() {
88+
return delegate.getPointInTimeId();
89+
}
8190
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java

+33
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import java.time.Duration;
2122
import java.util.List;
2223

2324
import org.springframework.data.domain.Pageable;
@@ -271,6 +272,38 @@ <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType,
271272
*/
272273
Mono<Suggest> suggest(Query query, Class<?> entityType, IndexCoordinates index);
273274

275+
/**
276+
* Opens a point in time (pit) in Elasticsearch.
277+
*
278+
* @param index the index name(s) to use
279+
* @param keepAlive the duration the pit shoult be kept alive
280+
* @return the pit identifier
281+
* @since 5.0
282+
*/
283+
default Mono<String> openPointInTime(IndexCoordinates index, Duration keepAlive) {
284+
return openPointInTime(index, keepAlive, false);
285+
}
286+
287+
/**
288+
* Opens a point in time (pit) in Elasticsearch.
289+
*
290+
* @param index the index name(s) to use
291+
* @param keepAlive the duration the pit shoult be kept alive
292+
* @param ignoreUnavailable if {$literal true} the call will fail if any of the indices is missing or closed
293+
* @return the pit identifier
294+
* @since 5.0
295+
*/
296+
Mono<String> openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable);
297+
298+
/**
299+
* Closes a point in time
300+
*
301+
* @param pit the pit identifier as returned by {@link #openPointInTime(IndexCoordinates, Duration, Boolean)}
302+
* @return {@literal true} on success
303+
* @since 5.0
304+
*/
305+
Mono<Boolean> closePointInTime(String pit);
306+
274307
// region helper
275308
/**
276309
* Creates a {@link Query} to find all documents. Must be implemented by the concrete implementations to provide an
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core;
17+
18+
import org.springframework.context.annotation.Bean;
19+
import org.springframework.context.annotation.Configuration;
20+
import org.springframework.context.annotation.Import;
21+
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
22+
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
23+
import org.springframework.test.context.ContextConfiguration;
24+
25+
/**
26+
* @author Peter-Josef Meisch
27+
* @since 5.0
28+
*/
29+
@ContextConfiguration(classes = ReactivePointInTimeELCIntegrationTests.Config.class)
30+
public class ReactivePointInTimeELCIntegrationTests extends ReactivePointInTimeIntegrationTests {
31+
32+
@Configuration
33+
@Import({ ReactiveElasticsearchTemplateConfiguration.class })
34+
static class Config {
35+
@Bean
36+
IndexNameProvider indexNameProvider() {
37+
return new IndexNameProvider("reactive-point-in-time");
38+
}
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core;
17+
18+
import org.junit.jupiter.api.Disabled;
19+
import org.springframework.context.annotation.Bean;
20+
import org.springframework.context.annotation.Configuration;
21+
import org.springframework.context.annotation.Import;
22+
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
23+
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
24+
import org.springframework.test.context.ContextConfiguration;
25+
26+
/**
27+
* This test class is disabled on purpose. PIT will be introduced in Spring Data Elasticsearch 5.0 where the old
28+
* RestHighLevelClient and the {@link org.springframework.data.elasticsearch.client.erhlc.ElasticsearchRestTemplate} are
29+
* deprecated. We therefore do not add new features to this implementation anymore. Furthermore we cannot copy the
30+
* necessary code for the reactive implementation like we did before, as point in time was introduced in Elasticsearch
31+
* 7.12 after the license change.
32+
*
33+
* @author Peter-Josef Meisch
34+
*/
35+
@Disabled
36+
@ContextConfiguration(classes = ReactivePointInTimeERHLCIntegrationTests.Config.class)
37+
public class ReactivePointInTimeERHLCIntegrationTests extends ReactivePointInTimeIntegrationTests {
38+
39+
@Configuration
40+
@Import({ ReactiveElasticsearchRestTemplateConfiguration.class })
41+
static class Config {
42+
@Bean
43+
IndexNameProvider indexNameProvider() {
44+
return new IndexNameProvider("reactive-point-in-time-es7");
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)