FluxRecord returns '_start': None, '_stop': None, '_time': None #471


Closed · rdinoff opened this issue Jul 22, 2022 · 4 comments · Fixed by #477
Labels: bug (Something isn't working)

rdinoff (Contributor) commented Jul 22, 2022

Sometimes FluxRecord has None for `_start`, `_stop`, and `_time`. Running the same query again with the same time range works fine.

FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': None, '_stop': None, '_time': None, '_measurement': 'ors', 'id': 'r1', 'world': 'ny', 'confidence': 0.695238, 'cs': 'ucs', 'from': 'ors', 'msgtype': 'rawpose', 'orientation_type': 'quat', 'orientation_w': 0.7009092642998509, 'orientation_x': 0.0, 'orientation_y': 0.0, 'orientation_z': 0.7132504491541816, 'plane': 0.0, 'position_x': -5.015499999999989, 'position_y': 0.629000000000012, 'position_z': 0.0, 'raw': True, 'senderhostname': None, 'sendersha': '58e9817db955bf6e94799416479b96707343b2b5', 'senderversion': 'v7.0.0'}

client = InfluxDBClient.from_config_file("config.ini")
query_api = client.query_api()

query = f'''
    from(bucket: "bucket")
      |> range(start: {opt.start}, stop: {opt.stop})
      |> filter(fn: (r) => r._measurement == "ors")
      |> filter(fn: (r) => r["world"] == "ny")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
'''

records = query_api.query_stream(query)

for record in records:
    print(record)
    if record.values["_time"]:
        timestamp_ms = int(record.values["_time"].timestamp() * 1000)
    else:
        print("ors No time")
        continue
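The millisecond conversion in the loop above can be checked in isolation. A stdlib-only sketch (`to_epoch_ms` is a hypothetical helper, not part of the client; FluxRecord `_time` values are timezone-aware datetimes):

```python
from datetime import datetime, timezone

def to_epoch_ms(ts: datetime) -> int:
    # datetime.timestamp() yields epoch seconds as a float; scale to milliseconds
    return int(ts.timestamp() * 1000)

# 1658347680 s == 2022-07-20T20:08:00Z (the range start used later in this thread)
print(to_epoch_ms(datetime(2022, 7, 20, 20, 8, 0, tzinfo=timezone.utc)))  # 1658347680000
```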

Expected behavior:
Time fields (`_start`, `_stop`, `_time`) are always populated.

Actual behavior:
FluxRecord returns '_start': None, '_stop': None, '_time': None

Specifications:

  • Client Version: influxdb-client==1.30.0
  • InfluxDB Version: 2.2 (Docker)
  • Platform: Ubuntu 20.04.4 LTS
  • Python 3.8.10
bednar (Contributor) commented Jul 23, 2022

Hi @rdinoff,

thanks for using our client.

The client just parses InfluxDB's response into FluxRecord objects. It looks like the response from the server doesn't contain the required data. Can you check the response by enabling debug mode?

InfluxDBClient.from_config_file("config.ini", debug=True)
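For reference, the `send:`/`reply:`/`header:` lines that debug mode prints (seen later in this thread) come from the stdlib HTTP layer. The same wire-level output can be enabled directly, independent of the client (a sketch, not the client's own mechanism):

```python
import http.client
import logging

# Print raw request/response lines (send:, reply:, header:) for every HTTP call
http.client.HTTPConnection.debuglevel = 1

# Also surface DEBUG-level log records from libraries such as urllib3
logging.basicConfig(level=logging.DEBUG)
```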

Regards

@bednar bednar added the question Further information is requested label Jul 23, 2022
rdinoff (Contributor, author) commented Jul 27, 2022

I can recreate this problem very consistently using threads when debug=False; with debug=True the problem does not occur. It also does not occur when using the asyncio API.

#!/usr/bin/python

import threading
from influxdb_client import InfluxDBClient

opt = {
    'start': 1658347680,
    'stop': 1658347685
}

def ors_data():
    client = InfluxDBClient.from_config_file("config.ini", debug=False)
    query_api = client.query_api()

    query = f'''
        from(bucket: "b1")
          |> range(start: {opt['start']}, stop: {opt['stop']})
          |> filter(fn: (r) => r._measurement == "ors")
          |> filter(fn: (r) => r["world"] == "ny")
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    records = query_api.query_stream(query)

    for record in records:
        print(record)
        if record.values["_time"] is None:
            print("ERROR ors No time")

def haip_data():
    client = InfluxDBClient.from_config_file("config.ini", debug=False)
    query_api = client.query_api()

    query = f'''
        from(bucket: "b1")
          |> range(start: {opt['start']}, stop: {opt['stop']})
          |> filter(fn: (r) => r._measurement == "haip")
          |> filter(fn: (r) => r["world"] == "ny")
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    records = query_api.query_stream(query)

    for record in records:
        print(record)
        if record.values["_time"] is None:
            print("ERROR: haip No time")

if __name__ == "__main__":
    t2 = threading.Thread(target=ors_data)
    t3 = threading.Thread(target=haip_data)

    t2.start()
    t3.start()

    t2.join()
    t3.join()

bednar (Contributor) commented Jul 28, 2022

@rdinoff thanks for your example.

Unfortunately, I am not able to reproduce your issue. I am using the following script:

import threading
from influxdb_client import InfluxDBClient

opt = {
    'start': 1658347680,
    'stop': 1658347685
}
url = 'http://localhost:8086'
token = 'my-token'
org = 'my-org'
bucket = 'my-bucket'


def prepare_data():
    print("preparing data...")

    with InfluxDBClient(url=url, token=token, org=org) as client:
        with client.write_api() as write_api:
            for i in range(0, 100_000):
                _timestamp = (opt['start'] * 1000000000) + i
                write_api.write(bucket, org, f'haip,world=ny value={i},value2={i} {_timestamp}')
                write_api.write(bucket, org, f'ors,world=ny value={i},value2={i} {_timestamp}')
    print("done")


def ors_data():
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()

    query = f'''
        from(bucket: "{bucket}")
          |> range(start: {opt['start']}, stop: {opt['stop']})
          |> filter(fn: (r) => r._measurement == "ors")
          |> filter(fn: (r) => r["world"] == "ny")
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    records = query_api.query_stream(query)

    for record in records:
        if record.values["_time"] is None:
            print("ERROR ors No time")


def haip_data():
    client = InfluxDBClient(url=url, token=token, org=org)
    query_api = client.query_api()

    query = f'''
        from(bucket: "{bucket}")
          |> range(start: {opt['start']}, stop: {opt['stop']})
          |> filter(fn: (r) => r._measurement == "haip")
          |> filter(fn: (r) => r["world"] == "ny")
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    records = query_api.query_stream(query)

    for record in records:
        if record.values["_time"] is None:
            print("ERROR: haip No time")


if __name__ == "__main__":
    prepare_data()

    t2 = threading.Thread(target=ors_data)
    t3 = threading.Thread(target=haip_data)

    t2.start()
    t3.start()

    t2.join()
    t3.join()

My environment:

  • InfluxDB 2.0.3
  • Python 3.9.13
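The nanosecond timestamps built in `prepare_data` above can be sanity-checked in isolation. A stdlib-only sketch (`to_line_protocol_ns` is a hypothetical helper; line protocol timestamps default to nanosecond precision):

```python
from datetime import datetime, timezone

START = 1658347680  # epoch seconds, same as opt['start'] in the scripts above

def to_line_protocol_ns(epoch_seconds: int, offset_ns: int = 0) -> int:
    # Scale epoch seconds to nanoseconds and add the per-point offset
    return epoch_seconds * 1_000_000_000 + offset_ns

ts = to_line_protocol_ns(START, 42)
print(ts)  # 1658347680000000042

# The seconds part round-trips to the queried range start
print(datetime.fromtimestamp(ts // 1_000_000_000, tz=timezone.utc).isoformat())
# 2022-07-20T20:08:00+00:00
```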

Can you share a little bit more about your data - amount and structure?

Thanks

rdinoff (Contributor, author) commented Jul 28, 2022

After changing my example code to use InfluxDBClient directly, I can set debug=True and still see the error.

You can see the structure from the FluxRecord. There are ~300,000 rows (after pivot) per measurement, but for the 5-second range I am querying to reproduce the error there are only 9 rows (after pivot).

send: b'POST /api/v2/query?org=my-org HTTP/1.1\r\nHost: xxx.xxx.xxx-xxx.net:8086\r\nAccept-Encoding: identity\r\nContent-Length: 510\r\nAccept: application/json\r\nContent-Type: application/json\r\nAuthorization: Token token==\r\nUser-Agent: influxdb-client-python/1.30.0\r\n\r\n'
send: b'POST /api/v2/query?org=my-org HTTP/1.1\r\nHost: xxx.xxx.xxx-xxx.net:8086\r\nAccept-Encoding: identity\r\nContent-Length: 509\r\nAccept: application/json\r\nContent-Type: application/json\r\nAuthorization: Token token==\r\nUser-Agent: influxdb-client-python/1.30.0\r\n\r\n'
send: b'{"extern": {"imports": [], "body": []}, "query": "\\n        from(bucket: \\"bucket\\")\\n          |> range(start: 1658347680, stop: 1658347685)\\n          |> filter(fn: (r) => r._measurement == \\"haip\\")\\n          |> filter(fn: (r) => r[\\"world\\"] == \\"ny\\")\\n          |> pivot(rowKey: [\\"_time\\"], columnKey: [\\"_field\\"], valueColumn: \\"_value\\")\\n    ", "dialect": {"header": true, "delimiter": ",", "annotations": ["datatype", "group", "default"], "commentPrefix": "#", "dateTimeFormat": "RFC3339"}}'
send: b'{"extern": {"imports": [], "body": []}, "query": "\\n        from(bucket: \\"bucket\\")\\n          |> range(start: 1658347680, stop: 1658347685)\\n          |> filter(fn: (r) => r._measurement == \\"ors\\")\\n          |> filter(fn: (r) => r[\\"world\\"] == \\"ny\\")\\n          |> pivot(rowKey: [\\"_time\\"], columnKey: [\\"_field\\"], valueColumn: \\"_value\\")\\n    ", "dialect": {"header": true, "delimiter": ",", "annotations": ["datatype", "group", "default"], "commentPrefix": "#", "dateTimeFormat": "RFC3339"}}'
reply: 'HTTP/1.1 200 OK\r\n'
header: Content-Type: text/csv; charset=utf-8
header: Vary: Accept-Encoding
header: X-Influxdb-Build: OSS
header: X-Influxdb-Version: v2.2.0
header: Date: Thu, 28 Jul 2022 13:45:08 GMT
header: Transfer-Encoding: chunked
reply: 'HTTP/1.1 200 OK\r\n'
header: Content-Type: text/csv; charset=utf-8
header: Vary: Accept-Encoding
header: X-Influxdb-Build: OSS
header: X-Influxdb-Version: v2.2.0
header: Date: Thu, 28 Jul 2022 13:45:08 GMT
header: Transfer-Encoding: chunked
FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': None, '_stop': None, '_time': None, '_measurement': 'haip', 'id': '40032517', 'world': 'ny', 'active': True, 'area_id_0': 7.0, 'cs': 'haip', 'event_happened': False, 'from': 'haip', 'map_id': 1.0, 'moving': False, 'reliability': 1.46669341131906, 'x': 11.379555, 'xcov': 1.56107255590408, 'y': 5.5213866, 'ycov': 0.590117006902644, 'z': 0.75, 'zcov': 2.22044604925031e-16}
ERROR: haip No time
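Until a fix lands, records whose time columns came back unparsed can be skipped defensively. A stdlib-only sketch (records are simulated here with plain dicts; in the real client the same values live in `record.values`, and `valid_records` is a hypothetical helper):

```python
def valid_records(records):
    """Yield only records whose time columns were parsed; skip malformed ones."""
    for record in records:
        if record.get("_time") is None:  # parser returned None for the row's time columns
            continue
        yield record

rows = [
    {"_time": "2022-07-20T20:08:00Z", "x": 11.38},
    {"_time": None, "x": 5.52},  # malformed row, as seen in the debug output above
]
print([r["x"] for r in valid_records(rows)])  # [11.38]
```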

@bednar bednar added bug Something isn't working and removed question Further information is requested labels Aug 1, 2022
@bednar bednar added this to the 1.32.0 milestone Aug 8, 2022