Skip to content

Commit 46cd4cd

Browse files
authored
Implement the point in time API.
Original Pull Request #2273 Closes #1684
1 parent a4ed730 commit 46cd4cd

26 files changed

+470
-48
lines changed

Diff for: src/main/asciidoc/reference/elasticsearch-misc.adoc

+41-2
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ Query searchQuery = NativeQuery.builder()
121121
.withPageable(PageRequest.of(0, 10))
122122
.build();
123123
124-
SearchHitsIterator<SampleEntity> stream = elasticsearchOperations.searchForStream(searchQuery, SampleEntity.class,
124+
SearchHitsIterator<SampleEntity> stream = elasticsearchOperations.searchForStream(searchQuery, SampleEntity.class,
125125
index);
126126
127127
List<SampleEntity> sampleEntities = new ArrayList<>();
@@ -134,7 +134,7 @@ stream.close();
134134
====
135135

136136
There are no methods in the `SearchOperations` API to access the scroll id, if it should be necessary to access this,
137-
the following methods of the `AbstractElasticsearchTemplate` can be used (this is the base implementation for the
137+
the following methods of the `AbstractElasticsearchTemplate` can be used (this is the base implementation for the
138138
different `ElasticsearchOperations` implementations:
139139

140140
====
@@ -275,3 +275,42 @@ SearchHits<SomethingToBuy> searchHits = operations.search(query, SomethingToBuy.
275275
====
276276

277277
This works with every implementation of the `Query` interface.
278+
279+
[[elasticsearch.misc.point-in-time]]
280+
== Point In Time (PIT) API
281+
282+
`ElasticsearchOperations` supports the point in time API of Elasticsearch (see https://www.elastic
283+
.co/guide/en/elasticsearch/reference/8.3/point-in-time-api.html). The following code snippet shows how to use this
284+
feature with a fictional `Person` class:
285+
286+
====
287+
[source,java]
288+
----
289+
ElasticsearchOperations operations; // autowired
290+
Duration tenSeconds = Duration.ofSeconds(10);
291+
292+
String pit = operations.openPointInTime(IndexCoordinates.of("person"), tenSeconds); <.>
293+
294+
// create query for the pit
295+
Query query1 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith"))
296+
.withPointInTime(new Query.PointInTime(pit, tenSeconds)) <.>
297+
.build();
298+
SearchHits<Person> searchHits1 = operations.search(query1, Person.class);
299+
// do something with the data
300+
301+
// create 2nd query for the pit, use the id returned in the previous result
302+
Query query2 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Miller"))
303+
.withPointInTime(
304+
new Query.PointInTime(searchHits1.getPointInTimeId(), tenSeconds)) <.>
305+
.build();
306+
SearchHits<Person> searchHits2 = operations.search(query2, Person.class);
307+
// do something with the data
308+
309+
operations.closePointInTime(searchHits2.getPointInTimeId()); <.>
310+
311+
----
312+
<.> create a point in time for an index (can be multiple names) and a keep-alive duration and retrieve its id
313+
<.> pass that id into the query to search together with the next keep-alive value
314+
<.> for the next query, use the id returned from the previous search
315+
<.> when done, close the point in time using the last returned id
316+
====

Diff for: src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java

-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
import javax.net.ssl.HostnameVerifier;
2727
import javax.net.ssl.SSLContext;
2828

29-
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
30-
import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients;
31-
import org.springframework.data.elasticsearch.client.erhlc.RestClients;
3229
import org.springframework.http.HttpHeaders;
3330
import org.springframework.lang.Nullable;
3431
import org.springframework.web.reactive.function.client.WebClient;

Diff for: src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java

-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint;
3232
import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder;
3333
import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder;
34-
import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients;
35-
import org.springframework.data.elasticsearch.client.erhlc.RestClients;
3634
import org.springframework.http.HttpHeaders;
3735
import org.springframework.lang.Nullable;
3836
import org.springframework.util.Assert;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.client;
17+
18+
/**
19+
* @author Peter-Josef Meisch
20+
*/
21+
public class UnsupportedClientOperationException extends RuntimeException {
22+
public UnsupportedClientOperationException(Class<?> clientClass, String operation) {
23+
super("Client %1$s does not support the operation %2$s".formatted(clientClass, operation));
24+
}
25+
}

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/DocumentAdapters.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public static SearchDocument from(Hit<?> hit, JsonpMapper jsonpMapper) {
7373
Map<String, SearchDocumentResponse> innerHits = new LinkedHashMap<>();
7474
hit.innerHits().forEach((name, innerHitsResult) -> {
7575
// noinspection ReturnOfNull
76-
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null,
76+
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, null,
7777
searchDocument -> null, jsonpMapper));
7878
});
7979

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import co.elastic.clients.transport.Version;
2828

2929
import java.io.IOException;
30+
import java.time.Duration;
3031
import java.util.ArrayList;
3132
import java.util.HashMap;
3233
import java.util.Iterator;
@@ -475,8 +476,30 @@ private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiS
475476
/**
476477
* value class combining the information needed for a single query in a multisearch request.
477478
*/
478-
record MultiSearchQueryParameter(Query query, Class<?> clazz, IndexCoordinates index) {
479+
record MultiSearchQueryParameter(Query query, Class<?> clazz, IndexCoordinates index) {
479480
}
481+
482+
@Override
483+
public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
484+
485+
Assert.notNull(index, "index must not be null");
486+
Assert.notNull(keepAlive, "keepAlive must not be null");
487+
Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null");
488+
489+
var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable);
490+
return execute(client -> client.openPointInTime(request)).id();
491+
}
492+
493+
@Override
494+
public Boolean closePointInTime(String pit) {
495+
496+
Assert.notNull(pit, "pit must not be null");
497+
498+
ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit);
499+
var response = execute(client -> client.closePointInTime(request));
500+
return response.succeeded();
501+
}
502+
480503
// endregion
481504

482505
// region client callback

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java

+38-19
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,7 @@
3232
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
3333
import co.elastic.clients.elasticsearch._types.query_dsl.Like;
3434
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
35-
import co.elastic.clients.elasticsearch.core.BulkRequest;
36-
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
37-
import co.elastic.clients.elasticsearch.core.DeleteRequest;
38-
import co.elastic.clients.elasticsearch.core.GetRequest;
39-
import co.elastic.clients.elasticsearch.core.IndexRequest;
40-
import co.elastic.clients.elasticsearch.core.MgetRequest;
41-
import co.elastic.clients.elasticsearch.core.MsearchRequest;
42-
import co.elastic.clients.elasticsearch.core.SearchRequest;
43-
import co.elastic.clients.elasticsearch.core.UpdateByQueryRequest;
44-
import co.elastic.clients.elasticsearch.core.UpdateRequest;
35+
import co.elastic.clients.elasticsearch.core.*;
4536
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
4637
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
4738
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
@@ -52,6 +43,7 @@
5243
import co.elastic.clients.elasticsearch.core.search.Rescore;
5344
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
5445
import co.elastic.clients.elasticsearch.indices.*;
46+
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
5547
import co.elastic.clients.elasticsearch.indices.update_aliases.Action;
5648
import co.elastic.clients.json.JsonData;
5749
import co.elastic.clients.json.JsonpDeserializer;
@@ -1164,10 +1156,24 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
11641156
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(clazz);
11651157

11661158
builder //
1167-
.index(Arrays.asList(indexNames)) //
11681159
.version(true) //
11691160
.trackScores(query.getTrackScores());
11701161

1162+
var pointInTime = query.getPointInTime();
1163+
if (pointInTime != null) {
1164+
builder.pit(pb -> pb.id(pointInTime.id()).keepAlive(time(pointInTime.keepAlive())));
1165+
} else {
1166+
builder.index(Arrays.asList(indexNames));
1167+
1168+
if (query.getRoute() != null) {
1169+
builder.routing(query.getRoute());
1170+
}
1171+
1172+
if (query.getPreference() != null) {
1173+
builder.preference(query.getPreference());
1174+
}
1175+
}
1176+
11711177
if (persistentEntity != null && persistentEntity.hasSeqNoPrimaryTermProperty()) {
11721178
builder.seqNoPrimaryTerm(true);
11731179
}
@@ -1205,10 +1211,6 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
12051211
builder.minScore((double) query.getMinScore());
12061212
}
12071213

