diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ScrolledPage.java b/src/main/java/org/springframework/data/elasticsearch/core/ScrolledPage.java index 1445e0e84..0e0d23158 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ScrolledPage.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ScrolledPage.java @@ -7,9 +7,9 @@ /** * @author Artur Konczak * @author Peter-Josef Meisch + * @author Sascha Woo */ public interface ScrolledPage extends Page { - @Nullable - String getScrollId(); + String getScrollId(); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java b/src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java index 784d1eeb8..62104b839 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java @@ -21,11 +21,13 @@ import java.util.function.Function; import org.springframework.data.util.CloseableIterator; +import org.springframework.util.Assert; /** * Utility to support streaming queries. * * @author Mark Paluch + * @author Sascha Woo * @since 3.2 */ abstract class StreamQueries { @@ -33,72 +35,71 @@ abstract class StreamQueries { /** * Stream query results using {@link ScrolledPage}. * - * @param page the initial page. - * @param continueFunction continuation function accepting the current scrollId. - * @param clearScroll cleanup function accepting the current scrollId. + * @param page the initial scrolled page. + * @param continueScrollFunction function to continue scrolling applies to the current scrollId. + * @param clearScrollConsumer consumer to clear the scroll context by accepting the current scrollId. * @param * @return the {@link CloseableIterator}. */ static CloseableIterator streamResults(ScrolledPage page, - Function> continueFunction, Consumer clearScroll) { + Function> continueScrollFunction, Consumer clearScrollConsumer) { - return new CloseableIterator() { + Assert.notNull(page, "page must not be null."); + Assert.notNull(page.getScrollId(), "scrollId must not be null."); + Assert.notNull(continueScrollFunction, "continueScrollFunction must not be null."); + Assert.notNull(clearScrollConsumer, "clearScrollConsumer must not be null."); - /** As we couldn't retrieve single result with scroll, store current hits. */ - private volatile Iterator currentHits = page.iterator(); + return new CloseableIterator() { - /** The scroll id. */ + // As we couldn't retrieve single result with scroll, store current hits. + private volatile Iterator scrollHits = page.iterator(); private volatile String scrollId = page.getScrollId(); - - /** If stream is finished (ie: cluster returns no results. */ - private volatile boolean finished = !currentHits.hasNext(); + private volatile boolean continueScroll = scrollHits.hasNext(); @Override public void close() { + try { - // Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done) - if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) { - clearScroll.accept(scrollId); - } + clearScrollConsumer.accept(scrollId); } finally { - currentHits = null; + scrollHits = null; scrollId = null; } } @Override public boolean hasNext() { - // Test if stream is finished - if (finished) { + + if (!continueScroll) { return false; } - // Test if it remains hits - if (currentHits == null || !currentHits.hasNext()) { - // Do a new request - ScrolledPage scroll = continueFunction.apply(scrollId); - // Save hits and scroll id - currentHits = scroll.iterator(); - finished = !currentHits.hasNext(); - scrollId = scroll.getScrollId(); + + if (!scrollHits.hasNext()) { + ScrolledPage nextPage = continueScrollFunction.apply(scrollId); + scrollHits = nextPage.iterator(); + scrollId = nextPage.getScrollId(); + continueScroll = scrollHits.hasNext(); } - return currentHits.hasNext(); + + return scrollHits.hasNext(); } @Override public T next() { if (hasNext()) { - return currentHits.next(); + return scrollHits.next(); } throw new NoSuchElementException(); } @Override public void remove() { - throw new UnsupportedOperationException("remove"); + throw new UnsupportedOperationException(); } }; } // utility constructor - private StreamQueries() {} + private StreamQueries() { + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java b/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java new file mode 100644 index 000000000..0508ee488 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import static org.assertj.core.api.Assertions.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; +import org.springframework.data.domain.PageImpl; +import org.springframework.data.util.CloseableIterator; +import org.springframework.lang.Nullable; + +/** + * @author Sascha Woo + */ +public class StreamQueriesTest { + + @Test // DATAES-764 + public void shouldCallClearScrollOnIteratorClose() { + + // given + List results = new ArrayList<>(); + results.add("one"); + + ScrolledPage page = new ScrolledPageImpl("1234", results); + + AtomicBoolean clearScrollCalled = new AtomicBoolean(false); + + // when + CloseableIterator closeableIterator = StreamQueries.streamResults( // + page, // + scrollId -> new ScrolledPageImpl(scrollId, Collections.emptyList()), // + scrollId -> clearScrollCalled.set(true)); + + while (closeableIterator.hasNext()) { + closeableIterator.next(); + } + closeableIterator.close(); + + // then + assertThat(clearScrollCalled).isTrue(); + + } + + private static class ScrolledPageImpl extends PageImpl implements ScrolledPage { + + private String scrollId; + + public ScrolledPageImpl(String scrollId, List content) { + super(content); + this.scrollId = scrollId; + } + + @Override + @Nullable + public String getScrollId() { + return scrollId; + } + } +}