Skip to content

Commit 0c99bfb

Browse files
committed
fix: handle null values in Flux data
1 parent 27777d1 commit 0c99bfb

File tree

3 files changed

+44
-14
lines changed

3 files changed

+44
-14
lines changed

influxdb_client/client/_base.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,23 +277,27 @@ async def _to_flux_record_stream_async(self, response, query_options=None, respo
277277
return (await _parser.__aenter__()).generator_async()
278278

279279
def _to_data_frame_stream(self, data_frame_index, response, query_options=None,
280-
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full):
280+
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
281+
use_extension_dtypes=False):
281282
"""
282283
Parse HTTP response to DataFrame stream.
283284
284285
:param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.
285286
"""
286-
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
287+
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
288+
use_extension_dtypes)
287289
return _parser.generator()
288290

289291
async def _to_data_frame_stream_async(self, data_frame_index, response, query_options=None, response_metadata_mode:
290-
FluxResponseMetadataMode = FluxResponseMetadataMode.full):
292+
FluxResponseMetadataMode = FluxResponseMetadataMode.full,
293+
use_extension_dtypes=False):
291294
"""
292295
Parse HTTP response to DataFrame stream.
293296
294297
:param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.
295298
"""
296-
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
299+
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
300+
use_extension_dtypes)
297301
return (await _parser.__aenter__()).generator_async()
298302

299303
def _to_tables_parser(self, response, query_options, response_metadata_mode):
@@ -304,10 +308,12 @@ def _to_flux_record_stream_parser(self, query_options, response, response_metada
304308
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
305309
query_options=query_options, response_metadata_mode=response_metadata_mode)
306310

307-
def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode):
311+
def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode,
312+
use_extension_dtypes):
308313
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
309314
data_frame_index=data_frame_index, query_options=query_options,
310-
response_metadata_mode=response_metadata_mode)
315+
response_metadata_mode=response_metadata_mode,
316+
use_extension_dtypes=use_extension_dtypes)
311317

312318
def _to_data_frames(self, _generator):
313319
"""Parse stream of DataFrames into expected type."""

influxdb_client/client/flux_csv_parser.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ class FluxCsvParser(object):
6464

6565
def __init__(self, response, serialization_mode: FluxSerializationMode,
6666
data_frame_index: List[str] = None, query_options=None,
67-
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> None:
67+
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
68+
use_extension_dtypes=False) -> None:
6869
"""
6970
Initialize defaults.
7071
@@ -75,6 +76,7 @@ def __init__(self, response, serialization_mode: FluxSerializationMode,
7576
self.tables = TableList()
7677
self._serialization_mode = serialization_mode
7778
self._response_metadata_mode = response_metadata_mode
79+
self._use_extension_dtypes = use_extension_dtypes
7880
self._data_frame_index = data_frame_index
7981
self._data_frame_values = []
8082
self._profilers = query_options.profilers if query_options is not None else None
@@ -129,6 +131,8 @@ def _parse_flux_response(self):
129131
# Return latest DataFrame
130132
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
131133
df = self._prepare_data_frame()
134+
if self._use_extension_dtypes:
135+
df = df.convert_dtypes()
132136
if not self._is_profiler_table(metadata.table):
133137
yield df
134138

@@ -143,6 +147,8 @@ async def _parse_flux_response_async(self):
143147
# Return latest DataFrame
144148
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
145149
df = self._prepare_data_frame()
150+
if self._use_extension_dtypes:
151+
df = df.convert_dtypes()
146152
if not self._is_profiler_table(metadata.table):
147153
yield df
148154
finally:
@@ -171,6 +177,8 @@ def _parse_flux_response_row(self, metadata, csv):
171177
# Return already parsed DataFrame
172178
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
173179
df = self._prepare_data_frame()
180+
if self._use_extension_dtypes:
181+
df = df.convert_dtypes()
174182
if not self._is_profiler_table(metadata.table):
175183
yield df
176184

@@ -211,7 +219,7 @@ def _parse_flux_response_row(self, metadata, csv):
211219
pass
212220
else:
213221

214-
# to int converions todo
222+
# to int conversions todo
215223
current_id = int(csv[2])
216224
if metadata.table_id == -1:
217225
metadata.table_id = current_id
@@ -273,8 +281,10 @@ def _to_value(self, str_val, column):
273281
default_value = column.default_value
274282
if default_value == '' or default_value is None:
275283
if self._serialization_mode is FluxSerializationMode.dataFrame:
276-
from ..extras import np
277-
return self._to_value(np.nan, column)
284+
if self._use_extension_dtypes:
285+
from ..extras import pd
286+
return pd.NA
287+
return None
278288
return None
279289
return self._to_value(default_value, column)
280290

influxdb_client/client/query_api.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
222222
async_req=False, _preload_content=False, _return_http_data_only=False)
223223
return self._to_flux_record_stream(response, query_options=self._get_query_options())
224224

225-
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
225+
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
226+
use_extension_dtypes: bool = False):
226227
"""
227228
Execute synchronous Flux query and return Pandas DataFrame.
228229
@@ -234,6 +235,11 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
234235
If not specified the default value from ``InfluxDBClient.org`` is used.
235236
:param data_frame_index: the list of columns that are used as DataFrame index
236237
:param params: bind parameters
238+
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
239+
Useful for queries with ``pivot`` function.
240+
When data has missing values, column data type may change (to ``object`` or ``float64``).
241+
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support the ``pandas.NA`` value.
242+
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
237243
:return: :class:`~DataFrame` or :class:`~List[DataFrame]`
238244
239245
.. warning:: For the optimal processing of the query results use the ``pivot()`` function, which aligns results as a table.
@@ -250,10 +256,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
250256
- https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
251257
- https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/
252258
""" # noqa: E501
253-
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
259+
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params,
260+
use_extension_dtypes=use_extension_dtypes)
254261
return self._to_data_frames(_generator)
255262

256-
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
263+
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
264+
use_extension_dtypes: bool = False):
257265
"""
258266
Execute synchronous Flux query and return stream of Pandas DataFrame as a :class:`~Generator[DataFrame]`.
259267
@@ -265,6 +273,11 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
265273
If not specified the default value from ``InfluxDBClient.org`` is used.
266274
:param data_frame_index: the list of columns that are used as DataFrame index
267275
:param params: bind parameters
276+
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
277+
Useful for queries with ``pivot`` function.
278+
When data has missing values, column data type may change (to ``object`` or ``float64``).
279+
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support the ``pandas.NA`` value.
280+
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
268281
:return: :class:`~Generator[DataFrame]`
269282
270283
.. warning:: For the optimal processing of the query results use the ``pivot()`` function, which aligns results as a table.
@@ -289,7 +302,8 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
289302

290303
return self._to_data_frame_stream(data_frame_index=data_frame_index,
291304
response=response,
292-
query_options=self._get_query_options())
305+
query_options=self._get_query_options(),
306+
use_extension_dtypes=use_extension_dtypes)
293307

294308
def __del__(self):
295309
"""Close QueryAPI."""

0 commit comments

Comments
 (0)