
Commit 9da33f0 (1 parent: 91dcafb)

chore: influxdb_client/client/write: fix data_frame_to_list_of_points

Fix the possibility of data corruption by using a much simpler regular expression to fix up the results.
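For context, a minimal sketch (mine, not part of the commit) of how the old regexp cleanup could corrupt data: the substitution patterns from the removed `_replace` helper (see the diff below) were applied anywhere in a generated line, including inside quoted string field values.

```python
import re

# Substitution patterns in the style of the old _replace() helper, which
# were applied to every generated line whenever the DataFrame contained
# any null value.
replacements = [
    (',{2,}', ','),        # collapse runs of commas
    (', ,|, | ,', ' '),    # tidy comma/space sequences left by removed fields
]

# A legitimate line-protocol entry whose *string field value* contains ", ".
line = 'm,tag=t msg="hello, world",b=1i 1586044800000000000'
for pattern, repl in replacements:
    line = re.sub(pattern, repl, line)

print(line)
# m,tag=t msg="hello world",b=1i 1586044800000000000
#                  ^ the stored value has been silently altered
```

The new code instead removes at most one spurious leading comma with a single anchored regexp, so field values are never touched.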

File tree: 4 files changed, +302 −110 lines

.gitignore (−1)

```diff
@@ -114,4 +114,3 @@ sandbox
 # OpenAPI-generator
 /.openapi-generator*
 **/writer.pickle
-/tests/data_frame_file.csv
```

influxdb_client/client/write/dataframe_serializer.py (+142 −60)

```diff
@@ -5,115 +5,197 @@
 """
 
 import re
-from functools import reduce
-from itertools import chain
+import math
 
 from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT
 
 
-def _replace(data_frame):
-    from ...extras import np
-
-    # string columns
-    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}
-
-    # number columns
-    other_cols = set(data_frame.columns) - obj_cols
-
-    obj_nans = (f'{k}=nan' for k in obj_cols)
-    other_nans = (f'{k}=nani?' for k in other_cols)
-
-    replacements = [
-        ('|'.join(chain(obj_nans, other_nans)), ''),
-        (',{2,}', ','),
-        ('|'.join([', ,', ', ', ' ,']), ' '),
-    ]
-
-    return replacements
-
-
 def _itertuples(data_frame):
     cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
     return zip(data_frame.index, *cols)
 
 
-def _is_nan(x):
-    return x != x
+def _not_nan(x):
+    return x == x
 
 
 def _any_not_nan(p, indexes):
-    return any(map(lambda inx: not _is_nan(p[inx]), indexes))
+    return any(map(lambda x: _not_nan(p[x]), indexes))
 
 
 def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
     """Serialize DataFrame into LineProtocols."""
+    # This function is hard to understand but for good reason:
+    # the approach used here is considerably more efficient
+    # than the alternatives.
+    #
+    # We build up a Python expression that efficiently converts a data point
+    # tuple into a line-protocol entry, and then evaluate the expression
+    # as a lambda so that we can call it. This avoids the overhead of
+    # invoking a function on every data value - we only have one function
+    # call per row instead. The expression consists of exactly
+    # one f-string, so we build up the parts of it as segments
+    # that are concatenated together to make the full f-string inside
+    # the lambda.
+    #
+    # Things are made a little more complex because fields and tags with NaN
+    # values and empty tags are omitted from the generated line-protocol
+    # output.
+    #
+    # As an example, say we have a data frame with two value columns:
+    #    a float
+    #    b int
+    #
+    # This will generate a lambda expression to be evaluated that looks like
+    # this:
+    #
+    #    lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}"""
+    #
+    # This lambda is then executed for each row p.
+    #
+    # When NaNs are present, the expression looks like this:
+    #
+    #    lambda p: f"""{measurement_name} {"" if math.isnan(p[1]) else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}"""
+    #
+    # When there's a NaN value in column a, we'll end up with a comma at the start of the
+    # fields, so we run a regexp substitution after generating the line-protocol entries
+    # to remove this.
+    #
+    # We're careful to run these potentially costly extra steps only when NaN values actually
+    # exist in the data.
+
     from ...extras import pd, np
     if not isinstance(data_frame, pd.DataFrame):
         raise TypeError('Must be DataFrame, but type was: {0}.'
                         .format(type(data_frame)))
 
-    if 'data_frame_measurement_name' not in kwargs:
+    data_frame_measurement_name = kwargs.get('data_frame_measurement_name')
+    if data_frame_measurement_name is None:
         raise TypeError('"data_frame_measurement_name" is a Required Argument')
 
+    data_frame = data_frame.copy(deep=False)
     if isinstance(data_frame.index, pd.PeriodIndex):
         data_frame.index = data_frame.index.to_timestamp()
     else:
+        # TODO: this is almost certainly not what you want
+        # when the index is the default RangeIndex.
+        # Instead, it would probably be better to leave
+        # out the timestamp unless a time column is explicitly
+        # enabled.
         data_frame.index = pd.to_datetime(data_frame.index)
 
     if data_frame.index.tzinfo is None:
         data_frame.index = data_frame.index.tz_localize('UTC')
 
-    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
     data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
     data_frame_tag_columns = set(data_frame_tag_columns or [])
 
+    # keys holds a list of string keys.
+    keys = []
+    # tags holds a list of tag f-string segments ordered alphabetically by tag key.
     tags = []
+    # fields holds a list of field f-string segments ordered alphabetically by field key.
     fields = []
-    fields_indexes = []
-    keys = []
+    # field_indexes holds the index into each row of all the fields.
+    field_indexes = []
 
     if point_settings.defaultTags:
         for key, value in point_settings.defaultTags.items():
-            data_frame[key] = value
-            data_frame_tag_columns.add(key)
-
-    for index, (key, value) in enumerate(data_frame.dtypes.items()):
+            # Avoid overwriting existing data if there's a column
+            # that already exists with the default tag's name.
+            # Note: when a new column is added, the old DataFrame
+            # that we've made a shallow copy of is unaffected.
+            # TODO: when there are NaN or empty values in
+            # the column, we could make a deep copy of the
+            # data and fill in those values with the default tag value.
+            if key not in data_frame.columns:
+                data_frame[key] = value
+                data_frame_tag_columns.add(key)
+
+    # Get a list of all the columns sorted by field/tag key.
+    # We want to iterate through the columns in sorted order
+    # so that we know when we're on the first field so we
+    # can know whether a comma is needed for that
+    # field.
+    columns = sorted(enumerate(data_frame.dtypes.items()), key=lambda col: col[1][0])
+
+    # null_columns has a bool value for each column holding
+    # whether that column contains any null (NaN or None) values.
+    null_columns = data_frame.isnull().any()
+
+    # Iterate through the columns building up the expression for each column.
+    for index, (key, value) in columns:
         key = str(key)
+        key_format = f'{{keys[{len(keys)}]}}'
         keys.append(key.translate(_ESCAPE_KEY))
-        key_format = f'{{keys[{index}]}}'
+        # The field index is one more than the column index because the
+        # time index is at column zero in the finally zipped-together
+        # result columns.
+        field_index = index + 1
+        val_format = f'p[{field_index}]'
 
-        index_value = index + 1
         if key in data_frame_tag_columns:
-            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"})
-        elif issubclass(value.type, np.integer):
-            fields.append(f"{key_format}={{p[{index_value}]}}i")
-            fields_indexes.append(index_value)
-        elif issubclass(value.type, (np.float, np.bool_)):
-            fields.append(f"{key_format}={{p[{index_value}]}}")
-            fields_indexes.append(index_value)
+            # This column is a tag column.
+            if null_columns[index]:
+                key_value = f"""{{
+                        '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else
+                        f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
+                    }}"""
+            else:
+                key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
+            tags.append(key_value)
+            continue
+
+        # This column is a field column.
+        # Note: no comma separator is needed for the first field.
+        # It's important to omit it because when the first
+        # field column has no nulls, we don't run the comma-removal
+        # regexp substitution step.
+        sep = '' if len(field_indexes) == 0 else ','
+        if issubclass(value.type, np.integer):
+            field_value = f"{sep}{key_format}={{{val_format}}}i"
+        elif issubclass(value.type, np.bool_):
+            field_value = f'{sep}{key_format}={{{val_format}}}'
+        elif issubclass(value.type, np.float):
+            if null_columns[index]:
+                field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
+            else:
+                field_value = f'{sep}{key_format}={{{val_format}}}'
         else:
-            fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"")
-            fields_indexes.append(index_value)
-
-    tags.sort(key=lambda x: x['key'])
-    tags = ','.join(map(lambda y: y['value'], tags))
-
-    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
-           ' ', ','.join(fields), ' {p[0].value}')
-    f = eval("lambda p: f'{}'".format(''.join(fmt)),
-             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING,
-              'keys': keys})
+            if null_columns[index]:
+                field_value = f"""{{
+                        '' if type({val_format}) == float and math.isnan({val_format}) else
+                        f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
+                    }}"""
+            else:
+                field_value = f'''{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'''
+        field_indexes.append(field_index)
+        fields.append(field_value)
+
+    measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT)
+
+    tags = ''.join(tags)
+    fields = ''.join(fields)
+    timestamp = '{p[0].value}'
+
+    f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
+        'measurement_name': measurement_name,
+        '_ESCAPE_KEY': _ESCAPE_KEY,
+        '_ESCAPE_STRING': _ESCAPE_STRING,
+        'keys': keys,
+        'math': math,
+    })
 
     for k, v in dict(data_frame.dtypes).items():
         if k in data_frame_tag_columns:
             data_frame[k].replace('', np.nan, inplace=True)
 
-    isnull = data_frame.isnull().any(axis=1)
-
-    if isnull.any():
-        rep = _replace(data_frame)
-        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
-              for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame)))
+    first_field_maybe_null = null_columns[field_indexes[0] - 1]
+    if first_field_maybe_null:
+        # When the first field is null (None/NaN), we'll have
+        # a spurious leading comma which needs to be removed.
+        lp = (re.sub('^((\\ |[^ ])* ),', '\\1', f(p))
+              for p in filter(lambda x: _any_not_nan(x, field_indexes), _itertuples(data_frame)))
        return list(lp)
     else:
        return list(map(f, _itertuples(data_frame)))
```
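The comment block in the new code describes the central trick: build one f-string source per DataFrame shape, `eval` it once into a lambda, and call that lambda per row, so the hot loop costs a single function call per row. Here is a standalone, runnable sketch of that technique; the `_TS` Timestamp stand-in is illustrative and not from the commit.

```python
# Build the formatter for the two-column example from the comments above
# (a: float, b: int), then evaluate it once and reuse it for every row.
measurement_name = 'm'
keys = ['a', 'b']

src = 'lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}"""'
f = eval(src, {'measurement_name': measurement_name, 'keys': keys})


class _TS:
    """Stand-in for a pandas Timestamp, exposing nanoseconds via .value."""
    value = 1586044800000000000


print(f((_TS(), 1.5, 2)))
# m a=1.5,b=2i 1586044800000000000
```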

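And an end-to-end sketch of the new serializer's behaviour, assuming pandas/numpy and a client version containing this commit (`PointSettings` imported from `influxdb_client.client.write_api`). The second row shows a NaN field being dropped and the spurious leading comma removed by the regexp step.

```python
import numpy as np
import pandas as pd

from influxdb_client.client.write_api import PointSettings
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points

# One float column containing a NaN and one int column.
df = pd.DataFrame(
    {'a': [1.0, np.nan], 'b': [2, 3]},
    index=pd.to_datetime(['2020-04-05', '2020-04-06']),
)

lines = data_frame_to_list_of_points(df, PointSettings(),
                                     data_frame_measurement_name='measurement')
print(lines)
# ['measurement a=1.0,b=2i 1586044800000000000',
#  'measurement b=3i 1586131200000000000']
```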
influxdb_client/client/write/point.py (+25 −3)

```diff
@@ -14,10 +14,32 @@
 from influxdb_client.domain.write_precision import WritePrecision
 
 EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
+
 DEFAULT_WRITE_PRECISION = WritePrecision.NS
-_ESCAPE_MEASUREMENT = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
-_ESCAPE_KEY = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
-_ESCAPE_STRING = str.maketrans({'\"': r"\"", "\\": r"\\"})
+
+_ESCAPE_MEASUREMENT = str.maketrans({
+    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in measurements.
+    ',': r'\,',
+    ' ': r'\ ',
+    '\n': r'\n',
+    '\t': r'\t',
+    '\r': r'\r',
+})
+
+_ESCAPE_KEY = str.maketrans({
+    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in keys.
+    ',': r'\,',
+    '=': r'\=',
+    ' ': r'\ ',
+    '\n': r'\n',
+    '\t': r'\t',
+    '\r': r'\r',
+})
+
+_ESCAPE_STRING = str.maketrans({
+    '"': r'\"',
+    '\\': r'\\',
+})
 
 
 class Point(object):
```
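For illustration (example strings are mine, not from the commit), these tables are consumed via `str.translate`:

```python
from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING

# Keys: spaces, commas and '=' gain a backslash prefix.
print('my key=x'.translate(_ESCAPE_KEY))       # my\ key\=x

# String field values: only '"' and '\' are escaped.
print('say "hi"\\'.translate(_ESCAPE_STRING))  # say \"hi\"\\
```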