1208-
if (query.getPreference() != null) {
1209-
builder.preference(query.getPreference());
1210-
}
1211-
12121214
builder.searchType(searchType(query.getSearchType()));
12131215

12141216
if (query.getSort() != null) {
@@ -1233,10 +1235,6 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
12331235
builder.trackTotalHits(th -> th.count(query.getTrackTotalHitsUpTo()));
12341236
}
12351237

1236-
if (query.getRoute() != null) {
1237-
builder.routing(query.getRoute());
1238-
}
1239-
12401238
builder.timeout(timeStringMs(query.getTimeout()));
12411239

12421240
if (query.getExplain()) {
@@ -1507,6 +1505,27 @@ public co.elastic.clients.elasticsearch._types.query_dsl.MoreLikeThisQuery moreL
15071505
return moreLikeThisQuery;
15081506
}
15091507

1508+
public OpenPointInTimeRequest searchOpenPointInTimeRequest(IndexCoordinates index, Duration keepAlive,
1509+
Boolean ignoreUnavailable) {
1510+
1511+
Assert.notNull(index, "index must not be null");
1512+
Assert.notNull(keepAlive, "keepAlive must not be null");
1513+
Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null");
1514+
1515+
return OpenPointInTimeRequest.of(opit -> opit //
1516+
.index(Arrays.asList(index.getIndexNames())) //
1517+
.ignoreUnavailable(ignoreUnavailable) //
1518+
.keepAlive(time(keepAlive)) //
1519+
);
1520+
}
1521+
1522+
public ClosePointInTimeRequest searchClosePointInTime(String pit) {
1523+
1524+
Assert.notNull(pit, "pit must not be null");
1525+
1526+
return ClosePointInTimeRequest.of(cpit -> cpit.id(pit));
1527+
}
1528+
15101529
// endregion
15111530

