Skip to content

Commit 76bdb62

Browse files
authored
feat: add supports for write structured data - NamedTuple, Data Classes (#330)
1 parent 74d89a7 commit 76bdb62

File tree

12 files changed

+411
-26
lines changed

12 files changed

+411
-26
lines changed

.circleci/config.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ jobs:
108108
pydocstyle --count influxdb_client
109109
check-examples:
110110
docker:
111-
- image: *default-python
111+
- image: "cimg/python:3.8"
112112
environment:
113113
PIPENV_VENV_IN_PROJECT: true
114114
- image: *default-influxdb
@@ -123,6 +123,7 @@ jobs:
123123
export PYTHONPATH="$PWD"
124124
python examples/monitoring_and_alerting.py
125125
python examples/buckets_management.py
126+
python examples/write_structured_data.py
126127
check-sphinx:
127128
docker:
128129
- image: *default-python

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.22.0 [unreleased]
22

3+
### Features
4+
1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add supports for write structured data - `NamedTuple`, `Data Classes`
5+
36
### Documentation
47
1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst)
58

README.rst

+8-5
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,14 @@ The data could be written as
362362

363363
1. ``string`` or ``bytes`` that is formatted as a InfluxDB's line protocol
364364
2. `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
365-
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
366-
4. List of above items
367-
5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item
365+
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure
366+
4. `NamedTuple <https://docs.python.org/3/library/collections.html#collections.namedtuple>`_
367+
5. `Data Classes <https://docs.python.org/3/library/dataclasses.html>`_
368368
6. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
369+
7. List of above items
370+
8. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item
369371

372+
You can find write examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#writes>`__.
370373

371374
Batching
372375
""""""""
@@ -532,7 +535,7 @@ In a `init <https://docs.python.org/3/library/configparser.html>`_ configuration
532535
customer = California Miner
533536
data_center = ${env.data_center}
534537
535-
You could also use a `TOML <https://toml.io/en/>`_ format for the configuration file.
538+
You can also use a `TOML <https://toml.io/en/>`_ format for the configuration file.
536539

537540
Via Environment Properties
538541
__________________________
@@ -1048,7 +1051,7 @@ The second example shows how to use client capabilities to realtime visualizatio
10481051
Other examples
10491052
""""""""""""""
10501053

1051-
You could find all examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`_.
1054+
You can find all examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`__.
10521055

10531056
.. marker-examples-end
10541057

docs/api.rst

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ WriteApi
1919
.. autoclass:: influxdb_client.WriteApi
2020
:members:
2121

