diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a0bb39..a571e52e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features 1. [#112](https://github.com/influxdata/influxdb-client-python/pull/113): Support timestamp with different timezone in _convert_timestamp 1. [#120](https://github.com/influxdata/influxdb-client-python/pull/120): ciso8601 is an optional dependency and has to be installed separably +1. [#121](https://github.com/influxdata/influxdb-client-python/pull/121): Added query_data_frame_stream method ### Bug Fixes 1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py index 304a3398..1a8b7da1 100644 --- a/influxdb_client/client/query_api.py +++ b/influxdb_client/client/query_api.py @@ -105,6 +105,27 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N from ..extras import pd + _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index) + _dataFrames = list(_generator) + + if len(_dataFrames) == 0: + return pd.DataFrame(columns=[], index=None) + elif len(_dataFrames) == 1: + return _dataFrames[0] + else: + return _dataFrames + + def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None): + """ + Synchronously executes the Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame']. + Note that if a query returns more then one table than the client generates a DataFrame for each of them. + + :param query: the Flux query + :param org: organization name (optional if already specified in InfluxDBClient) + :param data_frame_index: the list of columns that are used as DataFrame index + :return: + """ + if org is None: org = self._influxdb_client.org @@ -113,14 +134,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame, data_frame_index=data_frame_index) - _dataFrames = list(_parser.generator()) - - if len(_dataFrames) == 0: - return pd.DataFrame(columns=[], index=None) - elif len(_dataFrames) == 1: - return _dataFrames[0] - else: - return _dataFrames + return _parser.generator() # private helper for c @staticmethod