FluxRecord returns '_start': None, '_stop': None, '_time': None #471
Hi @rdinoff, thanks for using our client. The client just parses InfluxDB's response into FluxRecord objects.
Regards
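For context, a minimal sketch of how a query result is consumed (the url, token, org and bucket values below are placeholders, not taken from this issue): each FluxRecord simply exposes the column values parsed from the server's annotated-CSV reply, including _start, _stop and _time.
from influxdb_client import InfluxDBClient

# Minimal sketch: the client parses the annotated-CSV response into FluxTable /
# FluxRecord objects; the time columns hold whatever the server returned.
# url/token/org/bucket are placeholder values.
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    tables = client.query_api().query('from(bucket: "my-bucket") |> range(start: -1h)')
    for table in tables:
        for record in table.records:
            print(record.get_start(), record.get_stop(), record.get_time(), record.get_value())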
I can recreate this problem very consistently using threads, if debug=False. Set debug=True and the problem does not occur. The problem also does not occur when using the asyncio API.
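For reference, a sketch of the asyncio path that, per the comment above, does not show the problem. This assumes the async client shipped with this library; the url, token, org, bucket and the Flux query are placeholders.
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def main():
    # Placeholder connection values; the Flux query is only illustrative.
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        records = await client.query_api().query_stream('from(bucket: "my-bucket") |> range(start: -5m)')
        async for record in records:
            if record.values["_time"] is None:
                print("missing _time:", record.values)

asyncio.run(main())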
@rdinoff thanks for your example. Unfortunately, I am not able to simulate your issue. I am using the following script:
import threading
from influxdb_client import InfluxDBClient
opt = {
    'start': 1658347680,
    'stop': 1658347685
}
url = 'http://localhost:8086'
token = 'my-token'
org = 'my-org'
bucket = 'my-bucket'
def prepare_data():
    print("preparing data...")
    with InfluxDBClient(url=url, token=token, org=org) as client:
        with client.write_api() as write_api:
            for i in range(0, 100_000):
                _timestamp = (opt['start'] * 1000000000) + i
                write_api.write(bucket, org, f'haip,world=ny value={i},value2={i} {_timestamp}')
                write_api.write(bucket, org, f'ors,world=ny value={i},value2={i} {_timestamp}')
    print("done")
def ors_data():
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()
    query = f'''
    from(bucket: "{bucket}")
        |> range(start: {opt['start']}, stop: {opt['stop']})
        |> filter(fn: (r) => r._measurement == "ors")
        |> filter(fn: (r) => r["world"] == "ny")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    records = query_api.query_stream(query)
    for record in records:
        if record.values["_time"] is None:
            print("ERROR ors No time")
def haip_data():
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()
    query = f'''
    from(bucket: "{bucket}")
        |> range(start: {opt['start']}, stop: {opt['stop']})
        |> filter(fn: (r) => r._measurement == "haip")
        |> filter(fn: (r) => r["world"] == "ny")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    records = query_api.query_stream(query)
    for record in records:
        if record.values["_time"] is None:
            print("ERROR: haip No time")
if __name__ == "__main__":
    prepare_data()
    t2 = threading.Thread(target=ors_data)
    t3 = threading.Thread(target=haip_data)
    t2.start()
    t3.start()
    t2.join()
    t3.join()
Can you share a little bit more about your data - amount and structure? Thanks
After changing my example code to use InfluxDBClient, I can set debug=True and see the error. You can see the structure from the FluxRecord. There are ~300,000 rows (after pivot) per measurement, but for the 5-second range I am using to reproduce the error there are only 9 rows (after pivot).
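For anyone reproducing this, a sketch of turning on the client's HTTP debug logging (connection values are placeholders): with debug=True the client logs the raw request and response, so the annotated CSV returned by the server can be compared with the parsed FluxRecords.
from influxdb_client import InfluxDBClient

# Placeholder connection values; debug=True enables verbose HTTP logging.
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)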
Sometimes FluxRecord has None for _start, _stop and _time. Run the query again with the same time range and it all works.
FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': None, '_stop': None, '_time': None, '_measurement': 'ors', 'id': 'r1', 'world': 'ny', 'confidence': 0.695238, 'cs': 'ucs', 'from': 'ors', 'msgtype': 'rawpose', 'orientation_type': 'quat', 'orientation_w': 0.7009092642998509, 'orientation_x': 0.0, 'orientation_y': 0.0, 'orientation_z': 0.7132504491541816, 'plane': 0.0, 'position_x': -5.015499999999989, 'position_y': 0.629000000000012, 'position_z': 0.0, 'raw': True, 'senderhostname': None, 'sendersha': '58e9817db955bf6e94799416479b96707343b2b5', 'senderversion': 'v7.0.0'}
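A possible workaround sketch based on that observation: re-issue the same query when any record comes back without _time. The name run_query and its arguments are hypothetical, not part of the reported code.
def run_query(query_api, flux, attempts=3):
    # Hypothetical workaround: retry the same query if any record is missing _time.
    for _ in range(attempts):
        records = list(query_api.query_stream(flux))
        if all(r.values.get("_time") is not None for r in records):
            return records
    return records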
Expected behavior:
To always have valid time fields
Actual behavior:
FluxRecord returns '_start': None, '_stop': None, '_time': None
Specifications: