From ac993c6e889b0775b988ecc6f33be42c7e9836e4 Mon Sep 17 00:00:00 2001 From: Reese Date: Fri, 23 Sep 2022 16:33:11 -0400 Subject: [PATCH 1/6] feat: adds a page method utilizing PIT --- elasticsearch_dsl/search.py | 47 ++++++++++++++++++++++++++- tests/test_integration/test_search.py | 9 +++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index c07fe7e71..79c5cf59e 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -18,7 +18,7 @@ import collections.abc import copy -from elasticsearch.exceptions import TransportError +from elasticsearch.exceptions import NotFoundError, TransportError from elasticsearch.helpers import scan from .aggs import A, AggBase @@ -726,6 +726,51 @@ def scan(self): for hit in scan(es, query=self.to_dict(), index=self._index, **self._params): yield self._get_result(hit) + def page(self): + """ + Turn the search into a paged search utilizing Point in Time (PIT) and search_after. + Returns a generator that will iterate over all the documents matching the query. + """ + search = self._clone() + + # A sort is required to page search results. We use the optimized default if sort is None. + # https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html + if not search._sort: + search._sort = ['_shard_doc'] + + keep_alive = search._params.pop("keep_alive", "30s") + pit = search._using.open_point_in_time(index=search._index, keep_alive=keep_alive) + pit_id = pit['id'] + + # The index is passed with Point in Time (PIT). + search._index = None + search._extra.update(pit={"id": pit['id'], "keep_alive": keep_alive}) + + es = get_connection(search._using) + + response = es.search(body=search.to_dict(), **search._params) + while hits := response["hits"]["hits"]: + for hit in hits: + yield self._get_result(hit) + + # If we have fewer hits than our batch size, we know there are no more results. + if len(hits) < search._params.get('size', 0): + break + + last_document = hits[-1] + pit_id = response.pit_id + search._extra.update( + pit={"id": pit_id, "keep_alive": keep_alive}, + search_after=last_document["sort"] + ) + response = es.search(body=search.to_dict(), **search._params) + + # Try to close the PIT unless it is already closed. + try: + search._using.close_point_in_time(body={"id": pit_id}) + except NotFoundError: + pass + def delete(self): """ delete() executes the query by delegating to delete_by_query() diff --git a/tests/test_integration/test_search.py b/tests/test_integration/test_search.py index e3ce061eb..6add0e9d5 100644 --- a/tests/test_integration/test_search.py +++ b/tests/test_integration/test_search.py @@ -110,6 +110,15 @@ def test_scan_iterates_through_all_docs(data_client): assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits} +def test_page_iterates_through_all_docs(data_client): + s = Search(index="flat-git") + + commits = list(s.page()) + + assert 52 == len(commits) + assert {d["_id"] for d in FLAT_DATA} == {c.meta.id for c in commits} + + def test_response_is_cached(data_client): s = Repository.search() repos = list(s) From aca5124eea6823174fe7607ab41cabbe0cd7c99a Mon Sep 17 00:00:00 2001 From: Reese Date: Sat, 24 Sep 2022 10:00:05 -0400 Subject: [PATCH 2/6] fix: attribute error, lint --- elasticsearch_dsl/search.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index 79c5cf59e..f6241945b 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -736,11 +736,11 @@ def page(self): # A sort is required to page search results. We use the optimized default if sort is None. # https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html if not search._sort: - search._sort = ['_shard_doc'] + search._sort = ["_shard_doc"] keep_alive = search._params.pop("keep_alive", "30s") pit = search._using.open_point_in_time(index=search._index, keep_alive=keep_alive) - pit_id = pit['id'] + pit_id = pit["id"] # The index is passed with Point in Time (PIT). search._index = None @@ -754,13 +754,13 @@ def page(self): yield self._get_result(hit) # If we have fewer hits than our batch size, we know there are no more results. - if len(hits) < search._params.get('size', 0): + if len(hits) < search._params.get("size", 0): break last_document = hits[-1] - pit_id = response.pit_id + pit_id = response["pit_id"] search._extra.update( - pit={"id": pit_id, "keep_alive": keep_alive}, + pit={"id": pit_id, "keep_alive": keep_alive}, search_after=last_document["sort"] ) response = es.search(body=search.to_dict(), **search._params) From 989afa8d5356e0b0e4dc3ea1862e771a2b0da1f2 Mon Sep 17 00:00:00 2001 From: Reese Date: Sat, 24 Sep 2022 10:02:40 -0400 Subject: [PATCH 3/6] fix: black formatting --- elasticsearch_dsl/search.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index f6241945b..cacd5c851 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -739,12 +739,14 @@ def page(self): search._sort = ["_shard_doc"] keep_alive = search._params.pop("keep_alive", "30s") - pit = search._using.open_point_in_time(index=search._index, keep_alive=keep_alive) + pit = search._using.open_point_in_time( + index=search._index, keep_alive=keep_alive + ) pit_id = pit["id"] # The index is passed with Point in Time (PIT). search._index = None - search._extra.update(pit={"id": pit['id'], "keep_alive": keep_alive}) + search._extra.update(pit={"id": pit["id"], "keep_alive": keep_alive}) es = get_connection(search._using) @@ -761,7 +763,7 @@ def page(self): pit_id = response["pit_id"] search._extra.update( pit={"id": pit_id, "keep_alive": keep_alive}, - search_after=last_document["sort"] + search_after=last_document["sort"], ) response = es.search(body=search.to_dict(), **search._params) From 02ad7a1f4c7948afc737d6a5c8493a37a7d13d5b Mon Sep 17 00:00:00 2001 From: reese-allison <42069854+reese-allison@users.noreply.github.com> Date: Sat, 22 Oct 2022 19:50:40 -0400 Subject: [PATCH 4/6] Update elasticsearch_dsl/search.py Co-authored-by: Rostyslav Khudov <59306666+rkhudov@users.noreply.github.com> --- elasticsearch_dsl/search.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index cacd5c851..d5ad3ab78 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -739,8 +739,11 @@ def page(self): search._sort = ["_shard_doc"] keep_alive = search._params.pop("keep_alive", "30s") - pit = search._using.open_point_in_time( - index=search._index, keep_alive=keep_alive + es = get_connection(search._using) + + pit = es.open_point_in_time( + index=search._index, + keep_alive=keep_alive, ) pit_id = pit["id"] From d7bc7a482f33f3c410e496afd5f2019176824f29 Mon Sep 17 00:00:00 2001 From: reese-allison <42069854+reese-allison@users.noreply.github.com> Date: Sat, 22 Oct 2022 19:51:13 -0400 Subject: [PATCH 5/6] Update elasticsearch_dsl/search.py Co-authored-by: Rostyslav Khudov <59306666+rkhudov@users.noreply.github.com> --- elasticsearch_dsl/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index d5ad3ab78..ac5f53f3d 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -749,7 +749,7 @@ def page(self): # The index is passed with Point in Time (PIT). search._index = None - search._extra.update(pit={"id": pit["id"], "keep_alive": keep_alive}) + search._extra.update(pit={"id": pit_id, "keep_alive": keep_alive}) es = get_connection(search._using) From b31ec200a82c2a69b025cf8f283a9bdba6a8281c Mon Sep 17 00:00:00 2001 From: reese-allison <42069854+reese-allison@users.noreply.github.com> Date: Sat, 22 Oct 2022 19:55:39 -0400 Subject: [PATCH 6/6] Use es_connection for close_point_in_time --- elasticsearch_dsl/search.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/elasticsearch_dsl/search.py b/elasticsearch_dsl/search.py index ac5f53f3d..44fbf782d 100644 --- a/elasticsearch_dsl/search.py +++ b/elasticsearch_dsl/search.py @@ -751,8 +751,6 @@ def page(self): search._index = None search._extra.update(pit={"id": pit_id, "keep_alive": keep_alive}) - es = get_connection(search._using) - response = es.search(body=search.to_dict(), **search._params) while hits := response["hits"]["hits"]: for hit in hits: @@ -772,7 +770,7 @@ def page(self): # Try to close the PIT unless it is already closed. try: - search._using.close_point_in_time(body={"id": pit_id}) + es.close_point_in_time(body={"id": pit_id}) except NotFoundError: pass