Python default write-api commands no longer writing to bucket #458

Closed
SeanMayer335174 opened this issue Jun 21, 2022 · 6 comments
Labels
question Further information is requested

SeanMayer335174 commented Jun 21, 2022

I've been trying to write to a local InfluxDB 2.2 server that I have running in a terminal. Using the default Python write_api commands, I try to write to it with the following code:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api  import SYNCHRONOUS
import datetime

my_url = "http://localhost:8086"
my_bucket= "bucket_name"
my_org = "my_org"
my_token = "123&4lkJnwq@#2#$LFE@#f2#2f#23ffa==asdfASd2o3/a.sd'sasd=="

point_name = "measurement"
field1 = {"frequency": 50}
field2 = {"amplitude": 3}
my_tag = {"to_test" : "atest"}

with InfluxDBClient(url=my_url, token=my_token, org=my_org) as client:
    writer = client.write_api(write_options=SYNCHRONOUS)
    my_point = Point(point_name)
    my_point.tag(list(my_tag.keys())[0], my_tag[list(my_tag.keys())[0]])
    my_point.field(list(field1.keys())[0], field1[list(field1.keys())[0]])
    my_point.field(list(field2.keys())[0], field2[list(field2.keys())[0]])
    my_point.time(datetime.datetime.utcnow(), WritePrecision.NS)

    writer.write(my_bucket, my_org, my_point)

This code runs without raising an exception every time I run it. I submitted it for approval after I confirmed that the data was written to the bucket and remained there.

Then, without any exception or any change to the code, the write function stopped changing the bucket. The program would execute just fine and complete with a confirmation message; we even added print statements to examine my_point and make sure all of the information was correct. Yet when inspecting the bucket, no new data would be found.

Things I've tried:

Inspected bucket retention policy: Using the python bucket_api system I was able to download the bucket's retention policy:

from dateutil.tz import tzutc  #Needed for eval translation
with InfluxDBClient(url=my_url, token=my_token, org=my_org) as client:
    bucket_api = client.buckets_api()
    bucketInfo = eval(str(bucket_api.find_buckets()))["buckets"]
myBucketInfo = [aBucketsInfo for aBucketsInfo in bucketInfo if aBucketsInfo["name"] == my_bucket][0]
print(myBucketInfo["retention_rules"])

This would output:

[{'every_seconds': 0, 'shard_group_duration_seconds': 604800, 'type': 'expire'}]

Which, as far as I can tell, doesn't seem to conflict with anything I was doing.
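For reference, an every_seconds value of 0 in an expire rule means the data is kept indefinitely, so this rule would not remove freshly written points. A minimal sketch of that check follows. The helper name is_within_retention is hypothetical, and the real server enforces retention per shard group, so treat this as an approximation rather than InfluxDB's actual logic.

```python
from datetime import datetime, timedelta, timezone


def is_within_retention(point_time, now, every_seconds):
    """Return True if a point at point_time would still be retained at now.

    Assumption for this sketch: every_seconds == 0 means infinite retention.
    """
    if every_seconds == 0:
        return True
    return now - point_time <= timedelta(seconds=every_seconds)


# The rule printed above, copied as a plain dict.
rule = {"every_seconds": 0, "shard_group_duration_seconds": 604800, "type": "expire"}

now = datetime(2022, 6, 21, 21, 15, 2, tzinfo=timezone.utc)
year_old = now - timedelta(days=365)

# With every_seconds == 0, even a year-old point is still retained.
print(is_within_retention(year_old, now, rule["every_seconds"]))
```

With a non-zero rule, for example every_seconds=3600, the same year-old point would be reported as expired.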

Flushed writer memory:

I had heard adding the following code would flush the writer api, and eliminate some of the problems I was having:

    writer.write(my_bucket, my_org, my_point)
    writer.__del__()
    writer.close()
    client.__del__()
    client.close()

No effect.

Tried adding http message processing system:

Frustrated that I wasn't even getting exceptions, I went to the InfluxDB GitHub and found the following callbacks, which report HTTP success and failure messages, to help me troubleshoot:

from influxdb_client.client.exceptions import InfluxDBError

class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        """Successfully written batch."""
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        """Unsuccessfully written batch."""
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        """Retryable error."""
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()
with InfluxDBClient(url=my_url, token=my_token, org=my_org) as client:
    writer = client.write_api(success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry, write_options=SYNCHRONOUS)
    writer.write(my_bucket, my_org, my_point)

It still didn't write to the bucket, and it still didn't produce any messages or exceptions when it failed.

Conclusion: I'm not sure where to go with this. The code was basically adapted from the default code influxdb supplied. I know it worked once, so the problem isn't how the code is constructed. There must be a problem with how the bucket is being managed? It's a simple bucket I didn't do anything with, this was a test solution, but it seems to have blown up in my face. Nothing seems to add data to the bucket anymore and I can't think of anything else I can do to solve it.

bednar (Contributor) commented Jun 21, 2022

Hi @SeanMayer335174,

thanks for using our client.

Your code looks fine. I've tested it and everything works. Try enabling debug logging on the client by passing debug=True. Here is my test script:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

my_url = "http://localhost:8086"
my_bucket = "my-bucket"
my_org = "my-org"
my_token = "my-token"

point_name = "measurement"
field1 = {"frequency": 50}
field2 = {"amplitude": 3}
my_tag = {"to_test": "atest"}

with InfluxDBClient(url=my_url, token=my_token, org=my_org, debug=True) as client:
    writer = client.write_api(write_options=SYNCHRONOUS)
    my_point = Point(point_name)
    my_point.tag(list(my_tag.keys())[0], my_tag[list(my_tag.keys())[0]])
    my_point.field(list(field1.keys())[0], field1[list(field1.keys())[0]])
    my_point.field(list(field2.keys())[0], field2[list(field2.keys())[0]])
    my_point.time(datetime.datetime.utcnow(), WritePrecision.NS)

    writer.write(my_bucket, my_org, my_point)

    csv_lines = client.query_api().query_csv(f"from(bucket: \"{my_bucket}\") |> range(start: -10m) |> group()")
    for csv_line in csv_lines:
        print(csv_line)

Regards

bednar added the question (Further information is requested) label Jun 21, 2022

SeanMayer335174 (Author) commented

I tried your debug recommendation. It did print out some details, although I'm struggling to troubleshoot them:

send: b'POST /api/v2/write?org=my-org&bucket=my_bucket&precision=ns HTTP/1.1\r\nHost: localhost:8086\r\nAccept-Encoding: identity\r\nContent-Length: 82\r\nContent-Encoding: identity\r\nContent-Type: text/plain\r\nAccept: application/json\r\nAuthorization: Token 123&4lkJnwq@#2#$LFE@#f2#2f#23ffa==asdfASd2o3/a.sd'sasd==\r\nUser-Agent: influxdb-client-python/1.29.1\r\n\r\n'
send: b'measurement,to_test=atest amplitude=3i,frequency=50i 1655846102572576000'
reply: 'HTTP/1.1 204 No Content\r\n'
header: X-Influxdb-Build: OSS
header: X-Influxdb-Version: 2.1.1
header: Date: Tue, 21 Jun 2022 21:15:02 GMT

Unfortunately, I don't know what to do with this information. The "No Content" reply seems to indicate the client is just ignoring the write request. Beyond that, the send appears to be correct; it just isn't affecting the client.

I checked to make sure the client was running actively, which I can confirm. Still can't figure out what the problem here is.
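One check that needs no server access is decoding the nanosecond timestamp at the end of the line-protocol record and comparing it with the Date header of the reply. The sketch below assumes the record ends with a space followed by an integer nanosecond timestamp. parse_line_protocol_time is a hypothetical helper, not part of influxdb_client.

```python
from datetime import datetime, timedelta, timezone


def parse_line_protocol_time(line: str) -> datetime:
    """Return the trailing nanosecond timestamp of a record as a UTC datetime."""
    ns = int(line.rsplit(" ", 1)[1])
    seconds, remainder_ns = divmod(ns, 1_000_000_000)
    # datetime has microsecond resolution, so sub-microsecond digits are dropped.
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=remainder_ns // 1000
    )


# Record and Date header copied from the debug output above.
record = "measurement,to_test=atest amplitude=3i,frequency=50i 1655846102572576000"
server_date = datetime(2022, 6, 21, 21, 15, 2, tzinfo=timezone.utc)

written_at = parse_line_protocol_time(record)
print(written_at.isoformat())
# True when the point's time is close to the server's clock.
print(abs(written_at - server_date) < timedelta(minutes=10))
```

If the decoded time were far from the server's clock, a query restricted to a recent window, such as range(start: -10m), would not return the point even though the write itself succeeded.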

bednar (Contributor) commented Jun 22, 2022

The 204 No Content reply is the success response from InfluxDB. The record measurement,to_test=atest amplitude=3i,frequency=50i 1655846102572576000 was successfully written into the database.
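As a rough illustration of how to read the reply line in the debug output, the sketch below classifies an HTTP status code. The function name describe_write_reply and its wording are assumptions made for this example; only the meaning of 204 as success comes from this thread.

```python
from http import HTTPStatus


def describe_write_reply(status_code: int) -> str:
    """Classify the HTTP status code of a write reply (illustrative only)."""
    if status_code == HTTPStatus.NO_CONTENT:
        # 204: the request succeeded and there is simply no body to return.
        return "success: write accepted, empty response body"
    if 200 <= status_code < 300:
        return "success"
    if 400 <= status_code < 500:
        return "client error: request rejected"
    if 500 <= status_code < 600:
        return "server error: possibly retryable"
    return "unexpected status"


# Reply line copied from the debug output, without the trailing \r\n.
reply_line = "HTTP/1.1 204 No Content"
code = int(reply_line.split()[1])
print(describe_write_reply(code))
```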

Did you try querying your written data by:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

my_url = "http://localhost:8086"
my_bucket = "my-bucket"
my_org = "my-org"
my_token = "my-token"

point_name = "measurement"
field1 = {"frequency": 50}
field2 = {"amplitude": 3}
my_tag = {"to_test": "atest"}

with InfluxDBClient(url=my_url, token=my_token, org=my_org, debug=True) as client:
    writer = client.write_api(write_options=SYNCHRONOUS)
    my_point = Point(point_name)
    my_point.tag(list(my_tag.keys())[0], my_tag[list(my_tag.keys())[0]])
    my_point.field(list(field1.keys())[0], field1[list(field1.keys())[0]])
    my_point.field(list(field2.keys())[0], field2[list(field2.keys())[0]])
    my_point.time(datetime.datetime.utcnow(), WritePrecision.NS)

    writer.write(my_bucket, my_org, my_point)

    csv_lines = client.query_api().query_csv(f"from(bucket: \"{my_bucket}\") |> range(start: -10m) |> group()")
    for csv_line in csv_lines:
        print(csv_line)

?

bednar (Contributor) commented Jul 9, 2022

This issue has been closed because it has not had recent activity. Please reopen it if this issue is still important to you and you have additional information.

bednar closed this as completed Jul 9, 2022

SamReub commented Jan 10, 2024

I have a similar issue to SeanMayer335174: there is no feedback on whether the write is successful or not. I tried the callback code from the examples but get no response. Setting debug=True in the client does help, but it is more complex to capture this feedback as it is sent to stdout. Therefore I'm hoping to monitor the callback.success output to check whether the write was successful. Any idea what is wrong in my code? Why don't I get a callback response?

class BatchingCallback(object):
    # https://influxdb-client.readthedocs.io/en/latest/usage.html#handling-errors
    # The only exception is batching WriteAPI (for more info see Batching), 
    # where you need to register custom callbacks to handle batch events. 
    # This is because this API runs in the background in a separate thread 
    # and isn’t possible to directly return underlying exceptions.

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

with InfluxDBClient(**influxDB, debug=False) as client:
    try:
        callback = BatchingCallback()

        with client.write_api(write_options=SYNCHRONOUS, 
                            point_settings=point_settings,
                            success_callback=callback.success,
                            error_callback=callback.error,
                            retry_callback=callback.retry) as write_api:
            write_api.write(bucket=bucket, 
                            record=df_write, 
                            data_frame_measurement_name='Electricity',
                            write_precision=WritePrecision.MS)

    except InfluxDBError as e:
        raise Exception(f"InfluxDBError") from e

bednar (Contributor) commented Jan 11, 2024

Hi @SamReub,

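You are using synchronous write mode. In this mode, the data is written to the database immediately, so if there is an error, it is raised immediately as an exception. The callbacks are meant for the batching write API, which is why they are not called here.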
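The difference between the two modes can be sketched without the client library. In the toy example below, every name (WriteError, send, write_sync, write_batched) is hypothetical and stands in for the real client. It only shows why a failure in the caller's thread surfaces as an exception, while a failure in a background thread has to be reported through callbacks.

```python
import threading


class WriteError(Exception):
    """Hypothetical stand-in for a client library's write error."""


def send(record):
    """Pretend to send one record; an empty record is rejected."""
    if not record:
        raise WriteError("empty record")


def write_sync(record):
    # Synchronous mode: the send runs in the caller's thread,
    # so a failure surfaces as an ordinary exception.
    send(record)


def write_batched(records, on_success, on_error):
    # Batching mode: records are sent from a background thread,
    # so failures can only be reported through callbacks.
    def worker():
        for record in records:
            try:
                send(record)
            except WriteError as exc:
                on_error(record, exc)
            else:
                on_success(record)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # wait here only to keep the example deterministic


sync_outcome = "ok"
try:
    write_sync("")
except WriteError as exc:
    sync_outcome = f"raised: {exc}"

events = []
write_batched(
    ["m f=1i", ""],
    on_success=lambda rec: events.append(("success", rec)),
    on_error=lambda rec, exc: events.append(("error", rec)),
)
print(sync_outcome)
print(events)
```

In the synchronous case a try/except around the write call, as in the snippet above, is the place to detect failure; reaching the line after the call without an exception means the write was accepted.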

Bes
