chore: influxdb_client/client/write: fix data_frame_to_list_of_points #183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 0 additions & 1 deletion .gitignore
@@ -114,4 +114,3 @@ sandbox
# OpenAPI-generator
/.openapi-generator*
**/writer.pickle
/tests/data_frame_file.csv
31 changes: 17 additions & 14 deletions CHANGELOG.md
@@ -6,13 +6,16 @@
### CI
1. [#179](https://github.com/influxdata/influxdb-client-python/pull/179): Updated default docker image to v2.0.3

### Bug Fixes
1. [#183](https://github.com/influxdata/influxdb-client-python/pull/183): Fixes to DataFrame writing.

## 1.13.0 [2020-12-04]

### Features
1. [#171](https://github.com/influxdata/influxdb-client-python/pull/171): CSV parser is able to parse export from UI

### Bug Fixes
1. [#170](https://github.com/influxdata/influxdb-client-python/pull/170): Skip DataFrame rows without data - all fields are nan.

### CI
1. [#175](https://github.com/influxdata/influxdb-client-python/pull/175): Updated default docker image to v2.0.2
@@ -25,7 +28,7 @@
1. [#161](https://github.com/influxdata/influxdb-client-python/pull/161): Added logging message for retries

### Bug Fixes
1. [#164](https://github.com/influxdata/influxdb-client-python/pull/164): Excluded tests from packaging

## 1.11.0 [2020-10-02]

@@ -38,16 +41,16 @@
1. [#156](https://github.com/influxdata/influxdb-client-python/pull/156): Removed labels in organization API, removed Pkg* structure and package service

### Bug Fixes
1. [#154](https://github.com/influxdata/influxdb-client-python/pull/154): Fixed escaping string fields in DataFrame serialization

## 1.10.0 [2020-08-14]

### Features
1. [#140](https://github.com/influxdata/influxdb-client-python/pull/140): Added exponential backoff strategy for batching writes, Allowed to configure default retry strategy. Default value for `retry_interval` is 5_000 milliseconds.
1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip verifying the SSL certificate
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skipping SSL certificate verification could be configured via config file or environment properties
1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp`
1. [#145](https://github.com/influxdata/influxdb-client-python/pull/145): Api generator was moved to influxdb-clients-apigen

## 1.9.0 [2020-07-17]

@@ -58,10 +61,10 @@
1. [#132](https://github.com/influxdata/influxdb-client-python/pull/132): Use microseconds resolutions for data points

### Bug Fixes
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values
1. [#130](https://github.com/influxdata/influxdb-client-python/pull/130): Use `Retry-After` header value for Retryable error codes

## 1.8.0 [2020-06-19]

@@ -153,13 +156,13 @@
1. [#27](https://github.com/influxdata/influxdb-client-python/issues/27): Added possibility to write bytes type of data
1. [#30](https://github.com/influxdata/influxdb-client-python/issues/30): Added support for streaming a query response
1. [#35](https://github.com/influxdata/influxdb-client-python/pull/35): FluxRecord supports dictionary-style access
1. [#31](https://github.com/influxdata/influxdb-client-python/issues/31): Added support for delete metrics

### API
1. [#28](https://github.com/bonitoo-io/influxdb-client-python/pull/28): Updated swagger to latest version

### Bug Fixes
1. [#19](https://github.com/bonitoo-io/influxdb-client-python/pull/19): Removed strict checking of enum values

### Documentation
1. [#22](https://github.com/bonitoo-io/influxdb-client-python/issues/22): Documented how to connect to InfluxCloud
@@ -168,8 +171,8 @@

### Features
1. [#2](https://github.com/bonitoo-io/influxdb-client-python/issues/2): The write client is able to write data in batches (configuration: `batch_size`, `flush_interval`, `jitter_interval`, `retry_interval`)
1. [#5](https://github.com/bonitoo-io/influxdb-client-python/issues/5): Added support for gzip compression of query response and write body

### API
1. [#10](https://github.com/bonitoo-io/influxdb-client-python/pull/10): Updated swagger to latest version

@@ -178,5 +181,5 @@
1. [#7](https://github.com/bonitoo-io/influxdb-client-python/issues/7): Drop NaN and infinity values from fields when writing to InfluxDB

### CI
1. [#11](https://github.com/bonitoo-io/influxdb-client-python/pull/11): Switch CI to CircleCI
1. [#12](https://github.com/bonitoo-io/influxdb-client-python/pull/12): CI generate code coverage report on CircleCI
204 changes: 144 additions & 60 deletions influxdb_client/client/write/dataframe_serializer.py
@@ -5,115 +5,199 @@
"""

import re
from functools import reduce
from itertools import chain
import math

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT


def _replace(data_frame):
    from ...extras import np

    # string columns
    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}

    # number columns
    other_cols = set(data_frame.columns) - obj_cols

    obj_nans = (f'{k}=nan' for k in obj_cols)
    other_nans = (f'{k}=nani?' for k in other_cols)

    replacements = [
        ('|'.join(chain(obj_nans, other_nans)), ''),
        (',{2,}', ','),
        ('|'.join([', ,', ', ', ' ,']), ' '),
    ]

    return replacements


def _itertuples(data_frame):
    cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
    return zip(data_frame.index, *cols)


def _is_nan(x):
    return x != x
def _not_nan(x):
    # NaN is the only value that never compares equal to itself,
    # so `x == x` is a cheap NaN test that needs no numpy/math import.
    return x == x


def _any_not_nan(p, indexes):
    return any(map(lambda inx: not _is_nan(p[inx]), indexes))
    return any(map(lambda x: _not_nan(p[x]), indexes))


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
"""Serialize DataFrame into LineProtocols."""
    # This function is hard to understand but for good reason:
    # the approach used here is considerably more efficient
    # than the alternatives.
    #
    # We build up a Python expression that efficiently converts a data point
    # tuple into a line-protocol entry, and then evaluate the expression
    # as a lambda so that we can call it. This avoids the overhead of
    # invoking a function on every data value - we only have one function
    # call per row instead. The expression consists of exactly
    # one f-string, so we build up the parts of it as segments
    # that are concatenated together to make the full f-string inside
    # the lambda.
    #
    # Things are made a little more complex because fields and tags with NaN
    # values and empty tags are omitted from the generated line-protocol
    # output.
    #
    # As an example, say we have a data frame with two value columns:
    #     a float
    #     b int
    #
    # This will generate a lambda expression to be evaluated that looks like
    # this:
    #
    #     lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}"""
    #
    # This lambda is then executed for each row p.
    #
    # When NaNs are present, the expression looks like this (split
    # across two lines to satisfy the code-style checker):
    #
    #     lambda p: f"""{measurement_name} {"" if math.isnan(p[1])
    #         else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}"""
    #
    # When there's a NaN value in column a, we'll end up with a comma at the start of the
    # fields, so we run a regexp substitution after generating the line-protocol entries
    # to remove this.
    #
    # We're careful to run these potentially costly extra steps only when NaN values actually
    # exist in the data.
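    # (Illustrative addition, not from the original comment: for a
    # hypothetical row p = (Timestamp('2020-01-01', tz='UTC'), 1.5, 2),
    # the first lambda above would produce
    # 'measurement a=1.5,b=2i 1577836800000000000'.)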

    from ...extras import pd, np
    if not isinstance(data_frame, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(data_frame)))

    if 'data_frame_measurement_name' not in kwargs:
    data_frame_measurement_name = kwargs.get('data_frame_measurement_name')
    if data_frame_measurement_name is None:
        raise TypeError('"data_frame_measurement_name" is a Required Argument')

    data_frame = data_frame.copy(deep=False)
    if isinstance(data_frame.index, pd.PeriodIndex):
        data_frame.index = data_frame.index.to_timestamp()
    else:
        # TODO: this is almost certainly not what you want
        # when the index is the default RangeIndex.
        # Instead, it would probably be better to leave
        # out the timestamp unless a time column is explicitly
        # enabled.
        data_frame.index = pd.to_datetime(data_frame.index)

    if data_frame.index.tzinfo is None:
        data_frame.index = data_frame.index.tz_localize('UTC')

    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
    data_frame_tag_columns = set(data_frame_tag_columns or [])

    # keys holds a list of string keys.
    keys = []
    # tags holds a list of tag f-string segments ordered alphabetically by tag key.
    tags = []
    # fields holds a list of field f-string segments ordered alphabetically by field key.
    fields = []
    fields_indexes = []
    keys = []
    # field_indexes holds the index into each row of all the fields.
    field_indexes = []

    if point_settings.defaultTags:
        for key, value in point_settings.defaultTags.items():
            data_frame[key] = value
            data_frame_tag_columns.add(key)

    for index, (key, value) in enumerate(data_frame.dtypes.items()):
            # Avoid overwriting existing data if there's a column
            # that already exists with the default tag's name.
            # Note: when a new column is added, the old DataFrame
            # that we've made a shallow copy of is unaffected.
            # TODO: when there are NaN or empty values in
            # the column, we could make a deep copy of the
            # data and fill in those values with the default tag value.
            if key not in data_frame.columns:
                data_frame[key] = value
                data_frame_tag_columns.add(key)

    # Get a list of all the columns sorted by field/tag key.
    # We want to iterate through the columns in sorted order
    # so that we know when we're on the first field so we
    # can know whether a comma is needed for that
    # field.
    columns = sorted(enumerate(data_frame.dtypes.items()), key=lambda col: col[1][0])

    # null_columns has a bool value for each column holding
    # whether that column contains any null (NaN or None) values.
    null_columns = data_frame.isnull().any()

    # Iterate through the columns building up the expression for each column.
    for index, (key, value) in columns:
        key = str(key)
        key_format = f'{{keys[{len(keys)}]}}'
        keys.append(key.translate(_ESCAPE_KEY))
        key_format = f'{{keys[{index}]}}'
        # The field index is one more than the column index because the
        # time index is at column zero in the finally zipped-together
        # result columns.
        field_index = index + 1
        val_format = f'p[{field_index}]'

        index_value = index + 1
        if key in data_frame_tag_columns:
            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"})
        elif issubclass(value.type, np.integer):
            fields.append(f"{key_format}={{p[{index_value}]}}i")
            fields_indexes.append(index_value)
        elif issubclass(value.type, (np.float, np.bool_)):
            fields.append(f"{key_format}={{p[{index_value}]}}")
            fields_indexes.append(index_value)
            # This column is a tag column.
            if null_columns[index]:
                key_value = f"""{{
                        '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else
                        f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
                    }}"""
            else:
                key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
            tags.append(key_value)
            continue

        # This column is a field column.
        # Note: no comma separator is needed for the first field.
        # It's important to omit it because when the first
        # field column has no nulls, we don't run the comma-removal
        # regexp substitution step.
        sep = '' if len(field_indexes) == 0 else ','
        if issubclass(value.type, np.integer):
            field_value = f"{sep}{key_format}={{{val_format}}}i"
        elif issubclass(value.type, np.bool_):
            field_value = f'{sep}{key_format}={{{val_format}}}'
        elif issubclass(value.type, np.float):
            if null_columns[index]:
                field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
            else:
                field_value = f'{sep}{key_format}={{{val_format}}}'
        else:
            fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"")
            fields_indexes.append(index_value)

    tags.sort(key=lambda x: x['key'])
    tags = ','.join(map(lambda y: y['value'], tags))

    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)),
             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING,
              'keys': keys})
            if null_columns[index]:
                field_value = f"""{{
                        '' if type({val_format}) == float and math.isnan({val_format}) else
                        f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
                    }}"""
            else:
                field_value = f'''{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'''
        field_indexes.append(field_index)
        fields.append(field_value)

    measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT)

    tags = ''.join(tags)
    fields = ''.join(fields)
    timestamp = '{p[0].value}'

    f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
        'measurement_name': measurement_name,
        '_ESCAPE_KEY': _ESCAPE_KEY,
        '_ESCAPE_STRING': _ESCAPE_STRING,
        'keys': keys,
        'math': math,
    })

    for k, v in dict(data_frame.dtypes).items():
        if k in data_frame_tag_columns:
            data_frame[k].replace('', np.nan, inplace=True)

    isnull = data_frame.isnull().any(axis=1)

    if isnull.any():
        rep = _replace(data_frame)
        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
              for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame)))
    first_field_maybe_null = null_columns[field_indexes[0] - 1]
    if first_field_maybe_null:
        # When the first field is null (None/NaN), we'll have
        # a spurious leading comma which needs to be removed.
        lp = (re.sub('^((\\ |[^ ])* ),', '\\1', f(p))
              for p in filter(lambda x: _any_not_nan(x, field_indexes), _itertuples(data_frame)))
        return list(lp)
    else:
        return list(map(f, _itertuples(data_frame)))
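For readers unfamiliar with the technique, here is a minimal standalone sketch (not part of this diff) of the eval-generated f-string approach described in the comments above; the frame contents, measurement name, and column names are hypothetical:

import pandas as pd

# Hypothetical two-column frame: float column 'a', integer column 'b'.
df = pd.DataFrame({'a': [1.5, 2.5], 'b': [1, 2]},
                  index=pd.to_datetime(['2020-01-01', '2020-01-02']).tz_localize('UTC'))

keys = ['a', 'b']  # field keys; the real serializer escapes these first
# Build the whole row formatter as one f-string and eval it once,
# so serialization costs a single function call per row.
f = eval('lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}"""',
         {'measurement_name': 'my_measurement', 'keys': keys})

cols = [df.iloc[:, k] for k in range(len(df.columns))]
lines = [f(p) for p in zip(df.index, *cols)]
# lines[0] == 'my_measurement a=1.5,b=1i 1577836800000000000'

The payoff of this design is that per-row work is one formatting call rather than a chain of per-value function calls.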
28 changes: 25 additions & 3 deletions influxdb_client/client/write/point.py
@@ -14,10 +14,32 @@
from influxdb_client.domain.write_precision import WritePrecision

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))

DEFAULT_WRITE_PRECISION = WritePrecision.NS
_ESCAPE_MEASUREMENT = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
_ESCAPE_KEY = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
_ESCAPE_STRING = str.maketrans({'\"': r"\"", "\\": r"\\"})

_ESCAPE_MEASUREMENT = str.maketrans({
    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in measurements.
    ',': r'\,',
    ' ': r'\ ',
    '\n': r'\n',
    '\t': r'\t',
    '\r': r'\r',
})

_ESCAPE_KEY = str.maketrans({
    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in keys.
    ',': r'\,',
    '=': r'\=',
    ' ': r'\ ',
    '\n': r'\n',
    '\t': r'\t',
    '\r': r'\r',
})

_ESCAPE_STRING = str.maketrans({
    '"': r'\"',
    '\\': r'\\',
})


class Point(object):
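For illustration, a small sketch (separate from the diff, with hypothetical input strings) of how these translation tables are applied with str.translate when escaping line-protocol components:

# Hypothetical example of applying the tables above.
_ESCAPE_KEY = str.maketrans({'\\': r'\\', ',': r'\,', '=': r'\=', ' ': r'\ ',
                             '\n': r'\n', '\t': r'\t', '\r': r'\r'})
_ESCAPE_STRING = str.maketrans({'"': r'\"', '\\': r'\\'})

tag_key = 'host name'.translate(_ESCAPE_KEY)         # -> 'host\ name'
field_value = 'say "hi"'.translate(_ESCAPE_STRING)   # -> 'say \"hi\"'
print(f'm,{tag_key}=eu-1 msg="{field_value}" 0')
# m,host\ name=eu-1 msg="say \"hi\"" 0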