Skip to content

Commit 940d5d3

Browse files
authored
feat(query): improve Query UX (influxdata#450)
1 parent f02716a commit 940d5d3

14 files changed

+764
-78
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
### Features
1212
1. [#440](https://github.com/influxdata/influxdb-client-python/pull/440): Add possibility to specify timestamp column and its timezone [DataFrame]
13+
1. [#450](https://github.com/influxdata/influxdb-client-python/pull/450): Improve Query UX - simplify serialization to JSON and add possibility to serialize query results as a flattened list of values
1314

1415
### Bug Fixes
1516
1. [#457](https://github.com/influxdata/influxdb-client-python/pull/457): Formatting nanoseconds to Flux AST

README.rst

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -615,10 +615,9 @@ Queries
615615
The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py>`_ could be formatted as a:
616616

617617
1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
618-
2. Query bind parameters
619-
3. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
620-
4. Raw unprocessed results as a ``str`` iterator
621-
5. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
618+
2. :class:`~influxdb_client.client.flux_table.CSVIterator` which will iterate over CSV lines
619+
3. Raw unprocessed results as a ``str`` iterator
620+
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
622621

623622
The API also support streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:
624623

docs/api.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ QueryApi
2020
.. autoclass:: influxdb_client.client.flux_table.FluxRecord
2121
:members:
2222

23+
.. autoclass:: influxdb_client.client.flux_table.TableList
24+
:members:
25+
26+
.. autoclass:: influxdb_client.client.flux_table.CSVIterator
27+
:members:
28+
2329
WriteApi
2430
""""""""
2531
.. autoclass:: influxdb_client.WriteApi

examples/query_response_to_json.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,5 @@
1919
"""
2020
Serialize to JSON
2121
"""
22-
import json
23-
from influxdb_client.client.flux_table import FluxStructureEncoder
24-
25-
output = json.dumps(tables, cls=FluxStructureEncoder, indent=2)
22+
output = tables.to_json(indent=5)
2623
print(output)

influxdb_client/client/_base.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22
from __future__ import absolute_import
33

44
import base64
5-
import codecs
65
import configparser
7-
import csv
86
import logging
97
import os
108
from datetime import datetime, timedelta
11-
from typing import Iterator, List, Generator, Any, Union, Iterable, AsyncGenerator
9+
from typing import List, Generator, Any, Union, Iterable, AsyncGenerator
1210

1311
from urllib3 import HTTPResponse
1412

@@ -17,9 +15,10 @@
1715
Duration, StringLiteral, ArrayExpression, ImportDeclaration, MemberExpression, MemberAssignment, File, \
1816
WriteService, QueryService, DeleteService, DeletePredicateRequest
1917
from influxdb_client.client.flux_csv_parser import FluxResponseMetadataMode, FluxCsvParser, FluxSerializationMode
20-
from influxdb_client.client.flux_table import FluxTable, FluxRecord
18+
from influxdb_client.client.flux_table import FluxRecord, TableList, CSVIterator
2119
from influxdb_client.client.util.date_utils import get_date_helper
2220
from influxdb_client.client.util.helpers import get_org_query_param
21+
from influxdb_client.client.warnings import MissingPivotFunction
2322
from influxdb_client.client.write.dataframe_serializer import DataframeSerializer
2423
from influxdb_client.rest import _UTF_8_encoding
2524

@@ -214,9 +213,9 @@ def __init__(self, influxdb_client, query_options=None):
214213
"""Base implementation for Queryable API."""
215214

216215
def _to_tables(self, response, query_options=None, response_metadata_mode:
217-
FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> List[FluxTable]:
216+
FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> TableList:
218217
"""
219-
Parse HTTP response to FluxTables.
218+
Parse HTTP response to TableList.
220219
221220
:param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.
222221
"""
@@ -225,9 +224,9 @@ def _to_tables(self, response, query_options=None, response_metadata_mode:
225224
return _parser.table_list()
226225

227226
async def _to_tables_async(self, response, query_options=None, response_metadata_mode:
228-
FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> List[FluxTable]:
227+
FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> TableList:
229228
"""
230-
Parse HTTP response to FluxTables.
229+
Parse HTTP response to TableList.
231230
232231
:param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.
233232
"""
@@ -236,9 +235,9 @@ async def _to_tables_async(self, response, query_options=None, response_metadata
236235
pass
237236
return parser.table_list()
238237

239-
def _to_csv(self, response: HTTPResponse) -> Iterator[List[str]]:
238+
def _to_csv(self, response: HTTPResponse) -> CSVIterator:
240239
"""Parse HTTP response to CSV."""
241-
return csv.reader(codecs.iterdecode(response, _UTF_8_encoding))
240+
return CSVIterator(response)
242241

243242
def _to_flux_record_stream(self, response, query_options=None,
244243
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> \
@@ -320,7 +319,7 @@ def _get_query_options(self):
320319
from influxdb_client.client.query_api import QueryOptions
321320
return QueryOptions(profilers=self._influxdb_client.profilers)
322321

323-
def _create_query(self, query, dialect=default_dialect, params: dict = None):
322+
def _create_query(self, query, dialect=default_dialect, params: dict = None, **kwargs):
324323
query_options = self._get_query_options()
325324
profilers = query_options.profilers if query_options is not None else None
326325
q = Query(query=query, dialect=dialect, extern=_BaseQueryApi._build_flux_ast(params, profilers))
@@ -331,6 +330,9 @@ def _create_query(self, query, dialect=default_dialect, params: dict = None):
331330
print("===============")
332331
print(query)
333332

333+
if kwargs.get('dataframe_query', False):
334+
MissingPivotFunction.print_warning(query)
335+
334336
return q
335337

336338
@staticmethod

influxdb_client/client/flux_csv_parser.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from enum import Enum
88
from typing import List
99

10-
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord
10+
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord, TableList
1111
from influxdb_client.client.util.date_utils import get_date_helper
1212
from influxdb_client.rest import _UTF_8_encoding
1313

@@ -71,7 +71,7 @@ def __init__(self, response, serialization_mode: FluxSerializationMode,
7171
Acceptable types: `urllib3.response.HTTPResponse`, `aiohttp.client_reqrep.ClientResponse`.
7272
"""
7373
self._response = response
74-
self.tables = []
74+
self.tables = TableList()
7575
self._serialization_mode = serialization_mode
7676
self._response_metadata_mode = response_metadata_mode
7777
self._data_frame_index = data_frame_index
@@ -344,12 +344,12 @@ def _is_profiler_table(self, table: FluxTable) -> bool:
344344
return any(filter(lambda column: (column.default_value == "_profiler" and column.label == "result"),
345345
table.columns))
346346

347-
def table_list(self) -> List[FluxTable]:
347+
def table_list(self) -> TableList:
348348
"""Get the list of flux tables."""
349349
if not self._profilers:
350350
return self.tables
351351
else:
352-
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))
352+
return TableList(filter(lambda table: not self._is_profiler_table(table), self.tables))
353353

354354
def _print_profiler_info(self, flux_record: FluxRecord):
355355
if flux_record.get_measurement().startswith("profiler/"):

influxdb_client/client/flux_table.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
44
The data model consists of tables, records, columns.
55
"""
6+
import codecs
7+
import csv
8+
from http.client import HTTPResponse
69
from json import JSONEncoder
10+
from typing import List, Iterator
11+
from influxdb_client.rest import _UTF_8_encoding
712

813

914
class FluxStructure:
@@ -137,3 +142,145 @@ def __str__(self):
137142
def __repr__(self):
138143
"""Format for inspection."""
139144
return f"<{type(self).__name__}: field={self.values.get('_field')}, value={self.values.get('_value')}>"
145+
146+
147+
class TableList(List[FluxTable]):
148+
""":class:`~influxdb_client.client.flux_table.FluxTable` list with additionally functional to better handle of query result.""" # noqa: E501
149+
150+
def to_values(self, columns: List['str'] = None) -> List[List[object]]:
151+
"""
152+
Serialize query results to a flattened list of values.
153+
154+
:param columns: if not ``None`` then only specified columns are presented in results
155+
:return: :class:`~list` of values
156+
157+
Output example:
158+
159+
.. code-block:: python
160+
161+
[
162+
['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
163+
['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
164+
...
165+
]
166+
167+
Configure required columns:
168+
169+
.. code-block:: python
170+
171+
from influxdb_client import InfluxDBClient
172+
173+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
174+
175+
# Query: using Table structure
176+
tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')
177+
178+
# Serialize to values
179+
output = tables.to_values(columns=['location', '_time', '_value'])
180+
print(output)
181+
"""
182+
183+
def filter_values(record):
184+
if columns is not None:
185+
return [record.values.get(k) for k in columns]
186+
return record.values.values()
187+
188+
return self._to_values(filter_values)
189+
190+
def to_json(self, columns: List['str'] = None, **kwargs) -> str:
191+
"""
192+
Serialize query results to a JSON formatted :class:`~str`.
193+
194+
:param columns: if not ``None`` then only specified columns are presented in results
195+
:return: :class:`~str`
196+
197+
The query results is flattened to array:
198+
199+
.. code-block:: javascript
200+
201+
[
202+
{
203+
"_measurement": "mem",
204+
"_start": "2021-06-23T06:50:11.897825+00:00",
205+
"_stop": "2021-06-25T06:50:11.897825+00:00",
206+
"_time": "2020-02-27T16:20:00.897825+00:00",
207+
"region": "north",
208+
"_field": "usage",
209+
"_value": 15
210+
},
211+
{
212+
"_measurement": "mem",
213+
"_start": "2021-06-23T06:50:11.897825+00:00",
214+
"_stop": "2021-06-25T06:50:11.897825+00:00",
215+
"_time": "2020-02-27T16:20:01.897825+00:00",
216+
"region": "west",
217+
"_field": "usage",
218+
"_value": 10
219+
},
220+
...
221+
]
222+
223+
The JSON format could be configured via ``**kwargs`` arguments:
224+
225+
.. code-block:: python
226+
227+
from influxdb_client import InfluxDBClient
228+
229+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
230+
231+
# Query: using Table structure
232+
tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')
233+
234+
# Serialize to JSON
235+
output = tables.to_json(indent=5)
236+
print(output)
237+
238+
For all available options see - `json.dump <https://docs.python.org/3/library/json.html#json.dump>`_.
239+
"""
240+
if 'indent' not in kwargs:
241+
kwargs['indent'] = 2
242+
243+
def filter_values(record):
244+
if columns is not None:
245+
return {k: v for (k, v) in record.values.items() if k in columns}
246+
return record.values
247+
248+
import json
249+
return json.dumps(self._to_values(filter_values), cls=FluxStructureEncoder, **kwargs)
250+
251+
def _to_values(self, mapping):
252+
return [mapping(record) for table in self for record in table.records]
253+
254+
255+
class CSVIterator(Iterator[List[str]]):
256+
""":class:`Iterator[List[str]]` with additionally functional to better handle of query result."""
257+
258+
def __init__(self, response: HTTPResponse) -> None:
259+
"""Initialize ``csv.reader``."""
260+
self.delegate = csv.reader(codecs.iterdecode(response, _UTF_8_encoding))
261+
262+
def __iter__(self):
263+
"""Return an iterator object."""
264+
return self.delegate.__iter__()
265+
266+
def __next__(self):
267+
"""Retrieve the next item from the iterator."""
268+
return self.delegate.__next__()
269+
270+
def to_values(self) -> List[List[str]]:
271+
"""
272+
Serialize query results to a flattened list of values.
273+
274+
:return: :class:`~list` of values
275+
276+
Output example:
277+
278+
.. code-block:: python
279+
280+
[
281+
['New York', '2022-06-14T08:00:51.749072045Z', '24.3'],
282+
['Prague', '2022-06-14T08:00:51.749072045Z', '25.3'],
283+
...
284+
]
285+
"""
286+
return list(self.delegate)

0 commit comments

Comments
 (0)