22+
.. autoclass:: influxdb_client.client.write.point.Point
23+
:members:
24+
25+
.. autoclass:: influxdb_client.domain.write_precision.WritePrecision
26+
:members:
27+
2228
BucketsApi
2329
""""""""""
2430
.. autoclass:: influxdb_client.BucketsApi

examples/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame
88
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
99
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
10+
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
1011

1112
## Queries
1213
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`

examples/example.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,4 @@
3737
print("val count: ", val_count)
3838

3939
response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
40-
print (codecs.decode(response.data))
40+
print(codecs.decode(response.data))

examples/write_structured_data.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from collections import namedtuple
2+
from dataclasses import dataclass
3+
from datetime import datetime
4+
5+
from influxdb_client import InfluxDBClient
6+
from influxdb_client.client.write_api import SYNCHRONOUS
7+
8+
9+
class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 'temperature', 'timestamp'])):
10+
"""
11+
Named structure - Sensor
12+
"""
13+
pass
14+
15+
16+
@dataclass
17+
class Car:
18+
"""
19+
DataClass structure - Car
20+
"""
21+
engine: str
22+
type: str
23+
speed: float
24+
25+
26+
"""
27+
Initialize client
28+
"""
29+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
30+
write_api = client.write_api(write_options=SYNCHRONOUS)
31+
32+
"""
33+
Sensor "current" state
34+
"""
35+
sensor = Sensor(name="sensor_pt859",
36+
location="warehouse_125",
37+
version="2021.06.05.5874",
38+
pressure=125,
39+
temperature=10,
40+
timestamp=datetime.utcnow())
41+
print(sensor)
42+
43+
"""
44+
Synchronous write
45+
"""
46+
write_api.write(bucket="my-bucket",
47+
record=sensor,
48+
record_measurement_key="name",
49+
record_time_key="timestamp",
50+
record_tag_keys=["location", "version"],
51+
record_field_keys=["pressure", "temperature"])
52+
53+
"""
54+
Car "current" speed
55+
"""
56+
car = Car('12V-BT', 'sport-cars', 125.25)
57+
print(car)
58+
59+
"""
60+
Synchronous write
61+
"""
62+
write_api.write(bucket="my-bucket",
63+
record=car,
64+
record_measurement_name="performance",
65+
record_tag_keys=["engine", "type"],
66+
record_field_keys=["speed"])

influxdb_client/client/write/point.py

+74-8
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,82 @@ def measurement(measurement):
6161
return p
6262

6363
@staticmethod
64-
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION):
65-
"""Initialize point from 'dict' structure."""
66-
point = Point(dictionary['measurement'])
67-
if 'tags' in dictionary:
64+
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs):
65+
"""
66+
Initialize point from 'dict' structure.
67+
68+
The expected dict structure is:
69+
- measurement
70+
- tags
71+
- fields
72+
- time
73+
74+
Example:
75+
.. code-block:: python
76+
77+
# Use default dictionary structure
78+
dict_structure = {
79+
"measurement": "h2o_feet",
80+
"tags": {"location": "coyote_creek"},
81+
"fields": {"water_level": 1.0},
82+
"time": 1
83+
}
84+
point = Point.from_dict(dict_structure, WritePrecision.NS)
85+
86+
Example:
87+
.. code-block:: python
88+
89+
# Use custom dictionary structure
90+
dictionary = {
91+
"name": "sensor_pt859",
92+
"location": "warehouse_125",
93+
"version": "2021.06.05.5874",
94+
"pressure": 125,
95+
"temperature": 10,
96+
"created": 1632208639,
97+
}
98+
point = Point.from_dict(dictionary,
99+
write_precision=WritePrecision.S,
100+
record_measurement_key="name",
101+
record_time_key="created",
102+
record_tag_keys=["location", "version"],
103+
record_field_keys=["pressure", "temperature"])
104+
105+
:param dictionary: dictionary for serialize into data Point
106+
:param write_precision: sets the precision for the supplied time values
107+
:key record_measurement_key: key of dictionary with specified measurement
108+
:key record_measurement_name: static measurement name for data Point
109+
:key record_time_key: key of dictionary with specified timestamp
110+
:key record_tag_keys: list of dictionary keys to use as a tag
111+
:key record_field_keys: list of dictionary keys to use as a field
112+
:return: new data point
113+
"""
114+
measurement_ = kwargs.get('record_measurement_name', None)
115+
if measurement_ is None:
116+
measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')]
117+
point = Point(measurement_)
118+
119+
record_tag_keys = kwargs.get('record_tag_keys', None)
120+
if record_tag_keys is not None:
121+
for tag_key in record_tag_keys:
122+
if tag_key in dictionary:
123+
point.tag(tag_key, dictionary[tag_key])
124+
elif 'tags' in dictionary:
68125
for tag_key, tag_value in dictionary['tags'].items():
69126
point.tag(tag_key, tag_value)
70-
for field_key, field_value in dictionary['fields'].items():
71-
point.field(field_key, field_value)
72-
if 'time' in dictionary:
73-
point.time(dictionary['time'], write_precision=write_precision)
127+
128+
record_field_keys = kwargs.get('record_field_keys', None)
129+
if record_field_keys is not None:
130+
for field_key in record_field_keys:
131+
if field_key in dictionary:
132+
point.field(field_key, dictionary[field_key])
133+
else:
134+
for field_key, field_value in dictionary['fields'].items():
135+
point.field(field_key, field_value)
136+
137+
record_time_key = kwargs.get('record_time_key', 'time')
138+
if record_time_key in dictionary:
139+
point.time(dictionary[record_time_key], write_precision=write_precision)
74140
return point
75141

76142
def __init__(self, measurement_name):

influxdb_client/client/write_api.py

+72-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from enum import Enum
99
from random import random
1010
from time import sleep
11-
from typing import Union, Any, Iterable
11+
from typing import Union, Any, Iterable, NamedTuple
1212

1313
import rx
1414
from rx import operators as ops, Observable
@@ -24,6 +24,15 @@
2424
logger = logging.getLogger(__name__)
2525

2626

27+
try:
28+
import dataclasses
29+
from dataclasses import dataclass
30+
31+
_HAS_DATACLASS = True
32+
except ModuleNotFoundError:
33+
_HAS_DATACLASS = False
34+
35+
2736
class WriteType(Enum):
2837
"""Configuration which type of writes will client use."""
2938

@@ -173,7 +182,20 @@ def _body_reduce(batch_items):
173182

174183

175184
class WriteApi:
176-
"""Implementation for '/api/v2/write' endpoint."""
185+
"""
186+
Implementation for '/api/v2/write' endpoint.
187+
188+
Example:
189+
.. code-block:: python
190+
191+
from influxdb_client import InfluxDBClient
192+
from influxdb_client.client.write_api import SYNCHRONOUS
193+
194+
195+
# Initialize SYNCHRONOUS instance of WriteApi
196+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
197+
write_api = client.write_api(write_options=SYNCHRONOUS)
198+
"""
177199

178200
def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(),
179201
point_settings: PointSettings = PointSettings()) -> None:
@@ -217,21 +239,51 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
217239
def write(self, bucket: str, org: str = None,
218240
record: Union[
219241
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
220-
Observable] = None,
242+
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
243+
] = None,
221244
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
222245
"""
223246
Write time-series data into InfluxDB.
224247
248+
:param str bucket: specifies the destination bucket for writes (required)
225249
:param str, Organization org: specifies the destination organization for writes;
226250
take the ID, Name or Organization;
227251
if it's not specified then is used default from client.org.
228-
:param str bucket: specifies the destination bucket for writes (required)
229252
:param WritePrecision write_precision: specifies the precision for the unix timestamps within
230253
the body line-protocol. The precision specified on a Point has precedes
231254
and is use for write.
232-
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
233-
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
234-
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
255+
:param record: Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or
256+
RxPY Observable to write
257+
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
258+
:key data_frame_tag_columns: list of DataFrame columns which are tags,
259+
rest columns will be fields - ``DataFrame``
260+
:key record_measurement_key: key of record with specified measurement -
261+
``dictionary``, ``NamedTuple``, ``dataclass``
262+
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
263+
:key record_time_key: key of record with specified timestamp - ``dictionary``, ``NamedTuple``, ``dataclass``
264+
:key record_tag_keys: list of record keys to use as a tag - ``dictionary``, ``NamedTuple``, ``dataclass``
265+
:key record_field_keys: list of record keys to use as a field - ``dictionary``, ``NamedTuple``, ``dataclass``
266+
267+
Example:
268+
.. code-block:: python
269+
270+
# Record as Line Protocol
271+
write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")
272+
273+
# Record as Dictionary
274+
dictionary = {
275+
"measurement": "h2o_feet",
276+
"tags": {"location": "us-west"},
277+
"fields": {"level": 125},
278+
"time": 1
279+
}
280+
write_api.write("my-bucket", "my-org", dictionary)
281+
282+
# Record as Point
283+
from influxdb_client import Point
284+
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
285+
write_api.write("my-bucket", "my-org", point)
286+
235287
"""
236288
org = get_org_query_param(org=org, client=self._influxdb_client)
237289

@@ -309,12 +361,16 @@ def _serialize(self, record, write_precision, payload, **kwargs):
309361
self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs)
310362

311363
elif isinstance(record, dict):
312-
self._serialize(Point.from_dict(record, write_precision=write_precision),
364+
self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),
313365
write_precision, payload, **kwargs)
314366
elif 'DataFrame' in type(record).__name__:
315367
serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs)
316368
self._serialize(serializer.serialize(), write_precision, payload, **kwargs)
317-
369+
elif hasattr(record, "_asdict"):
370+
# noinspection PyProtectedMember
371+
self._serialize(record._asdict(), write_precision, payload, **kwargs)
372+
elif _HAS_DATACLASS and dataclasses.is_dataclass(record):
373+
self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs)
318374
elif isinstance(record, Iterable):
319375
for item in record:
320376
self._serialize(item, write_precision, payload, **kwargs)
@@ -334,7 +390,7 @@ def _write_batching(self, bucket, org, data,
334390
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
335391

336392
elif isinstance(data, dict):
337-
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
393+
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),
338394
precision, **kwargs)
339395

340396
elif 'DataFrame' in type(data).__name__:
@@ -344,6 +400,12 @@ def _write_batching(self, bucket, org, data,
344400
self._write_batching(bucket, org,
345401
serializer.serialize(chunk_idx),
346402
precision, **kwargs)
403+
elif hasattr(data, "_asdict"):
404+
# noinspection PyProtectedMember
405+
self._write_batching(bucket, org, data._asdict(), precision, **kwargs)
406+
407+
elif _HAS_DATACLASS and dataclasses.is_dataclass(data):
408+
self._write_batching(bucket, org, dataclasses.asdict(data), precision, **kwargs)
347409

348410
elif isinstance(data, Iterable):
349411
for item in data:

0 commit comments

Comments
 (0)