Skip to content

feat: bucket pagination #658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### Features
1. [#657](https://github.com/influxdata/influxdb-client-python/pull/657): Prefer datetime.fromisoformat over dateutil.parse in Python 3.11+

1. [#658](https://github.com/influxdata/influxdb-client-python/pull/658): Add `find_buckets_iter` function that allows iterating through all pages of buckets.
## 1.43.0 [2024-05-17]

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion examples/buckets_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
List all Buckets
"""
print(f"\n------- List -------\n")
buckets = buckets_api.find_buckets().buckets
buckets = buckets_api.find_buckets_iter()
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
for bucket in buckets]))
print("---")
Expand Down
32 changes: 32 additions & 0 deletions influxdb_client/client/_pages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@


class _Page:
    """One page of results used by ``_PageIterator``.

    :param values: items contained in this page
    :param has_next: truthy when a following page may still be requested
    :param next_after: cursor from which the following page should be requested
    """

    def __init__(self, values, has_next, next_after):
        self.has_next = has_next
        self.values = values
        self.next_after = next_after

    def __repr__(self):
        return (f"{type(self).__name__}(values={self.values!r}, "
                f"has_next={self.has_next!r}, next_after={self.next_after!r})")

    @classmethod
    def empty(cls):
        """Return a terminal page: no values and nothing further to fetch."""
        return cls([], False, None)

    @classmethod
    def initial(cls, after):
        """Return the starting page: no values yet, but a next page to fetch from ``after``."""
        return cls([], True, after)


class _PageIterator:
    """Iterator that yields the items of consecutive pages, fetching pages lazily.

    :param page: the starting page; any values it already holds are yielded first
    :param get_next_page: callable that receives the current page and returns the following one
    """

    def __init__(self, page: "_Page", get_next_page):
        self.page = page
        self.get_next_page = get_next_page
        # Walk the page through a cursor instead of ``list.pop(0)``: popping from the
        # front is O(n) per item and would drain the list owned by the page.
        self._pending = iter(page.values or ())

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next item, requesting a further page once the current one is used up.

        :raises StopIteration: when the current page is exhausted and either no next page is
            announced or the newly fetched page holds no values
        """
        try:
            return next(self._pending)
        except StopIteration:
            pass

        if not self.page.has_next:
            raise StopIteration

        self.page = self.get_next_page(self.page)
        # A page whose values are ``None`` is treated the same as an empty one.
        self._pending = iter(self.page.values or ())
        return next(self._pending)
30 changes: 30 additions & 0 deletions influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param
from influxdb_client.client._pages import _Page, _PageIterator


class BucketsApi(object):
Expand Down Expand Up @@ -117,3 +118,32 @@ def find_buckets(self, **kwargs):
:return: Buckets
"""
return self._buckets_service.get_buckets(**kwargs)

def find_buckets_iter(self, **kwargs):
    """Lazily iterate over all buckets, requesting further pages on demand.

    :key str name: Only returns buckets with the specified name
    :key str org: The organization name.
    :key str org_id: The organization ID.
    :key str after: The last resource ID from which to seek from (but not including).
    :key int limit: the maximum number of buckets in one page
    :return: Buckets iterator
    """
    # Start from the caller-supplied cursor, if any; the first real page is fetched on first use.
    start_page = _Page.initial(kwargs.get('after'))
    page_iterator = _PageIterator(start_page, lambda page: self._find_buckets_next_page(page, **kwargs))
    return iter(page_iterator)

def _find_buckets_next_page(self, page: _Page, **kwargs):
    """Fetch the page of buckets that follows ``page``.

    :param page: the current page; when its ``next_after`` is set it replaces any ``after`` in ``kwargs``
    :param kwargs: query parameters forwarded to ``get_buckets`` of the buckets service
    :return: a ``_Page`` with the fetched buckets, or an empty terminal page when ``page`` has no successor
    """
    if not page.has_next:
        return _Page.empty()

    if page.next_after is not None:
        kw_args = {**kwargs, 'after': page.next_after}
    else:
        kw_args = kwargs
    response = self._buckets_service.get_buckets(**kw_args)

    buckets = response.buckets
    # Guard against a response without a ``links`` object: treat it as "no next page"
    # instead of failing with AttributeError on ``None.next``.
    links = response.links
    has_next = links is not None and links.next is not None
    # The ID of the last bucket becomes the ``after`` cursor of the following request.
    last_id = buckets[-1].id if buckets else None

    return _Page(buckets, has_next, last_id)
33 changes: 1 addition & 32 deletions influxdb_client/client/tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,7 @@

from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \
AddResourceMemberRequestBody, RunManually, Run, LogEvent


class _Page:
def __init__(self, values, has_next, next_after):
self.has_next = has_next
self.values = values
self.next_after = next_after

@staticmethod
def empty():
return _Page([], False, None)

@staticmethod
def initial(after):
return _Page([], True, after)


class _PageIterator:
def __init__(self, page: _Page, get_next_page):
self.page = page
self.get_next_page = get_next_page

def __iter__(self):
return self

def __next__(self):
if not self.page.values:
if self.page.has_next:
self.page = self.get_next_page(self.page)
if not self.page.values:
raise StopIteration
return self.page.values.pop(0)
from influxdb_client.client._pages import _Page, _PageIterator


class TasksApi(object):
Expand Down
47 changes: 43 additions & 4 deletions tests/test_BucketsApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,65 @@ def test_create_bucket_retention_list(self):

self.delete_test_bucket(my_bucket)

def test_pagination(self):
def test_find_buckets(self):
my_org = self.find_my_org()
buckets = self.buckets_api.find_buckets().buckets
buckets = self.buckets_api.find_buckets(limit=100).buckets
size = len(buckets)

# create 2 buckets
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)

buckets = self.buckets_api.find_buckets().buckets
buckets = self.buckets_api.find_buckets(limit=size + 2).buckets
self.assertEqual(size + 2, len(buckets))

# offset 1
buckets = self.buckets_api.find_buckets(offset=1).buckets
buckets = self.buckets_api.find_buckets(offset=1, limit=size + 2).buckets
self.assertEqual(size + 1, len(buckets))

# count 1
buckets = self.buckets_api.find_buckets(limit=1).buckets
self.assertEqual(1, len(buckets))

def test_find_buckets_iter(self):
    """Exercise ``find_buckets_iter`` with name filters, several page sizes and an ``after`` cursor."""

    def count_unique_ids(items):
        # Distinct IDs, so a bucket returned on two pages is counted once.
        return len({item.id for item in items})

    my_org = self.find_my_org()
    more_buckets = 10
    num_of_buckets = count_unique_ids(self.buckets_api.find_buckets_iter()) + more_buckets

    a_bucket_name = None
    for _ in range(more_buckets):
        a_bucket_name = self.generate_name("it find_buckets_iter")
        self.buckets_api.create_bucket(bucket_name=a_bucket_name, org=my_org)

    third = num_of_buckets // 3

    # get no buckets
    found = self.buckets_api.find_buckets_iter(name=a_bucket_name + "blah")
    self.assertEqual(count_unique_ids(found), 0)

    # get bucket by name
    found = self.buckets_api.find_buckets_iter(name=a_bucket_name)
    self.assertEqual(count_unique_ids(found), 1)

    # get buckets in 3-4 batches
    found = self.buckets_api.find_buckets_iter(limit=third)
    self.assertEqual(count_unique_ids(found), num_of_buckets)

    # get buckets in one batch
    found = self.buckets_api.find_buckets_iter(limit=num_of_buckets)
    self.assertEqual(count_unique_ids(found), num_of_buckets)

    # get buckets in one batch, requesting too much
    found = self.buckets_api.find_buckets_iter(limit=num_of_buckets + 1)
    self.assertEqual(count_unique_ids(found), num_of_buckets)

    # skip some buckets
    first_batch = self.buckets_api.find_buckets(limit=third).buckets
    *_, skip_bucket = first_batch
    found = self.buckets_api.find_buckets_iter(after=skip_bucket.id)
    self.assertEqual(count_unique_ids(found), num_of_buckets - third)

def test_update_bucket(self):
my_org = self.find_my_org()

Expand Down