Skip to content

Commit e22e2bb

Browse files
xhaggisothawo
authored andcommitted
DATAES-764 - StreamQueries#streamResults does not clear scroll context when finished.
Original PR: #406 (cherry picked from commit f103bdb)
1 parent b11f8d2 commit e22e2bb

File tree

3 files changed

+112
-32
lines changed

3 files changed

+112
-32
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ScrolledPage.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
package org.springframework.data.elasticsearch.core;
33

44
import org.springframework.data.domain.Page;
5+
import org.springframework.lang.Nullable;
56

67
/**
78
* @author Artur Konczak
9+
* @author Peter-Josef Meisch
10+
* @author Sascha Woo
811
*/
912
public interface ScrolledPage<T> extends Page<T> {
1013

11-
String getScrollId();
12-
14+
String getScrollId();
1315
}

src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java

+31-30
Original file line numberDiff line numberDiff line change
@@ -21,84 +21,85 @@
2121
import java.util.function.Function;
2222

2323
import org.springframework.data.util.CloseableIterator;
24+
import org.springframework.util.Assert;
2425

2526
/**
2627
* Utility to support streaming queries.
2728
*
2829
* @author Mark Paluch
30+
* @author Sascha Woo
2931
* @since 3.2
3032
*/
3133
abstract class StreamQueries {
3234

3335
/**
3436
* Stream query results using {@link ScrolledPage}.
3537
*
36-
* @param page the initial page.
37-
* @param continueFunction continuation function accepting the current scrollId.
38-
* @param clearScroll cleanup function accepting the current scrollId.
38+
* @param page the initial scrolled page.
39+
* @param continueScrollFunction function to continue scrolling applies to the current scrollId.
40+
* @param clearScrollConsumer consumer to clear the scroll context by accepting the current scrollId.
3941
* @param <T>
4042
* @return the {@link CloseableIterator}.
4143
*/
4244
static <T> CloseableIterator<T> streamResults(ScrolledPage<T> page,
43-
Function<String, ScrolledPage<T>> continueFunction, Consumer<String> clearScroll) {
45+
Function<String, ScrolledPage<T>> continueScrollFunction, Consumer<String> clearScrollConsumer) {
4446

45-
return new CloseableIterator<T>() {
47+
Assert.notNull(page, "page must not be null.");
48+
Assert.notNull(page.getScrollId(), "scrollId must not be null.");
49+
Assert.notNull(continueScrollFunction, "continueScrollFunction must not be null.");
50+
Assert.notNull(clearScrollConsumer, "clearScrollConsumer must not be null.");
4651

47-
/** As we couldn't retrieve single result with scroll, store current hits. */
48-
private volatile Iterator<T> currentHits = page.iterator();
52+
return new CloseableIterator<T>() {
4953

50-
/** The scroll id. */
54+
// As we couldn't retrieve single result with scroll, store current hits.
55+
private volatile Iterator<T> scrollHits = page.iterator();
5156
private volatile String scrollId = page.getScrollId();
52-
53-
/** If stream is finished (ie: cluster returns no results. */
54-
private volatile boolean finished = !currentHits.hasNext();
57+
private volatile boolean continueScroll = scrollHits.hasNext();
5558

5659
@Override
5760
public void close() {
61+
5862
try {
59-
// Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done)
60-
if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) {
61-
clearScroll.accept(scrollId);
62-
}
63+
clearScrollConsumer.accept(scrollId);
6364
} finally {
64-
currentHits = null;
65+
scrollHits = null;
6566
scrollId = null;
6667
}
6768
}
6869

6970
@Override
7071
public boolean hasNext() {
71-
// Test if stream is finished
72-
if (finished) {
72+
73+
if (!continueScroll) {
7374
return false;
7475
}
75-
// Test if it remains hits
76-
if (currentHits == null || !currentHits.hasNext()) {
77-
// Do a new request
78-
ScrolledPage<T> scroll = continueFunction.apply(scrollId);
79-
// Save hits and scroll id
80-
currentHits = scroll.iterator();
81-
finished = !currentHits.hasNext();
82-
scrollId = scroll.getScrollId();
76+
77+
if (!scrollHits.hasNext()) {
78+
ScrolledPage<T> nextPage = continueScrollFunction.apply(scrollId);
79+
scrollHits = nextPage.iterator();
80+
scrollId = nextPage.getScrollId();
81+
continueScroll = scrollHits.hasNext();
8382
}
84-
return currentHits.hasNext();
83+
84+
return scrollHits.hasNext();
8585
}
8686

8787
@Override
8888
public T next() {
8989
if (hasNext()) {
90-
return currentHits.next();
90+
return scrollHits.next();
9191
}
9292
throw new NoSuchElementException();
9393
}
9494

9595
@Override
9696
public void remove() {
97-
throw new UnsupportedOperationException("remove");
97+
throw new UnsupportedOperationException();
9898
}
9999
};
100100
}
101101

102102
// utility constructor
103-
private StreamQueries() {}
103+
private StreamQueries() {
104+
}
104105
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import org.junit.jupiter.api.Test;
26+
import org.springframework.data.domain.PageImpl;
27+
import org.springframework.data.util.CloseableIterator;
28+
import org.springframework.lang.Nullable;
29+
30+
/**
31+
* @author Sascha Woo
32+
*/
33+
public class StreamQueriesTest {
34+
35+
@Test // DATAES-764
36+
public void shouldCallClearScrollOnIteratorClose() {
37+
38+
// given
39+
List<String> results = new ArrayList<>();
40+
results.add("one");
41+
42+
ScrolledPage<String> page = new ScrolledPageImpl("1234", results);
43+
44+
AtomicBoolean clearScrollCalled = new AtomicBoolean(false);
45+
46+
// when
47+
CloseableIterator<String> closeableIterator = StreamQueries.streamResults( //
48+
page, //
49+
scrollId -> new ScrolledPageImpl(scrollId, Collections.emptyList()), //
50+
scrollId -> clearScrollCalled.set(true));
51+
52+
while (closeableIterator.hasNext()) {
53+
closeableIterator.next();
54+
}
55+
closeableIterator.close();
56+
57+
// then
58+
assertThat(clearScrollCalled).isTrue();
59+
60+
}
61+
62+
private static class ScrolledPageImpl extends PageImpl<String> implements ScrolledPage<String> {
63+
64+
private String scrollId;
65+
66+
public ScrolledPageImpl(String scrollId, List<String> content) {
67+
super(content);
68+
this.scrollId = scrollId;
69+
}
70+
71+
@Override
72+
@Nullable
73+
public String getScrollId() {
74+
return scrollId;
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)