15121531
// region helper functions

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/SearchDocumentResponseBuilder.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ public static <T> SearchDocumentResponse from(ResponseBody<EntityAsMap> response
7575
String scrollId = responseBody.scrollId();
7676
Map<String, Aggregate> aggregations = responseBody.aggregations();
7777
Map<String, List<Suggestion<EntityAsMap>>> suggest = responseBody.suggest();
78+
var pointInTimeId = responseBody.pitId();
7879

79-
return from(hitsMetadata, scrollId, aggregations, suggest, entityCreator, jsonpMapper);
80+
return from(hitsMetadata, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
8081
}
8182

8283
/**
@@ -93,8 +94,9 @@ public static <T> SearchDocumentResponse from(ResponseBody<EntityAsMap> response
9394
* @return the {@link SearchDocumentResponse}
9495
*/
9596
public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nullable String scrollId,
96-
@Nullable Map<String, Aggregate> aggregations, Map<String, List<Suggestion<EntityAsMap>>> suggestES,
97-
SearchDocumentResponse.EntityCreator<T> entityCreator, JsonpMapper jsonpMapper) {
97+
@Nullable String pointInTimeId, @Nullable Map<String, Aggregate> aggregations,
98+
Map<String, List<Suggestion<EntityAsMap>>> suggestES, SearchDocumentResponse.EntityCreator<T> entityCreator,
99+
JsonpMapper jsonpMapper) {
98100

99101
Assert.notNull(hitsMetadata, "hitsMetadata must not be null");
100102

@@ -126,7 +128,7 @@ public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nul
126128

127129
Suggest suggest = suggestFrom(suggestES, entityCreator);
128130

129-
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments,
131+
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchDocuments,
130132
aggregationsContainer, suggest);
131133
}
132134

Diff for: src/main/java/org/springframework/data/elasticsearch/client/erhlc/SearchDocumentResponseBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable S
113113
: null;
114114
Suggest suggest = suggestFrom(suggestES, entityCreator);
115115

116-
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments,
116+
// no pointInTimeId for the deprecated implementation
117+
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, null, searchDocuments,
117118
aggregationsContainer, suggest);
118119
}
119120

Diff for: src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

+11
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.context.ApplicationContext;
2929
import org.springframework.context.ApplicationContextAware;
3030
import org.springframework.data.convert.EntityReader;
31+
import org.springframework.data.elasticsearch.client.UnsupportedClientOperationException;
3132
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
3233
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
3334
import org.springframework.data.elasticsearch.core.document.Document;
@@ -423,6 +424,16 @@ public void searchScrollClear(String scrollId) {
423424

424425
abstract public void searchScrollClear(List<String> scrollIds);
425426

427+
@Override
428+
public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
429+
throw new UnsupportedClientOperationException(getClass(), "openPointInTime");
430+
}
431+
432+
@Override
433+
public Boolean closePointInTime(String pit) {
434+
throw new UnsupportedClientOperationException(getClass(), "closePointInTime");
435+
}
436+
426437
// endregion
427438

428439
// region Helper methods

Diff for: src/main/java/org/springframework/data/elasticsearch/core/SearchHitMapping.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ private SearchHitsImpl<T> mapHitsFromResponse(SearchDocumentResponse searchDocum
8585
long totalHits = searchDocumentResponse.getTotalHits();
8686
float maxScore = searchDocumentResponse.getMaxScore();
8787
String scrollId = searchDocumentResponse.getScrollId();
88+
String pointInTimeId = searchDocumentResponse.getPointInTimeId();
8889

8990
List<SearchHit<T>> searchHits = new ArrayList<>();
9091
List<SearchDocument> searchDocuments = searchDocumentResponse.getSearchDocuments();
@@ -100,7 +101,8 @@ private SearchHitsImpl<T> mapHitsFromResponse(SearchDocumentResponse searchDocum
100101
Suggest suggest = searchDocumentResponse.getSuggest();
101102
mapHitsInCompletionSuggestion(suggest);
102103

103-
return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations, suggest);
104+
return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchHits,
105+
aggregations, suggest);
104106
}
105107

106108
@SuppressWarnings("unchecked")
@@ -232,6 +234,7 @@ private SearchHits<?> mapInnerDocuments(SearchHits<SearchDocument> searchHits, C
232234
searchHits.getTotalHitsRelation(), //
233235
searchHits.getMaxScore(), //
234236
scrollId, //
237+
searchHits.getPointInTimeId(), //
235238
convertedSearchHits, //
236239
searchHits.getAggregations(), //
237240
searchHits.getSuggest());

Diff for: src/main/java/org/springframework/data/elasticsearch/core/SearchHits.java

+8
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,12 @@ default Iterator<SearchHit<T>> iterator() {
100100
return getSearchHits().iterator();
101101
}
102102

103+
/**
104+
* When doing a search with a point in time, the response contains a new point in time id value.
105+
*
106+
* @return the new point in time id, if one was returned from Elasticsearch
107+
* @since 5.0
108+
*/
109+
@Nullable
110+
String getPointInTimeId();
103111
}

0 commit comments

Comments
 (0)