Skip to content

DATAES-459 - Return ScrolledPage instead of Page while using startScr… #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/asciidoc/reference/elasticsearch-misc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withPageable(PageRequest.of(0, 10))
.build();

Page<SampleEntity> scroll = elasticsearchTemplate.startScroll(1000, searchQuery, SampleEntity.class);
ScrolledPage<SampleEntity> scroll = elasticsearchTemplate.startScroll(1000, searchQuery, SampleEntity.class);

String scrollId = ((ScrolledPage) scroll).getScrollId();
String scrollId = scroll.getScrollId();
List<SampleEntity> sampleEntities = new ArrayList<>();
while (scroll.hasContent()) {
sampleEntities.addAll(scroll.getContent());
scrollId = ((ScrolledPage) scroll).getScrollId();
scrollId = scroll.getScrollId();
scroll = elasticsearchTemplate.continueScroll(scrollId, 1000, SampleEntity.class);
}
elasticsearchTemplate.clearScroll(scrollId);
Expand Down Expand Up @@ -74,4 +74,4 @@ while (stream.hasNext()) {
sampleEntities.add(stream.next());
}
----
====
====
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* @author Mohsin Husen
* @author Kevin Leturc
* @author Zetang Zeng
* @author Dmitriy Yakovlev
*/
public interface ElasticsearchOperations {

Expand Down Expand Up @@ -544,7 +545,7 @@ default List<List<?>> queryForList(List<SearchQuery> queries, List<Class<?>> cla
* @param clazz The class of entity to retrieve.
* @return The scan id for input query.
*/
<T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery query, Class<T> clazz);
<T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery query, Class<T> clazz);

/**
* Returns scrolled page for given query
Expand All @@ -555,7 +556,7 @@ default List<List<?>> queryForList(List<SearchQuery> queries, List<Class<?>> cla
* @param mapper Custom impl to map result to entities
* @return The scan id for input query.
*/
<T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery query, Class<T> clazz, SearchResultMapper mapper);
<T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery query, Class<T> clazz, SearchResultMapper mapper);

/**
* Returns scrolled page for given query
Expand All @@ -566,7 +567,7 @@ default List<List<?>> queryForList(List<SearchQuery> queries, List<Class<?>> cla
* @param clazz The class of entity to retrieve.
* @return The scan id for input query.
*/
<T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz);
<T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz);

/**
* Returns scrolled page for given query
Expand All @@ -577,11 +578,11 @@ default List<List<?>> queryForList(List<SearchQuery> queries, List<Class<?>> cla
* @param mapper Custom impl to map result to entities
* @return The scan id for input query.
*/
<T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz, SearchResultMapper mapper);
<T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz, SearchResultMapper mapper);


