Skip to content

fix: add support for "with .. as .." statement for cleaner exception … #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 63 additions & 67 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Writes
The `WriteApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py>`_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0.
The data should be passed as a `InfluxDB Line Protocol <https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/>`_\ , `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py>`_ or Observable stream.

**Important: The WriteApi in batching mode (default mode) is suppose to run as a singleton. To flush all your data you should call ``_write_client.close()`` at the end of your script.**
**Important: The WriteApi in batching mode (default mode) is suppose to run as a singleton. To flush all your data you should wrap the execution using ``with client.write_api(...) as write_api:`` statement or call ``_write_client.close()`` at the end of your script.**

*The default instance of WriteApi use batching.*

Expand Down Expand Up @@ -280,73 +280,69 @@ The batching is configurable by ``write_options``\ :

from influxdb_client import InfluxDBClient, Point, WriteOptions

_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2))
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:

with _client.write_api(write_options=WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)) as _write_client:

"""
Write Line Protocol formatted as string
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
"h2o_feet,location=coyote_creek water_level=3.0 3"])

"""
Write Line Protocol formatted as byte array
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
"h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

"""
Write Dictionary-style object
"""
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 1.0}, "time": 1})
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 2.0}, "time": 2},
{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 3.0}, "time": 3}])

"""
Write Data Point
"""
_write_client.write("my-bucket", "my-org",
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
_write_client.write("my-bucket", "my-org",
[Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

"""
Write Observable stream
"""
_data = rx \
.range(7, 11) \
.pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

_write_client.write("my-bucket", "my-org", _data)

"""
Write Pandas DataFrame
"""
_now = datetime.now(UTC)
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[_now, _now + timedelta(hours=1)],
columns=["location", "water_level"])

_write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

"""
Write Line Protocol formatted as string
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
"h2o_feet,location=coyote_creek water_level=3.0 3"])

"""
Write Line Protocol formatted as byte array
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
"h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

"""
Write Dictionary-style object
"""
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 1.0}, "time": 1})
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 2.0}, "time": 2},
{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
"fields": {"water_level": 3.0}, "time": 3}])

"""
Write Data Point
"""
_write_client.write("my-bucket", "my-org",
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
_write_client.write("my-bucket", "my-org",
[Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

"""
Write Observable stream
"""
_data = rx \
.range(7, 11) \
.pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

_write_client.write("my-bucket", "my-org", _data)

"""
Write Pandas DataFrame
"""
_now = datetime.now(UTC)
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
index=[_now, _now + timedelta(hours=1)],
columns=["location", "water_level"])

_write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
data_frame_tag_columns=['location'])

"""
Close client
"""
_write_client.close()
_client.close()


Default Tags
Expand Down
69 changes: 34 additions & 35 deletions examples/buckets_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,38 @@
url = "http://localhost:8086"
token = "my-token"

client = InfluxDBClient(url=url, token=token)
buckets_api = client.buckets_api()
with InfluxDBClient(url=url, token=token) as client:
buckets_api = client.buckets_api()

"""
The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
"""
org_name = "my-org"
org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]

"""
Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
"""
print(f"------- Create -------\n")
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
retention_rules=retention_rules,
org_id=org.id)
print(created_bucket)

"""
List all Buckets
"""
print(f"\n------- List -------\n")
buckets = buckets_api.find_buckets().buckets
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
for bucket in buckets]))
print("---")

"""
Delete previously created bucket
"""
print(f"------- Delete -------\n")
buckets_api.delete_bucket(created_bucket)
print(f" successfully deleted bucket: {created_bucket.name}")

"""
The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
"""
org_name = "my-org"
org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]

"""
Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
"""
print(f"------- Create -------\n")
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
retention_rules=retention_rules,
org_id=org.id)
print(created_bucket)

"""
List all Buckets
"""
print(f"\n------- List -------\n")
buckets = buckets_api.find_buckets().buckets
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
for bucket in buckets]))
print("---")

"""
Delete previously created bucket
"""
print(f"------- Delete -------\n")
buckets_api.delete_bucket(created_bucket)
print(f" successfully deleted bucket: {created_bucket.name}")

client.close()
70 changes: 34 additions & 36 deletions examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,37 @@
from influxdb_client import WritePrecision, InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.now(), WritePrecision.MS)

# write using point structure
write_api.write(bucket=bucket, record=p)

line_protocol = p.to_line_protocol()
print(line_protocol)

# write using line protocol string
write_api.write(bucket=bucket, record=line_protocol)

# using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -1m)')
for table in tables:
print(table)
for record in table.records:
# process record
print(record.values)

# using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for record in csv_result:
for cell in record:
val_count += 1
print("val count: ", val_count)

response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
print (codecs.decode(response.data))
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.utcnow(),
WritePrecision.MS)
write_api = client.write_api(write_options=SYNCHRONOUS)

# write using point structure
write_api.write(bucket="my-bucket", record=p)

line_protocol = p.to_line_protocol()
print(line_protocol)

# write using line protocol string
write_api.write(bucket="my-bucket", record=line_protocol)

# using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
print(table)
for record in table.records:
# process record
print(record.values)

# using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for record in csv_result:
for cell in record:
val_count += 1
print("val count: ", val_count)

response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
print (codecs.decode(response.data))
58 changes: 26 additions & 32 deletions examples/import_data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,39 +60,33 @@ def parse_row(row: OrderedDict):
.from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
.pipe(ops.map(lambda row: parse_row(row)))

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:

"""
Create client that writes data in batches with 50_000 items.
"""
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

"""
Write data into InfluxDB
"""
write_api.write(bucket="my-bucket", record=data)
write_api.close()
"""
Create client that writes data in batches with 50_000 items.
"""
with client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) as write_api:

"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
' |> range(start: 0, stop: now())' \
' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
' |> max()'
result = client.query_api().query(query=query)
"""
Write data into InfluxDB
"""
write_api.write(bucket="my-bucket", record=data)

"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
for record in table.records:
print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
' |> range(start: 0, stop: now())' \
' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
' |> max()'
result = client.query_api().query(query=query)

"""
Close client
"""
client.close()
"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
for record in table.records:
print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
Loading