Skip to content

feat: query bind parameters #220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.17.0 [unreleased]

### Features
1. [#219](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters

## 1.16.0 [2021-04-01]

### Features
Expand Down
35 changes: 32 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,10 @@ Queries
The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py>`_ could be formatted as a:

1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
2. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
3. Raw unprocessed results as a ``str`` iterator
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
2. Query bind parameters
3. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
4. Raw unprocessed results as a ``str`` iterator
5. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_

The API also support streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:

Expand Down Expand Up @@ -502,6 +503,34 @@ The API also support streaming ``FluxRecord`` via `query_stream <https://github.
print()
print()

"""
Query: using Bind parameters
"""

p = {"_start": datetime.timedelta(hours=-1),
"_location": "Prague",
"_desc": True,
"_floatParam": 25.1,
"_every": datetime.timedelta(minutes=5)
}

tables = query_api.query('''
from(bucket:"my-bucket") |> range(start: _start)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> filter(fn: (r) => r["_field"] == "temperature")
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
|> sort(columns: ["_time"], desc: _desc)
''', params=p)

for table in tables:
print(table)
for record in table.records:
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))

print()
print()

"""
Query: using Stream
"""
Expand Down
43 changes: 38 additions & 5 deletions examples/query.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime as datetime

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True)

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
Expand All @@ -28,6 +30,34 @@
print()
print()

"""
Query: using Bind parameters
"""

p = {"_start": datetime.timedelta(hours=-1),
"_location": "Prague",
"_desc": True,
"_floatParam": 25.1,
"_every": datetime.timedelta(minutes=5)
}

tables = query_api.query('''
from(bucket:"my-bucket") |> range(start: _start)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> filter(fn: (r) => r["_field"] == "temperature")
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
|> sort(columns: ["_time"], desc: _desc)
''', params=p)

for table in tables:
print(table)
for record in table.records:
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))

print()
print()

"""
Query: using Stream
"""
Expand Down Expand Up @@ -66,10 +96,13 @@
"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
data_frame = query_api.query_data_frame('''
from(bucket:"my-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time","location", "temperature"])
''')
print(data_frame.to_string())

"""
Expand Down
83 changes: 67 additions & 16 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

import codecs
import csv
from datetime import datetime, timedelta
from typing import List, Generator, Any

from influxdb_client import Dialect
from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord
from influxdb_client.client.util.date_utils import get_date_helper


class QueryApi(object):
Expand All @@ -29,51 +32,54 @@ def __init__(self, influxdb_client):
self._influxdb_client = influxdb_client
self._query_api = QueryService(influxdb_client.api_client)

def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect):
def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, params: dict = None):
"""
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.

:param query: a Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param dialect: csv dialect format
:param params: bind parameters
:return: The returned object is an iterator. Each iteration returns a row of the CSV file
(which can span multiple input lines).
"""
if org is None:
org = self._influxdb_client.org
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
_preload_content=False)
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params),
async_req=False, _preload_content=False)

return csv.reader(codecs.iterdecode(response, 'utf-8'))

def query_raw(self, query: str, org=None, dialect=default_dialect):
def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict = None):
"""
Execute synchronous Flux query and return result as raw unprocessed result as a str.

:param query: a Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param dialect: csv dialect format
:param params: bind parameters
:return: str
"""
if org is None:
org = self._influxdb_client.org
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False,
_preload_content=False)

return result

def query(self, query: str, org=None) -> List['FluxTable']:
def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
"""
Execute synchronous Flux query and return result as a List['FluxTable'].

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)
Expand All @@ -82,25 +88,26 @@ def query(self, query: str, org=None) -> List['FluxTable']:

return _parser.tables

def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]:
def query_stream(self, query: str, org=None, params: dict = None) -> Generator['FluxRecord', Any, None]:
"""
Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord'].

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)

return _parser.generator()

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
"""
Execute synchronous Flux query and return Pandas DataFrame.

Expand All @@ -109,11 +116,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
"""
from ..extras import pd

_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
_dataFrames = list(_generator)

if len(_dataFrames) == 0:
Expand All @@ -123,7 +131,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
else:
return _dataFrames

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None):
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
"""
Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].

