From 91849142851454413832e6155b9b96ad31fffacf Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 1 Apr 2021 16:02:11 +0200 Subject: [PATCH 1/5] feat: bind query parameters (#219) --- CHANGELOG.md | 3 + README.rst | 35 ++++- examples/query.py | 43 ++++++- influxdb_client/client/query_api.py | 81 ++++++++++-- tests/test_QueryApi.py | 193 ++++++++++++++++++++++++++++ 5 files changed, 333 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d362466b..f2eaa3b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.17.0 [unreleased] +### Features +1. [#203](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters + ## 1.16.0 [2021-04-01] ### Features diff --git a/README.rst b/README.rst index 697061ba..ec0618fb 100644 --- a/README.rst +++ b/README.rst @@ -464,9 +464,10 @@ Queries The result retrieved by `QueryApi `_ could be formatted as a: 1. Flux data structure: `FluxTable `_, `FluxColumn `_ and `FluxRecord `_ -2. `csv.reader `__ which will iterate over CSV lines -3. Raw unprocessed results as a ``str`` iterator -4. `Pandas DataFrame `_ +2. Query bind parameters +3. `csv.reader `__ which will iterate over CSV lines +4. Raw unprocessed results as a ``str`` iterator +5. `Pandas DataFrame `_ The API also support streaming ``FluxRecord`` via `query_stream `_, see example below: @@ -502,6 +503,34 @@ The API also support streaming ``FluxRecord`` via `query_stream range(start: _start) + |> filter(fn: (r) => r["_measurement"] == "my_measurement") + |> filter(fn: (r) => r["_field"] == "temperature") + |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam) + |> aggregateWindow(every: _every, fn: mean, createEmpty: true) + |> sort(columns: ["_time"], desc: _desc) + ''', params=p) + + for table in tables: + print(table) + for record in table.records: + print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"])) + + print() + print() + """ Query: using Stream """ diff --git a/examples/query.py b/examples/query.py index 6b2a84fe..540bdf02 100644 --- a/examples/query.py +++ b/examples/query.py @@ -1,7 +1,9 @@ +import datetime as datetime + from influxdb_client import InfluxDBClient, Point, Dialect from influxdb_client.client.write_api import SYNCHRONOUS -client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") +client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True) write_api = client.write_api(write_options=SYNCHRONOUS) query_api = client.query_api() @@ -28,6 +30,34 @@ print() print() +""" +Query: using Bind parameters +""" + +p = {"_start": datetime.timedelta(hours=-1), + "_location": "Prague", + "_desc": True, + "_floatParam": 25.1, + "_every": datetime.timedelta(minutes=5) + } + +tables = query_api.query(''' + from(bucket:"my-bucket") |> range(start: _start) + |> filter(fn: (r) => r["_measurement"] == "my_measurement") + |> filter(fn: (r) => r["_field"] == "temperature") + |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam) + |> aggregateWindow(every: _every, fn: mean, createEmpty: true) + |> sort(columns: ["_time"], desc: _desc) +''', params=p) + +for table in tables: + print(table) + for record in table.records: + print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"])) + +print() +print() + """ Query: using Stream """ @@ -66,10 +96,13 @@ """ Query: using Pandas DataFrame """ -data_frame = query_api.query_data_frame('from(bucket:"my-bucket") ' - '|> range(start: -10m) ' - '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") ' - '|> keep(columns: ["location", "temperature"])') +data_frame = query_api.query_data_frame(''' +from(bucket:"my-bucket") + |> range(start: -10m) + |> filter(fn: (r) => r["_measurement"] == "my_measurement") + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> keep(columns: ["_time","location", "temperature"]) +''') print(data_frame.to_string()) """ diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index 2aea0b9f..bb409166 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -6,9 +6,12 @@ import codecs import csv +from datetime import datetime, timedelta from typing import List, Generator, Any +from pytz import UTC -from influxdb_client import Dialect +from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \ + VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression from influxdb_client import Query, QueryService from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode from influxdb_client.client.flux_table import FluxTable, FluxRecord @@ -29,51 +32,54 @@ def __init__(self, influxdb_client): self._influxdb_client = influxdb_client self._query_api = QueryService(influxdb_client.api_client) - def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect): + def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, params: dict = None): """ Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file. :param query: a Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param dialect: csv dialect format + :param params: bind parameters :return: The returned object is an iterator. Each iteration returns a row of the CSV file (which can span multiple input lines). """ if org is None: org = self._influxdb_client.org - response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False, + response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False, _preload_content=False) return csv.reader(codecs.iterdecode(response, 'utf-8')) - def query_raw(self, query: str, org=None, dialect=default_dialect): + def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict = None): """ Execute synchronous Flux query and return result as raw unprocessed result as a str. :param query: a Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param dialect: csv dialect format + :param params: bind parameters :return: str """ if org is None: org = self._influxdb_client.org - result = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False, + result = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False, _preload_content=False) return result - def query(self, query: str, org=None) -> List['FluxTable']: + def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']: """ Execute synchronous Flux query and return result as a List['FluxTable']. :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) + :param params: bind parameters :return: """ if org is None: org = self._influxdb_client.org - response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), + response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables) @@ -82,12 +88,14 @@ def query(self, query: str, org=None) -> List['FluxTable']: return _parser.tables - def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]: + def query_stream(self, query: str, org=None, params: dict = None) -> Generator['FluxRecord', Any, None]: """ Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord']. :param query: the Flux query + :param params: the Flux query parameters :param org: organization name (optional if already specified in InfluxDBClient) + :param params: bind parameters :return: """ if org is None: @@ -100,7 +108,7 @@ def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, Non return _parser.generator() - def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None): + def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None): """ Execute synchronous Flux query and return Pandas DataFrame. @@ -109,11 +117,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param data_frame_index: the list of columns that are used as DataFrame index + :param params: bind parameters :return: """ from ..extras import pd - _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index) + _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params) _dataFrames = list(_generator) if len(_dataFrames) == 0: @@ -123,7 +132,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N else: return _dataFrames - def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None): + def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None): """ Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame']. @@ -132,12 +141,13 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param data_frame_index: the list of columns that are used as DataFrame index + :param params: bind parameters :return: """ if org is None: org = self._influxdb_client.org - response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), + response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame, @@ -146,10 +156,53 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s # private helper for c @staticmethod - def _create_query(query, dialect=default_dialect): - created = Query(query=query, dialect=dialect) + def _create_query(query, dialect=default_dialect, params: dict = None): + created = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params)) return created + @staticmethod + def _params_to_extern_ast(params: dict) -> List['OptionStatement']: + + statements = [] + for key, value in params.items(): + + if isinstance(value, bool): + literal = BooleanLiteral("BooleanLiteral", value) + elif isinstance(value, int): + literal = IntegerLiteral("IntegerLiteral", str(value)) + elif isinstance(value, float): + literal = FloatLiteral("FloatLiteral", value) + elif isinstance(value, datetime): + if not value.tzinfo: + value = UTC.localize(value) + else: + value = value.astimezone(UTC) + literal = DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) + elif isinstance(value, timedelta): + # convert to microsecodns + _micro_delta = int(value / timedelta(microseconds=1)) + if _micro_delta < 0: + literal = UnaryExpression("UnaryExpression", argument=DurationLiteral("DurationLiteral", [ + Duration(magnitude=-_micro_delta, unit="us")]), operator="-") + else: + literal = DurationLiteral("DurationLiteral", [Duration(magnitude=_micro_delta, unit="us")]) + elif isinstance(value, str): + literal = StringLiteral("StringLiteral", str(value)) + else: + literal = value + + statements.append(OptionStatement("OptionStatement", + VariableAssignment("VariableAssignment", Identifier("Identifier", key), + literal))) + return statements + + @staticmethod + def _build_flux_ast(params: dict = None): + if params is None: + return None + + return File(package=None, name=None, type=None, imports=[], body=QueryApi._params_to_extern_ast(params)) + def __del__(self): """Close QueryAPI.""" pass diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py index 706b9cb1..f592cee9 100644 --- a/tests/test_QueryApi.py +++ b/tests/test_QueryApi.py @@ -1,5 +1,9 @@ +import datetime +import json import unittest +from influxdb_client import QueryApi, DurationLiteral, Duration, CallExpression, Expression, UnaryExpression, Identifier +from influxdb_client.client.util.date_utils import get_date_helper from tests.base_test import BaseTest @@ -47,6 +51,195 @@ def test_query_flux_csv(self): print("Values count: ", val_count) + def test_query_ast(self): + q = ''' + from(bucket:stringParam) + |> range(start: startDuration, stop: callParam) + |> filter(fn: (r) => r["_measurement"] == "my_measurement") + |> filter(fn: (r) => r["_value"] > intParam) + |> filter(fn: (r) => r["_value"] > floatParam) + |> aggregateWindow(every: durationParam, fn: mean, createEmpty: true) + |> sort(columns: ["_time"], desc: booleanParam) + ''' + + p = { + "stringParam": "my-bucket", + "stopParam": get_date_helper().parse_date("2021-03-20T15:59:10.607352Z"), + "intParam": 2, + "durationParam": DurationLiteral("DurationLiteral", [Duration(magnitude=1, unit="d")]), + "startDuration": UnaryExpression(type="UnaryExpression", + argument=DurationLiteral("DurationLiteral", + [Duration(magnitude=30, unit="d")]), + operator="-"), + "callParam": CallExpression(type="CallExpression", callee=Identifier(type="Identifier", name="now")), + "timedelta": datetime.timedelta(minutes=10), + "floatParam": 14.01, + "booleanParam": True, + } + + csv_result = self.client.query_api().query_csv(query=q, params=p) + + self.assertIsNotNone(csv_result) + + val_count = 0 + for row in csv_result: + for cell in row: + val_count += 1 + + print("Values count: ", val_count) + + def test_parameter_ast(self): + test_data = [["stringParam", "my-bucket", { + "imports": [], + "body": [ + { + "type": "OptionStatement", + "assignment": { + "type": "VariableAssignment", + "id": { + "type": "Identifier", + "name": "stringParam" + }, + "init": { + "type": "StringLiteral", + "value": "my-bucket" + } + } + } + ] + }], ["datetimeParam", get_date_helper().parse_date("2021-03-20T15:59:10.607352Z"), { + "body": [ + { + "assignment": { + "id": { + "name": "datetimeParam", + "type": "Identifier" + }, + "init": { + "type": "DateTimeLiteral", + "value": "2021-03-20T15:59:10.607352Z" + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }], ["timeDeltaParam", datetime.timedelta(hours=1), { + "body": [ + { + "assignment": { + "id": { + "name": "timeDeltaParam", + "type": "Identifier" + }, + "init": { + "type": "DurationLiteral", + "values": [ + { + "magnitude": 3600000000, + "unit": "us" + } + ] + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }], ["timeDeltaNegativeParam", datetime.timedelta(minutes=-5), { + "body": [ + { + "assignment": { + "id": { + "name": "timeDeltaNegativeParam", + "type": "Identifier" + }, + "init": { + "argument": { + "type": "DurationLiteral", + "values": [ + { + "magnitude": 300000000, + "unit": "us" + } + ] + }, + "operator": "-", + "type": "UnaryExpression" + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }], ["booleanParam", True, { + "body": [ + { + "assignment": { + "id": { + "name": "booleanParam", + "type": "Identifier" + }, + "init": { + "type": "BooleanLiteral", + "value": True + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }], ["intParam", int(10), { + "body": [ + { + "assignment": { + "id": { + "name": "intParam", + "type": "Identifier" + }, + "init": { + "type": "IntegerLiteral", + "value": "10" + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }], ["floatParam", 10.333, + { + "body": [ + { + "assignment": { + "id": { + "name": "floatParam", + "type": "Identifier" + }, + "init": { + "type": "FloatLiteral", + "value": 10.333 + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] + }]] + + for data in test_data: + param = {data[0]: data[1]} + print("testing: ", param) + ast = QueryApi._build_flux_ast(param) + got_sanitized = self.client.api_client.sanitize_for_serialization(ast) + self.assertEqual(json.dumps(got_sanitized, sort_keys=True, indent=2), + json.dumps(data[2], sort_keys=True, indent=2)) + if __name__ == '__main__': unittest.main() From f3a97704dfa7d3dbf6cd23f349f64aaa6c8782c1 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 1 Apr 2021 17:41:59 +0200 Subject: [PATCH 2/5] feat: bind query parameters (#219) --- influxdb_client/client/query_api.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index bb409166..0e83c69b 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -45,8 +45,8 @@ def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, pa """ if org is None: org = self._influxdb_client.org - response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False, - _preload_content=False) + response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), + async_req=False, _preload_content=False) return csv.reader(codecs.iterdecode(response, 'utf-8')) @@ -67,7 +67,7 @@ def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict return result - def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']: + def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']: """ Execute synchronous Flux query and return result as a List['FluxTable']. @@ -201,7 +201,7 @@ def _build_flux_ast(params: dict = None): if params is None: return None - return File(package=None, name=None, type=None, imports=[], body=QueryApi._params_to_extern_ast(params)) + return File(package=None, name=None, type=None, imports=[], body=QueryApi._params_to_extern_ast(params)) def __del__(self): """Close QueryAPI.""" From 492073942047df67cff5d268f28e135fdc03ebd6 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 1 Apr 2021 18:03:10 +0200 Subject: [PATCH 3/5] feat: bind query parameters (#219) --- tests/test_QueryApi.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py index f592cee9..be2dd8a1 100644 --- a/tests/test_QueryApi.py +++ b/tests/test_QueryApi.py @@ -125,6 +125,24 @@ def test_parameter_ast(self): } ], "imports": [] + }], ["datetimeNoTZParam", datetime.datetime(2021, 3, 20, 15, 59, 10, 607352), { + "body": [ + { + "assignment": { + "id": { + "name": "datetimeNoTZParam", + "type": "Identifier" + }, + "init": { + "type": "DateTimeLiteral", + "value": "2021-03-20T15:59:10.607352Z" + }, + "type": "VariableAssignment" + }, + "type": "OptionStatement" + } + ], + "imports": [] }], ["timeDeltaParam", datetime.timedelta(hours=1), { "body": [ { From b179e4bc99fbfa01ff1154599c7cb33c1d9ed279 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 6 Apr 2021 15:54:18 +0200 Subject: [PATCH 4/5] feat: bind query parameters (#219) --- influxdb_client/client/query_api.py | 11 +++++------ influxdb_client/client/util/date_utils.py | 14 ++++++++++++++ influxdb_client/client/write/point.py | 6 +----- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index 0e83c69b..e46da1d6 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -8,13 +8,13 @@ import csv from datetime import datetime, timedelta from typing import List, Generator, Any -from pytz import UTC from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \ VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression from influxdb_client import Query, QueryService from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode from influxdb_client.client.flux_table import FluxTable, FluxRecord +from influxdb_client.client.util.date_utils import get_date_helper class QueryApi(object): @@ -101,7 +101,7 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator[' if org is None: org = self._influxdb_client.org - response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), + response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream) @@ -165,6 +165,8 @@ def _params_to_extern_ast(params: dict) -> List['OptionStatement']: statements = [] for key, value in params.items(): + if value is None: + continue if isinstance(value, bool): literal = BooleanLiteral("BooleanLiteral", value) @@ -173,10 +175,7 @@ def _params_to_extern_ast(params: dict) -> List['OptionStatement']: elif isinstance(value, float): literal = FloatLiteral("FloatLiteral", value) elif isinstance(value, datetime): - if not value.tzinfo: - value = UTC.localize(value) - else: - value = value.astimezone(UTC) + value = get_date_helper().to_utc(value) literal = DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ')) elif isinstance(value, timedelta): # convert to microsecodns diff --git a/influxdb_client/client/util/date_utils.py b/influxdb_client/client/util/date_utils.py index 1bea4b9f..f7557b19 100644 --- a/influxdb_client/client/util/date_utils.py +++ b/influxdb_client/client/util/date_utils.py @@ -1,6 +1,8 @@ """Utils to get right Date parsing function.""" +import datetime from dateutil import parser +from pytz import UTC date_helper = None @@ -30,6 +32,18 @@ def to_nanoseconds(self, delta): return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros + def to_utc(self, value: datetime): + """ + Convert datetime to UTC timezone. + + :param value: datetime + :return: datetime in UTC + """ + if not value.tzinfo: + return UTC.localize(value) + else: + return value.astimezone(UTC) + def get_date_helper() -> DateHelper: """ diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index d9ca5adb..8e5884ed 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -204,11 +204,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime): if isinstance(timestamp, datetime): - if not timestamp.tzinfo: - timestamp = UTC.localize(timestamp) - else: - timestamp = timestamp.astimezone(UTC) - timestamp = timestamp - EPOCH + timestamp = date_helper.to_utc(timestamp) - EPOCH ns = date_helper.to_nanoseconds(timestamp) From a9d5adfcca8bfa1dd378963992d8ef89d298a321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Wed, 7 Apr 2021 07:18:24 +0200 Subject: [PATCH 5/5] fix: duplicated docs --- influxdb_client/client/query_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index e46da1d6..56831721 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -93,7 +93,6 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator[' Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord']. :param query: the Flux query - :param params: the Flux query parameters :param org: organization name (optional if already specified in InfluxDBClient) :param params: bind parameters :return: