Skip to content

feat: added callback function for getting profilers #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 17, 2022
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

### Bug Fixes
1. [#375](https://github.com/influxdata/influxdb-client-python/pull/375): Construct `InfluxDBError` without HTTP response
1. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
1. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
1. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
1. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log
2. [#378](https://github.com/influxdata/influxdb-client-python/pull/378): Correct serialization DataFrame with nan values [DataFrame]
3. [#384](https://github.com/influxdata/influxdb-client-python/pull/384): Timeout can be specified as a `float`
4. [#380](https://github.com/influxdata/influxdb-client-python/pull/380): Correct data types for querying [DataFrame]
5. [#391](https://github.com/influxdata/influxdb-client-python/pull/391): Ping function uses debug for log

### Features
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers with example and test

### CI
1. [#370](https://github.com/influxdata/influxdb-client-python/pull/370): Add Python 3.10 to CI builds
Expand Down
42 changes: 42 additions & 0 deletions examples/query_with_profilers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.flux_table import FluxRecord
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:

"""
Define callback to process profiler results.
"""
class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)


callback = ProfilersCallback()

write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Pass callback to QueryOptions
"""
query_api = client.query_api(
query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))

"""
Perform query
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
print(f'Custom processing of profiler result: {profiler}')
31 changes: 17 additions & 14 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ class FluxCsvParser(object):
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""

def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
data_frame_index: List[str] = None, query_options: object = None) -> None:
"""Initialize defaults."""
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = profilers
self._profilers = query_options.profilers if query_options is not None else None
self._profiler_callback = query_options.profiler_callback if query_options is not None else None
pass

def __enter__(self):
Expand Down Expand Up @@ -289,16 +290,18 @@ def table_list(self) -> List[FluxTable]:
else:
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))

@staticmethod
def _print_profiler_info(flux_record: FluxRecord):
def _print_profiler_info(self, flux_record: FluxRecord):
if flux_record.get_measurement().startswith("profiler/"):
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
if self._profiler_callback:
self._profiler_callback(flux_record)
else:
msg = "Profiler: " + flux_record.get_measurement()
print("\n" + len(msg) * "=")
print(msg)
print(len(msg) * "=")
for name in flux_record.values:
val = flux_record[name]
if isinstance(val, str) and len(val) > 50:
print(f"{name:<20}: \n\n{val}")
elif val is not None:
print(f"{name:<20}: {val:<20}")
22 changes: 13 additions & 9 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
from influxdb_client.client.flux_table import FluxTable, FluxRecord
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.helpers import get_org_query_param
from typing import Callable


class QueryOptions(object):
"""Query options."""

def __init__(self, profilers: List[str] = None) -> None:
def __init__(self, profilers: List[str] = None, profiler_callback: Callable = None) -> None:
"""
Initialize query options.

:param profilers: list of enabled flux profilers
:param profiler_callback: callback function return profilers (FluxRecord)
"""
self.profilers = profilers
self.profiler_callback = profiler_callback


class QueryApi(object):
Expand Down Expand Up @@ -101,7 +104,7 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,
profilers=self._profilers())
query_options=self._get_query_options())

list(_parser.generator())

Expand All @@ -123,7 +126,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
profilers=self._profilers())
query_options=self._get_query_options())

return _parser.generator()

Expand Down Expand Up @@ -176,17 +179,18 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index,
profilers=self._profilers())
query_options=self._get_query_options())
return _parser.generator()

def _profilers(self):
def _get_query_options(self):
if self._query_options and self._query_options.profilers:
return self._query_options.profilers
else:
return self._influxdb_client.profilers
return self._query_options
elif self._influxdb_client.profilers:
return QueryOptions(profilers=self._influxdb_client.profilers)

def _create_query(self, query, dialect=default_dialect, params: dict = None):
profilers = self._profilers()
query_options = self._get_query_options()
profilers = query_options.profilers if query_options is not None else None
q = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers))

if profilers:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_QueryApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,27 @@ def test_query_profiler_present(self):
print(f"Profiler record: {flux_record}")
self.assertTrue(found_profiler_records)

def test_profilers_callback(self):

class ProfilersCallback(object):
def __init__(self):
self.records = []

def __call__(self, flux_record):
self.records.append(flux_record.values)

def get_record(self, num, val):
return (self.records[num])[val]

callback = ProfilersCallback()

query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"],
profiler_callback=callback))
query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

self.assertEqual("profiler/query", callback.get_record(0, "_measurement"))
self.assertEqual("profiler/operator", callback.get_record(1, "_measurement"))

def test_profiler_ast(self):

expect = {
Expand Down