Expand All @@ -132,12 +140,13 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
Expand All @@ -146,10 +155,52 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

# private helper for constructing the Query request body
@staticmethod
def _create_query(query, dialect=default_dialect):
created = Query(query=query, dialect=dialect)
def _create_query(query, dialect=default_dialect, params: dict = None):
created = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params))
return created

@staticmethod
def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
    """
    Serialize bind parameters into Flux AST option statements.

    Each entry in ``params`` becomes ``option <key> = <literal>`` in the
    "extern" AST sent alongside the query, so the Flux script can reference
    the key as a variable.

    :param params: mapping of parameter name to Python value; ``None`` values are skipped
    :return: list of OptionStatement AST nodes
    """
    statements = []
    for key, value in params.items():
        if value is None:
            continue

        # NOTE: bool must be tested before int — in Python, bool is a subclass of int.
        if isinstance(value, bool):
            literal = BooleanLiteral("BooleanLiteral", value)
        elif isinstance(value, int):
            # IntegerLiteral carries its value as a string in the AST schema.
            literal = IntegerLiteral("IntegerLiteral", str(value))
        elif isinstance(value, float):
            literal = FloatLiteral("FloatLiteral", value)
        elif isinstance(value, datetime):
            # Normalize to UTC so the RFC3339 string below is unambiguous.
            value = get_date_helper().to_utc(value)
            literal = DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
        elif isinstance(value, timedelta):
            # convert to microseconds
            _micro_delta = int(value / timedelta(microseconds=1))
            if _micro_delta < 0:
                # DurationLiteral magnitudes must be non-negative; negate via a unary '-' expression.
                literal = UnaryExpression("UnaryExpression", argument=DurationLiteral("DurationLiteral", [
                    Duration(magnitude=-_micro_delta, unit="us")]), operator="-")
            else:
                literal = DurationLiteral("DurationLiteral", [Duration(magnitude=_micro_delta, unit="us")])
        elif isinstance(value, str):
            literal = StringLiteral("StringLiteral", str(value))
        else:
            # Unknown types are passed through unchanged — assumed to already be AST literal nodes.
            literal = value

        statements.append(OptionStatement("OptionStatement",
                                          VariableAssignment("VariableAssignment", Identifier("Identifier", key),
                                                             literal)))
    return statements

@staticmethod
def _build_flux_ast(params: dict = None):
    """
    Build the "extern" Flux AST File for the given bind parameters.

    :param params: mapping of parameter name to value, or None
    :return: a ``File`` AST node wrapping the option statements, or None when no parameters were supplied
    """
    if params is None:
        return None
    option_statements = QueryApi._params_to_extern_ast(params)
    return File(package=None, name=None, type=None, imports=[], body=option_statements)

def __del__(self):
    """Close QueryAPI."""
    # No resources to release; the underlying api_client is owned by InfluxDBClient.
14 changes: 14 additions & 0 deletions influxdb_client/client/util/date_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Utils to get right Date parsing function."""
import datetime

from dateutil import parser
from pytz import UTC

date_helper = None

Expand Down Expand Up @@ -30,6 +32,18 @@ def to_nanoseconds(self, delta):

return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros

def to_utc(self, value: datetime.datetime) -> datetime.datetime:
    """
    Convert datetime to UTC timezone.

    A naive datetime (no tzinfo) is assumed to already express UTC wall time
    and is localized as-is; an aware datetime is converted to UTC.

    :param value: datetime to normalize (naive or timezone-aware)
    :return: timezone-aware datetime in UTC
    """
    if not value.tzinfo:
        # pytz localize attaches the UTC zone without shifting the clock time.
        return UTC.localize(value)
    else:
        return value.astimezone(UTC)


def get_date_helper() -> DateHelper:
"""
Expand Down
6 changes: 1 addition & 5 deletions influxdb_client/client/write/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):

if isinstance(timestamp, datetime):
if not timestamp.tzinfo:
timestamp = UTC.localize(timestamp)
else:
timestamp = timestamp.astimezone(UTC)
timestamp = timestamp - EPOCH
timestamp = date_helper.to_utc(timestamp) - EPOCH

ns = date_helper.to_nanoseconds(timestamp)

Expand Down
Loading