<T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz);
<T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz, SearchResultMapper mapper);
<T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz);
<T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz, SearchResultMapper mapper);
/**
* Clears the search contexts associated with specified scroll ids.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public <T> Page<T> queryForPage(StringQuery query, Class<T> clazz, SearchResultM
@Override
public <T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
return doStream(scrollTimeInMillis, (ScrolledPage<T>) startScroll(scrollTimeInMillis, query, clazz), clazz,
return doStream(scrollTimeInMillis, startScroll(scrollTimeInMillis, query, clazz), clazz,
resultsMapper);
}

Expand All @@ -508,7 +508,7 @@ public <T> CloseableIterator<T> stream(SearchQuery query, Class<T> clazz) {
@Override
public <T> CloseableIterator<T> stream(SearchQuery query, final Class<T> clazz, final SearchResultMapper mapper) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
return doStream(scrollTimeInMillis, (ScrolledPage<T>) startScroll(scrollTimeInMillis, query, clazz, mapper), clazz,
return doStream(scrollTimeInMillis, startScroll(scrollTimeInMillis, query, clazz, mapper), clazz,
mapper);
}

Expand Down Expand Up @@ -547,7 +547,7 @@ public boolean hasNext() {
// Test if it remains hits
if (currentHits == null || !currentHits.hasNext()) {
// Do a new request
final ScrolledPage<T> scroll = (ScrolledPage<T>) continueScroll(scrollId, scrollTimeInMillis, clazz, mapper);
final ScrolledPage<T> scroll = continueScroll(scrollId, scrollTimeInMillis, clazz, mapper);
// Save hits and scroll id
currentHits = scroll.iterator();
finished = !currentHits.hasNext();
Expand Down Expand Up @@ -878,14 +878,14 @@ public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz,
}
};

Page<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
deleteEntryResultMapper);
BulkRequest request = new BulkRequest();
List<SearchHit> documentsToDelete = new ArrayList<>();

do {
documentsToDelete.addAll(scrolledResult.getContent());
scrolledResult = continueScroll(((ScrolledPage<T>) scrolledResult).getScrollId(), scrollTimeInMillis,
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
SearchHit.class, deleteEntryResultMapper);
} while (scrolledResult.getContent().size() != 0);

Expand All @@ -903,7 +903,7 @@ public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz,
}
}

clearScroll(((ScrolledPage<T>) scrolledResult).getScrollId());
clearScroll(scrolledResult.getScrollId());
}

@Override
Expand Down Expand Up @@ -999,29 +999,29 @@ private SearchResponse doScroll(SearchRequest request, SearchQuery searchQuery)
}
}

public <T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz) {
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz) {
SearchResponse response = doScroll(prepareScroll(searchQuery, scrollTimeInMillis, clazz), searchQuery);
return resultsMapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz) {
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz) {
SearchResponse response = doScroll(prepareScroll(criteriaQuery, scrollTimeInMillis, clazz), criteriaQuery);
return resultsMapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz,
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz,
SearchResultMapper mapper) {
SearchResponse response = doScroll(prepareScroll(searchQuery, scrollTimeInMillis, clazz), searchQuery);
return mapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz,
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz,
SearchResultMapper mapper) {
SearchResponse response = doScroll(prepareScroll(criteriaQuery, scrollTimeInMillis, clazz), criteriaQuery);
return mapper.mapResults(response, clazz, null);
}

public <T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz) {
public <T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz) {
SearchScrollRequest request = new SearchScrollRequest(scrollId);
request.scroll(TimeValue.timeValueMillis(scrollTimeInMillis));
SearchResponse response;
Expand All @@ -1033,7 +1033,7 @@ public <T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMi
return resultsMapper.mapResults(response, clazz, Pageable.unpaged());
}

public <T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz,
public <T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz,
SearchResultMapper mapper) {
SearchScrollRequest request = new SearchScrollRequest(scrollId);
request.scroll(TimeValue.timeValueMillis(scrollTimeInMillis));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public <T> Page<T> queryForPage(StringQuery query, Class<T> clazz, SearchResultM
@Override
public <T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
return doStream(scrollTimeInMillis, (ScrolledPage<T>) startScroll(scrollTimeInMillis, query, clazz), clazz,
return doStream(scrollTimeInMillis, startScroll(scrollTimeInMillis, query, clazz), clazz,
resultsMapper);
}

Expand All @@ -449,7 +449,7 @@ public <T> CloseableIterator<T> stream(SearchQuery query, Class<T> clazz) {
@Override
public <T> CloseableIterator<T> stream(SearchQuery query, final Class<T> clazz, final SearchResultMapper mapper) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
return doStream(scrollTimeInMillis, (ScrolledPage<T>) startScroll(scrollTimeInMillis, query, clazz, mapper), clazz,
return doStream(scrollTimeInMillis, startScroll(scrollTimeInMillis, query, clazz, mapper), clazz,
mapper);
}

Expand Down Expand Up @@ -488,7 +488,7 @@ public boolean hasNext() {
// Test if it remains hits
if (currentHits == null || !currentHits.hasNext()) {
// Do a new request
final ScrolledPage<T> scroll = (ScrolledPage<T>) continueScroll(scrollId, scrollTimeInMillis, clazz, mapper);
final ScrolledPage<T> scroll = continueScroll(scrollId, scrollTimeInMillis, clazz, mapper);
// Save hits and scroll id
currentHits = scroll.iterator();
finished = !currentHits.hasNext();
Expand Down Expand Up @@ -765,14 +765,14 @@ public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz,
}
};

Page<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
ScrolledPage<SearchHit> scrolledResult = startScroll(scrollTimeInMillis, searchQuery, SearchHit.class,
deleteEntryResultMapper);
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
List<SearchHit> documentsToDelete = new ArrayList<>();

do {
documentsToDelete.addAll(scrolledResult.getContent());
scrolledResult = continueScroll(((ScrolledPage<T>) scrolledResult).getScrollId(), scrollTimeInMillis,
scrolledResult = continueScroll(scrolledResult.getScrollId(), scrollTimeInMillis,
SearchHit.class, deleteEntryResultMapper);
} while (scrolledResult.getContent().size() != 0);

Expand All @@ -784,7 +784,7 @@ public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz,
bulkRequestBuilder.execute().actionGet();
}

clearScroll(((ScrolledPage<T>) scrolledResult).getScrollId());
clearScroll(scrolledResult.getScrollId());
}

@Override
Expand Down Expand Up @@ -862,35 +862,35 @@ private SearchResponse doScroll(SearchRequestBuilder requestBuilder, SearchQuery
return getSearchResponse(requestBuilder.setQuery(searchQuery.getQuery()));
}

public <T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz) {
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz) {
SearchResponse response = doScroll(prepareScroll(searchQuery, scrollTimeInMillis, clazz), searchQuery);
return resultsMapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz) {
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz) {
SearchResponse response = doScroll(prepareScroll(criteriaQuery, scrollTimeInMillis, clazz), criteriaQuery);
return resultsMapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz,
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, SearchQuery searchQuery, Class<T> clazz,
SearchResultMapper mapper) {
SearchResponse response = doScroll(prepareScroll(searchQuery, scrollTimeInMillis, clazz), searchQuery);
return mapper.mapResults(response, clazz, null);
}

public <T> Page<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz,
public <T> ScrolledPage<T> startScroll(long scrollTimeInMillis, CriteriaQuery criteriaQuery, Class<T> clazz,
SearchResultMapper mapper) {
SearchResponse response = doScroll(prepareScroll(criteriaQuery, scrollTimeInMillis, clazz), criteriaQuery);
return mapper.mapResults(response, clazz, null);
}

public <T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz) {
public <T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz) {
SearchResponse response = getSearchResponse(
client.prepareSearchScroll(scrollId).setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).execute());
return resultsMapper.mapResults(response, clazz, Pageable.unpaged());
}

public <T> Page<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz,
public <T> ScrolledPage<T> continueScroll(@Nullable String scrollId, long scrollTimeInMillis, Class<T> clazz,
SearchResultMapper mapper) {
SearchResponse response = getSearchResponse(
client.prepareSearchScroll(scrollId).setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).execute());
Expand Down
Loading