From fcc3bec6e84a32ffd59396c792ce4356ca9d92d2 Mon Sep 17 00:00:00 2001
From: Robert Hajek
Date: Fri, 26 Mar 2021 15:20:16 +0100
Subject: [PATCH 1/3] fix: add support for "with .. as .." statement for cleaner exception handling

---
 influxdb_client/client/influxdb_client.py |  6 ++++++
 influxdb_client/client/write_api.py       |  6 ++++++
 tests/test_InfluxDBClient.py              | 23 ++++++++++++++++++++++-
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py
index 8b793c0d..b9101220 100644
--- a/influxdb_client/client/influxdb_client.py
+++ b/influxdb_client/client/influxdb_client.py
@@ -71,6 +71,12 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
         self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
                                     header_value=auth_header_value, retries=retries)
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
     @classmethod
     def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):
         """
diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py
index d5a975ab..0b290143 100644
--- a/influxdb_client/client/write_api.py
+++ b/influxdb_client/client/write_api.py
@@ -264,6 +264,12 @@ def close(self):
         """Flush data and dispose a batching buffer."""
         self.__del__()
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
     def __del__(self):
         """Close WriteApi."""
         if self._subject:
diff --git a/tests/test_InfluxDBClient.py b/tests/test_InfluxDBClient.py
index 31814b3c..a006cb0c 100644
--- a/tests/test_InfluxDBClient.py
+++ b/tests/test_InfluxDBClient.py
@@ -4,7 +4,8 @@
 import threading
 import unittest
 
-from influxdb_client import InfluxDBClient
+from influxdb_client import InfluxDBClient, Point
+from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions, WriteType
 
 
 class InfluxDBClientTest(unittest.TestCase):
@@ -138,6 +139,26 @@ def _start_http_server(self):
         self.httpd_thread = threading.Thread(target=self.httpd.serve_forever)
         self.httpd_thread.start()
 
+    def test_write_context_manager(self):
+
+        with InfluxDBClient.from_env_properties(self.debug) as self.client:
+            api_client = self.client.api_client
+            with self.client.write_api(write_options=WriteOptions(write_type=WriteType.batching)) as write_api:
+                write_api_test = write_api
+                write_api.write(bucket="my-bucket",
+                                record=Point("h2o_feet")
+                                .tag("location", "coyote_creek")
+                                .field("level water_level", 5.0))
+                self.assertIsNotNone(write_api._subject)
+                self.assertIsNotNone(write_api._disposable)
+
+            self.assertIsNone(write_api_test._subject)
+            self.assertIsNone(write_api_test._disposable)
+            self.assertIsNotNone(self.client.api_client)
+            self.assertIsNotNone(self.client.api_client.rest_client.pool_manager)
+
+        self.assertIsNone(api_client._pool)
+        self.assertIsNone(self.client.api_client)
 
 class ServerWithSelfSingedSSL(http.server.SimpleHTTPRequestHandler):
     def _set_headers(self):

From 7d22027fb5befdb0967f837cfb284614ae0ebab0 Mon Sep 17 00:00:00 2001
From: Robert Hajek
Date: Fri, 26 Mar 2021 16:03:47 +0100
Subject: [PATCH 2/3] fix: add support for "with .. as .." statement for cleaner exception handling

---
 influxdb_client/client/influxdb_client.py | 9 +++++++++
 influxdb_client/client/write_api.py       | 9 +++++++++
 2 files changed, 18 insertions(+)

diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py
index b9101220..d38d38ec 100644
--- a/influxdb_client/client/influxdb_client.py
+++ b/influxdb_client/client/influxdb_client.py
@@ -72,9 +72,18 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
                                     header_value=auth_header_value, retries=retries)
 
     def __enter__(self):
+        """
+        Enter the runtime context related to this object.
+
+        It will bind this method's return value to the target(s)
+        specified in the `as` clause of the statement.
+
+        :return: self instance
+        """
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
+        """Exit the runtime context related to this object and close the client."""
         self.close()
 
     @classmethod
diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py
index 0b290143..ca058dc0 100644
--- a/influxdb_client/client/write_api.py
+++ b/influxdb_client/client/write_api.py
@@ -265,9 +265,18 @@ def close(self):
         self.__del__()
 
     def __enter__(self):
+        """
+        Enter the runtime context related to this object.
+
+        It will bind this method's return value to the target(s)
+        specified in the `as` clause of the statement.
+
+        :return: self instance
+        """
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
+        """Exit the runtime context related to this object and close the WriteApi."""
         self.close()
 
     def __del__(self):

From 591bd606b8c504028f8d0c2060d10910f676ddd6 Mon Sep 17 00:00:00 2001
From: Robert Hajek
Date: Tue, 13 Apr 2021 15:22:38 +0200
Subject: [PATCH 3/3] chore: updated examples and README.rst to use with statement

---
 README.rst                                  | 130 ++++++------
 examples/buckets_management.py              |  69 ++++---
 examples/example.py                         |  70 ++++---
 examples/import_data_set.py                 |  58 +++---
 examples/import_data_set_multiprocessing.py | 143 +++++++-------
 examples/import_data_set_sync_batching.py   |  50 +++--
 examples/influx_cloud.py                    |   8 +-
 examples/influxdb_18_example.py             |  27 ++-
 examples/ingest_dataframe_default_tags.py   |  68 +++----
 examples/nanosecond_precision.py            |  52 +++--
 examples/query.py                           | 206 ++++++++++----------
 examples/query_from_file.py                 |  72 +++----
 12 files changed, 452 insertions(+), 501 deletions(-)

diff --git a/README.rst b/README.rst
index ec0618fb..d0c1ce60 100644
--- a/README.rst
+++ b/README.rst
@@ -220,7 +220,7 @@ Writes
 
 The `WriteApi `_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as a `InfluxDB Line Protocol `_\ , `Data Point `_ or Observable stream.
 
-**Important: The WriteApi in batching mode (default mode) is suppose to run as a singleton. To flush all your data you should call ``_write_client.close()`` at the end of your script.**
+**Important: The WriteApi in batching mode (default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution using a ``with client.write_api(...) as write_api:`` statement or call ``_write_client.close()`` at the end of your script.**
 
 *The default instance of WriteApi use batching.*
 
 The batching is configurable by ``write_options``\ :
 
 .. code-block:: python
 
    from influxdb_client import InfluxDBClient, Point, WriteOptions
 
-    _client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
-    _write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
-                                                                 flush_interval=10_000,
-                                                                 jitter_interval=2_000,
-                                                                 retry_interval=5_000,
-                                                                 max_retries=5,
-                                                                 max_retry_delay=30_000,
-                                                                 exponential_base=2))
+    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:
+
+        with _client.write_api(write_options=WriteOptions(batch_size=500,
+                                                          flush_interval=10_000,
+                                                          jitter_interval=2_000,
+                                                          retry_interval=5_000,
+                                                          max_retries=5,
+                                                          max_retry_delay=30_000,
+                                                          exponential_base=2)) as _write_client:
+
+            """
+            Write Line Protocol formatted as string
+            """
+            _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
+            _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
+                                                        "h2o_feet,location=coyote_creek water_level=3.0 3"])
+
+            """
+            Write Line Protocol formatted as byte array
+            """
+            _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
+            _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
+                                                        "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])
+
+            """
+            Write Dictionary-style object
+            """
+            _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
+                                                        "fields": {"water_level": 1.0}, "time": 1})
+            _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
+                                                         "fields": {"water_level": 2.0}, "time": 2},
+                                                        {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
+                                                         "fields": {"water_level": 3.0}, "time": 3}])
+
+            """
+            Write Data Point
+            """
+            _write_client.write("my-bucket", "my-org",
+                                Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
+            _write_client.write("my-bucket", "my-org",
+                                [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
+                                 Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])
+
+            """
+            Write Observable stream
+            """
+            _data = rx \
+                .range(7, 11) \
+                .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))
+
+            _write_client.write("my-bucket", "my-org", _data)
+
+            """
+            Write Pandas DataFrame
+            """
+            _now = datetime.now(UTC)
+            _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
+                                       index=[_now, _now + timedelta(hours=1)],
+                                       columns=["location", "water_level"])
+
+            _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
+                                data_frame_tag_columns=['location'])
 
-    """
-    Write Line Protocol formatted as string
-    """
-    _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
-    _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
-                                                "h2o_feet,location=coyote_creek water_level=3.0 3"])
-
-    """
-    Write Line Protocol formatted as byte array
-    """
-    _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
-    _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
-                                                "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])
-
-    """
-    Write Dictionary-style object
-    """
-    _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
-                                                "fields": {"water_level": 1.0}, "time": 1})
-    _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
-                                                 "fields": {"water_level": 2.0}, "time": 2},
-                                                {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
-                                                 "fields": {"water_level": 3.0}, "time": 3}])
-
-    """
-    Write Data Point
-    """
-    _write_client.write("my-bucket", "my-org",
-                        Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
-    _write_client.write("my-bucket", "my-org",
-                        [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
-                         Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])
-
-    """
-    Write Observable stream
-    """
-    _data = rx \
-        .range(7, 11) \
-        .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))
-
-    _write_client.write("my-bucket", "my-org", _data)
-
-    """
-    Write Pandas DataFrame
-    """
-    _now = datetime.now(UTC)
-    _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
-                               index=[_now, _now + timedelta(hours=1)],
-                               columns=["location", "water_level"])
-
-    _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
-                        data_frame_tag_columns=['location'])
-
-    """
-    Close client
-    """
-    _write_client.close()
-    _client.close()
 
 
 Default Tags
diff --git a/examples/buckets_management.py b/examples/buckets_management.py
index bde37ee8..a26926ee 100644
--- a/examples/buckets_management.py
+++ b/examples/buckets_management.py
@@ -10,39 +10,38 @@
 url = "http://localhost:8086"
 token = "my-token"
 
-client = InfluxDBClient(url=url, token=token)
-buckets_api = client.buckets_api()
+with InfluxDBClient(url=url, token=token) as client:
+    buckets_api = client.buckets_api()
+
+    """
+    The Bucket API uses the Organization ID as a parameter, so we have to retrieve the ID via the Organizations API.
+    """
+    org_name = "my-org"
+    org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]
+
+    """
+    Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
+    """
+    print(f"------- Create -------\n")
+    retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
+    created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
+                                               retention_rules=retention_rules,
+                                               org_id=org.id)
+    print(created_bucket)
+
+    """
+    List all Buckets
+    """
+    print(f"\n------- List -------\n")
+    buckets = buckets_api.find_buckets().buckets
+    print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
+                     for bucket in buckets]))
+    print("---")
+
+    """
+    Delete previously created bucket
+    """
+    print(f"------- Delete -------\n")
+    buckets_api.delete_bucket(created_bucket)
+    print(f" successfully deleted bucket: {created_bucket.name}")
 
-"""
-The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
-""" -org_name = "my-org" -org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0] - -""" -Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python" -""" -print(f"------- Create -------\n") -retention_rules = BucketRetentionRules(type="expire", every_seconds=3600) -created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python", - retention_rules=retention_rules, - org_id=org.id) -print(created_bucket) - -""" -List all Buckets -""" -print(f"\n------- List -------\n") -buckets = buckets_api.find_buckets().buckets -print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}" - for bucket in buckets])) -print("---") - -""" -Delete previously created bucket -""" -print(f"------- Delete -------\n") -buckets_api.delete_bucket(created_bucket) -print(f" successfully deleted bucket: {created_bucket.name}") - -client.close() diff --git a/examples/example.py b/examples/example.py index 32f75c50..73ed813a 100644 --- a/examples/example.py +++ b/examples/example.py @@ -4,39 +4,37 @@ from influxdb_client import WritePrecision, InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS -bucket = "my-bucket" - -client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") - -write_api = client.write_api(write_options=SYNCHRONOUS) -query_api = client.query_api() - -p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.now(), WritePrecision.MS) - -# write using point structure -write_api.write(bucket=bucket, record=p) - -line_protocol = p.to_line_protocol() -print(line_protocol) - -# write using line protocol string -write_api.write(bucket=bucket, record=line_protocol) - -# using Table structure -tables = query_api.query('from(bucket:"my-bucket") |> range(start: -1m)') -for table in tables: - print(table) - for record in table.records: - # process record - print(record.values) - -# using csv library -csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)') -val_count = 0 -for record in csv_result: - for cell in record: - val_count += 1 -print("val count: ", val_count) - -response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)') -print (codecs.decode(response.data)) +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: + query_api = client.query_api() + + p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.utcnow(), + WritePrecision.MS) + write_api = client.write_api(write_options=SYNCHRONOUS) + + # write using point structure + write_api.write(bucket="my-bucket", record=p) + + line_protocol = p.to_line_protocol() + print(line_protocol) + + # write using line protocol string + write_api.write(bucket="my-bucket", record=line_protocol) + + # using Table structure + tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)') + for table in tables: + print(table) + for record in table.records: + # process record + print(record.values) + + # using csv library + csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)') + val_count = 0 + for record in csv_result: + for cell in record: + val_count += 1 + print("val count: ", val_count) + + response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)') + print (codecs.decode(response.data)) diff --git a/examples/import_data_set.py b/examples/import_data_set.py 
index d67810b6..0777f82c 100644
--- a/examples/import_data_set.py
+++ b/examples/import_data_set.py
@@ -60,39 +60,33 @@ def parse_row(row: OrderedDict):
     .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
     .pipe(ops.map(lambda row: parse_row(row)))
 
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
+with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:
 
-"""
-Create client that writes data in batches with 50_000 items.
-"""
-write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
-
-"""
-Write data into InfluxDB
-"""
-write_api.write(bucket="my-bucket", record=data)
-write_api.close()
+    """
+    Create client that writes data in batches with 50_000 items.
+    """
+    with client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) as write_api:
 
-"""
-Querying max value of CBOE Volatility Index
-"""
-query = 'from(bucket:"my-bucket")' \
-        ' |> range(start: 0, stop: now())' \
-        ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
-        ' |> max()'
-result = client.query_api().query(query=query)
+        """
+        Write data into InfluxDB
+        """
+        write_api.write(bucket="my-bucket", record=data)
 
-"""
-Processing results
-"""
-print()
-print("=== results ===")
-print()
-for table in result:
-    for record in table.records:
-        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
+    """
+    Querying max value of CBOE Volatility Index
+    """
+    query = 'from(bucket:"my-bucket")' \
+            ' |> range(start: 0, stop: now())' \
+            ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
+            ' |> max()'
+    result = client.query_api().query(query=query)
 
-"""
-Close client
-"""
-client.close()
+    """
+    Processing results
+    """
+    print()
+    print("=== results ===")
+    print()
+    for table in result:
+        for record in table.records:
+            print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
diff --git a/examples/import_data_set_multiprocessing.py b/examples/import_data_set_multiprocessing.py
index e5414e94..632bcc8b 100644
--- a/examples/import_data_set_multiprocessing.py
+++ b/examples/import_data_set_multiprocessing.py
@@ -132,88 +132,89 @@ def init_counter(counter, progress, queue):
     queue_ = queue
 
 
-"""
-Create multiprocess shared environment
-"""
-queue_ = multiprocessing.Manager().Queue()
-counter_ = Value('i', 0)
-progress_ = Value('i', 0)
-startTime = datetime.now()
-
-url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-# url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+if __name__ == "__main__":
+    """
+    Create multiprocess shared environment
+    """
+    queue_ = multiprocessing.Manager().Queue()
+    counter_ = Value('i', 0)
+    progress_ = Value('i', 0)
+    startTime = datetime.now()
+
+    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
+    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
 
-"""
-Open URL and for stream data
-"""
-response = urlopen(url)
-if response.headers:
-    content_length = response.headers['Content-length']
-io_wrapper = ProgressTextIOWrapper(response)
-io_wrapper.progress = progress_
+    """
+    Open URL and for stream data
+    """
+    response = urlopen(url)
+    if response.headers:
+        content_length = response.headers['Content-length']
+    io_wrapper = ProgressTextIOWrapper(response)
+    io_wrapper.progress = progress_
 
-"""
-Start writer as a new process
-"""
-writer = InfluxDBWriter(queue_)
-writer.start()
+    """
+    Start writer as a new process
+    """
+    writer = InfluxDBWriter(queue_)
+    writer.start()
 
-"""
-Create process pool for parallel encoding into LineProtocol
-"""
-cpu_count = multiprocessing.cpu_count()
-with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                            initargs=(counter_, progress_, queue_)) as executor:
-    """
-    Converts incoming HTTP stream into sequence of LineProtocol
-    """
-    data = rx \
-        .from_iterable(DictReader(io_wrapper)) \
-        .pipe(ops.buffer_with_count(10_000),
-              # Parse 10_000 rows into LineProtocol on subprocess
-              ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
-
-    """
-    Write data into InfluxDB
-    """
-    data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
+    """
+    Create process pool for parallel encoding into LineProtocol
+    """
+    cpu_count = multiprocessing.cpu_count()
+    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                initargs=(counter_, progress_, queue_)) as executor:
+        """
+        Converts incoming HTTP stream into sequence of LineProtocol
+        """
+        data = rx \
+            .from_iterable(DictReader(io_wrapper)) \
+            .pipe(ops.buffer_with_count(10_000),
+                  # Parse 10_000 rows into LineProtocol on subprocess
+                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+        """
+        Write data into InfluxDB
+        """
+        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
 
-"""
-Terminate Writer
-"""
-queue_.put(None)
-queue_.join()
+    """
+    Terminate Writer
+    """
+    queue_.put(None)
+    queue_.join()
 
-print()
-print(f'Import finished in: {datetime.now() - startTime}')
-print()
+    print()
+    print(f'Import finished in: {datetime.now() - startTime}')
+    print()
 
-"""
-Querying 10 pickups from dispatching 'B00008'
-"""
-query = 'from(bucket:"my-bucket")' \
-        '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-        '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-        '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-        '|> rename(columns: {_time: "pickup_datetime"})' \
-        '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+    """
+    Querying 10 pickups from dispatching 'B00008'
+    """
+    query = 'from(bucket:"my-bucket")' \
+            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+            '|> rename(columns: {_time: "pickup_datetime"})' \
+            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
 
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-result = client.query_api().query(query=query)
+    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
+    result = client.query_api().query(query=query)
 
-"""
-Processing results
-"""
-print()
-print("=== Querying 10 pickups from dispatching 'B00008' ===")
-print()
-for table in result:
-    for record in table.records:
-        print(
-            f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+    """
+    Processing results
+    """
+    print()
+    print("=== Querying 10 pickups from dispatching 'B00008' ===")
+    print()
+    for table in result:
+        for record in table.records:
+            print(
+                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
 
-"""
-Close client
-"""
-client.close()
+    """
+    Close client
+    """
+    client.close()
diff --git a/examples/import_data_set_sync_batching.py b/examples/import_data_set_sync_batching.py
index 19974314..f5e548a5 100644
--- a/examples/import_data_set_sync_batching.py
+++ b/examples/import_data_set_sync_batching.py
@@ -31,38 +31,32 @@ def csv_to_generator(csv_file_path):
 Define Retry strategy - 3 attempts => 2, 4, 8
 """
 retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
-client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries)
-
-"""
-Use synchronous version of WriteApi to strongly depends on result of write
-"""
-write_api = client.write_api(write_options=SYNCHRONOUS)
-
-"""
-Prepare batches from generator
-"""
-batches = rx \
-    .from_iterable(csv_to_generator('vix-daily.csv')) \
-    .pipe(ops.buffer_with_count(500))
-
-
-def write_batch(batch):
-    """
-    Synchronous write
-    """
-    print(f'Writing... {len(batch)}')
-    write_api.write(bucket='my-bucket', record=batch)
-
-
-"""
-Write batches
-"""
-batches.subscribe(on_next=lambda batch: write_batch(batch),
-                  on_error=lambda ex: print(f'Unexpected error: {ex}'),
-                  on_completed=lambda: print('Import finished!'))
-
-"""
-Dispose client
-"""
-write_api.close()
-client.close()
+with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:
+    """
+    Use the synchronous version of WriteApi so the code depends directly on the result of each write
+    """
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+
+    """
+    Prepare batches from generator
+    """
+    batches = rx \
+        .from_iterable(csv_to_generator('vix-daily.csv')) \
+        .pipe(ops.buffer_with_count(500))
+
+    def write_batch(batch):
+        """
+        Synchronous write
+        """
+        print(f'Writing... {len(batch)}')
+        write_api.write(bucket='my-bucket', record=batch)
+
+    """
+    Write batches
+    """
+    batches.subscribe(on_next=lambda batch: write_batch(batch),
+                      on_error=lambda ex: print(f'Unexpected error: {ex}'),
+                      on_completed=lambda: print('Import finished!'))
diff --git a/examples/influx_cloud.py b/examples/influx_cloud.py
index 2eab8e1d..6c8ed6f2 100644
--- a/examples/influx_cloud.py
+++ b/examples/influx_cloud.py
@@ -15,8 +15,7 @@
 bucket = '...'
 org = '...'
 
-client = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)
-try:
+with InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token) as client:
     kind = 'temperature'
     host = 'host1'
     device = 'opt-123'
@@ -53,8 +52,3 @@
 
     print()
     print('success')
-
-except Exception as e:
-    print(e)
-finally:
-    client.close()
diff --git a/examples/influxdb_18_example.py b/examples/influxdb_18_example.py
index 24bd1561..07413b32 100644
--- a/examples/influxdb_18_example.py
+++ b/examples/influxdb_18_example.py
@@ -8,24 +8,21 @@
 
 bucket = f'{database}/{retention_policy}'
 
-client = InfluxDBClient(url='http://localhost:8086', token=f'{username}:{password}', org='-')
+with InfluxDBClient(url='http://localhost:8086', token=f'{username}:{password}', org='-') as client:
 
-print('*** Write Points ***')
+    with client.write_api() as write_api:
+        print('*** Write Points ***')
 
-write_api = client.write_api()
+        point = Point("mem").tag("host", "host1").field("used_percent", 25.43234543)
+        print(point.to_line_protocol())
 
-point = Point("mem").tag("host", "host1").field("used_percent", 25.43234543)
-print(point.to_line_protocol())
+        write_api.write(bucket=bucket, record=point)
 
-write_api.write(bucket=bucket, record=point)
-write_api.close()
+    print('*** Query Points ***')
 
-print('*** Query Points ***')
+    query_api = client.query_api()
+    query = f'from(bucket: \"{bucket}\") |> range(start: -1h)'
+    tables = query_api.query(query)
+    for record in tables[0].records:
+        print(f'#{record.get_time()} #{record.get_measurement()}: #{record.get_field()} #{record.get_value()}')
 
-query_api = client.query_api()
-query = f'from(bucket: \"{bucket}\") |> range(start: -1h)'
-tables = query_api.query(query)
-for record in tables[0].records:
-    print(f'#{record.get_time()} #{record.get_measurement()}: #{record.get_field()} #{record.get_value()}')
-
-client.close()
diff --git a/examples/ingest_dataframe_default_tags.py b/examples/ingest_dataframe_default_tags.py
index a8d9485a..4218a43e 100644
--- a/examples/ingest_dataframe_default_tags.py
+++ b/examples/ingest_dataframe_default_tags.py
@@ -13,40 +13,34 @@
 df = pd.read_csv("vix-daily.csv")
 print(df.head())
 
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
-
-"""
-Ingest DataFrame with default tags
-"""
-point_settings = PointSettings(**{"type": "vix-daily"})
-point_settings.add_default_tag("example-name", "ingest-data-frame")
-
-write_api = client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
-write_api.write(bucket="my-bucket", record=df, data_frame_measurement_name="financial-analysis-df")
-
-"""
-Querying ingested data
-"""
-query = 'from(bucket:"my-bucket")' \
-        ' |> range(start: 0, stop: now())' \
-        ' |> filter(fn: (r) => r._measurement == "financial-analysis-df")' \
-        ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-        ' |> limit(n:10, offset: 0)'
-result = client.query_api().query(query=query)
-
-"""
-Processing results
-"""
-print()
-print("=== results ===")
-print()
-for table in result:
-    for record in table.records:
-        print('{4}: Open {0}, Close {1}, High {2}, Low {3}'.format(record["VIX Open"], record["VIX Close"],
-                                                                   record["VIX High"], record["VIX Low"],
-                                                                   record["type"]))
-
-"""
-Close client
-"""
-client.close()
+with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
+    """
+    Ingest DataFrame with default tags
+    """
+    point_settings = PointSettings(**{"type": "vix-daily"})
+    point_settings.add_default_tag("example-name", "ingest-data-frame")
+
+    write_api = client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
+    write_api.write(bucket="my-bucket", record=df, data_frame_measurement_name="financial-analysis-df")
+
+    """
+    Querying ingested data
+    """
+    query = 'from(bucket:"my-bucket")' \
+            ' |> range(start: 0, stop: now())' \
+            ' |> filter(fn: (r) => r._measurement == "financial-analysis-df")' \
+            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+            ' |> limit(n:10, offset: 0)'
+    result = client.query_api().query(query=query)
+
+    """
+    Processing results
+    """
+    print()
+    print("=== results ===")
+    print()
+    for table in result:
+        for record in table.records:
+            print('{4}: Open {0}, Close {1}, High {2}, Low {3}'.format(record["VIX Open"], record["VIX Close"],
+                                                                       record["VIX High"], record["VIX Low"],
+                                                                       record["type"]))
diff --git a/examples/nanosecond_precision.py b/examples/nanosecond_precision.py
index cedba872..3c8e3c81 100644
--- a/examples/nanosecond_precision.py
+++ b/examples/nanosecond_precision.py
@@ -12,39 +12,35 @@
 """
 Prepare client.
 """
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
+with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
 
-write_api = client.write_api(write_options=SYNCHRONOUS)
-query_api = client.query_api()
+    write_api = client.write_api(write_options=SYNCHRONOUS)
 
-"""
-Prepare data
-"""
+    """
+    Prepare data
+    """
 
-point = Point("h2o_feet") \
-    .field("water_level", 10) \
-    .tag("location", "pacific") \
-    .time('1996-02-25T21:20:00.001001231Z')
+    point = Point("h2o_feet") \
+        .field("water_level", 10) \
+        .tag("location", "pacific") \
+        .time('1996-02-25T21:20:00.001001231Z')
 
-print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
-print()
+    print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
+    print()
 
-write_api.write(bucket="my-bucket", record=point)
+    write_api.write(bucket="my-bucket", record=point)
 
-"""
-Query: using Stream
-"""
-query = '''
-from(bucket:"my-bucket")
-    |> range(start: 0, stop: now())
-    |> filter(fn: (r) => r._measurement == "h2o_feet")
-'''
-records = query_api.query_stream(query)
+    query_api = client.query_api()
 
-for record in records:
-    print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
+    """
+    Query: using Stream
+    """
+    query = '''
+    from(bucket:"my-bucket")
+        |> range(start: 0, stop: now())
+        |> filter(fn: (r) => r._measurement == "h2o_feet")
+    '''
+    records = query_api.query_stream(query)
 
-"""
-Close client
-"""
-client.close()
+    for record in records:
+        print(f'Water level in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
diff --git a/examples/query.py b/examples/query.py
index 540bdf02..ea2fff10 100644
--- a/examples/query.py
+++ b/examples/query.py
@@ -3,109 +3,105 @@
 from influxdb_client import InfluxDBClient, Point, Dialect
 from influxdb_client.client.write_api import SYNCHRONOUS
 
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True)
-
-write_api = client.write_api(write_options=SYNCHRONOUS)
-query_api = client.query_api()
-
-"""
-Prepare data
-"""
-
-_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
-_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
-
-write_api.write(bucket="my-bucket", record=[_point1, _point2])
-
-"""
-Query: using Table structure
-"""
-tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
-
-for table in tables:
-    print(table)
-    for record in table.records:
-        print(record.values)
-
-print()
-print()
-
-"""
-Query: using Bind parameters
-"""
-
-p = {"_start": datetime.timedelta(hours=-1),
-     "_location": "Prague",
-     "_desc": True,
-     "_floatParam": 25.1,
-     "_every": datetime.timedelta(minutes=5)
-     }
-
-tables = query_api.query('''
-    from(bucket:"my-bucket") |> range(start: _start)
-        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
-        |> filter(fn: (r) => r["_field"] == "temperature")
-        |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
-        |> aggregateWindow(every: _every, fn: mean, createEmpty: true)
-        |> sort(columns: ["_time"], desc: _desc)
-''', params=p)
-
-for table in tables:
-    print(table)
-    for record in table.records:
-        print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
-
-print()
-print()
-
-"""
-Query: using Stream
-"""
-records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
-
-for record in records:
-    print(f'Temperature in {record["location"]} is {record["_value"]}')
-
-"""
-Interrupt a stream after retrieve a required data
-"""
-large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
-for record in large_stream:
-    if record["location"] == "New York":
-        print(f'New York temperature: {record["_value"]}')
-        break
-
-large_stream.close()
-
-print()
-print()
-
-"""
-Query: using csv library
-"""
-csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
-                                 dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
-                                                 date_time_format="RFC3339"))
-for csv_line in csv_result:
-    if not len(csv_line) == 0:
-        print(f'Temperature in {csv_line[9]} is {csv_line[6]}')
-
-print()
-print()
-
-"""
-Query: using Pandas DataFrame
-"""
-data_frame = query_api.query_data_frame('''
-from(bucket:"my-bucket")
-    |> range(start: -10m)
-    |> filter(fn: (r) => r["_measurement"] == "my_measurement")
-    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-    |> keep(columns: ["_time","location", "temperature"])
-''')
-print(data_frame.to_string())
-
-"""
-Close client
-"""
-client.close()
+with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:
+
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+
+    """
+    Prepare data
+    """
+
+    _point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
+    _point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
+
+    write_api.write(bucket="my-bucket", record=[_point1, _point2])
+
+    query_api = client.query_api()
+
+    """
+    Query: using Table structure
+    """
+    tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
+
+    for table in tables:
+        print(table)
+        for record in table.records:
+            print(record.values)
+
+    print()
+    print()
+
+    """
+    Query: using Bind parameters
+    """
+
+    p = {"_start": datetime.timedelta(hours=-1),
+         "_location": "Prague",
+         "_desc": True,
+         "_floatParam": 25.1,
+         "_every": datetime.timedelta(minutes=5)
+         }
+
+    tables = query_api.query('''
+        from(bucket:"my-bucket") |> range(start: _start)
+            |> filter(fn: (r) => r["_measurement"] == "my_measurement")
+            |> filter(fn: (r) => r["_field"] == "temperature")
+            |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
+            |> aggregateWindow(every: _every, fn: mean, createEmpty: true)
+            |> sort(columns: ["_time"], desc: _desc)
+    ''', params=p)
+
+    for table in tables:
+        print(table)
+        for record in table.records:
+            print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
+
+    print()
+    print()
+
+    """
+    Query: using Stream
+    """
+    records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
+
+    for record in records:
+        print(f'Temperature in {record["location"]} is {record["_value"]}')
+
+    """
+    Interrupt the stream after retrieving the required data
+    """
+    large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
+    for record in large_stream:
+        if record["location"] == "New York":
+            print(f'New York temperature: {record["_value"]}')
+            break
+
+    large_stream.close()
+
+    print()
+    print()
+
+    """
+    Query: using csv library
+    """
+    csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
+                                     dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
+                                                     date_time_format="RFC3339"))
+    for csv_line in csv_result:
+        if not len(csv_line) == 0:
+            print(f'Temperature in {csv_line[9]} is {csv_line[6]}')
+
+    print()
+    print()
+
+    """
+    Query: using Pandas DataFrame
+    """
+    data_frame = query_api.query_data_frame('''
+    from(bucket:"my-bucket")
+        |> range(start: -10m)
+        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
+        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+        |> keep(columns: ["_time","location", "temperature"])
+    ''')
+    print(data_frame.to_string())
diff --git a/examples/query_from_file.py b/examples/query_from_file.py
index 0411d82a..d1a545de 100644
--- a/examples/query_from_file.py
+++ b/examples/query_from_file.py
@@ -10,45 +10,37 @@
 from influxdb_client import InfluxDBClient, Point
 from influxdb_client.client.write_api import SYNCHRONOUS
 
-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
-
-write_api = client.write_api(write_options=SYNCHRONOUS)
-query_api = client.query_api()
-
-"""
-Prepare data
-"""
-
-_points = []
-now = datetime.now(UTC).replace(hour=13, minute=20, second=15, microsecond=0)
-for i in range(50):
-    _point = Point("weather")\
-        .tag("location", "New York")\
-        .field("temperature", random.randint(-10, 30))\
-        .time(now - timedelta(days=i))
-    _points.append(_point)
-
-write_api.write(bucket="my-bucket", record=_points)
-
-"""
-Query: using Flux from file
-"""
-with open('query.flux', 'r') as file:
-    query = file.read()
-
-tables = query_api.query(query)
-
-for table in tables:
-    for record in table.records:
-        day_name = calendar.day_name[record["weekDay"]]
-        print(f'Temperature in {record["location"]} is {record["temperature"]}°C at {day_name}')
-
-"""
-Close client
-"""
-client.close()
+with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
+
+    write_api = client.write_api(write_options=SYNCHRONOUS)
+
+    """
+    Prepare data
+    """
+
+    _points = []
+    now = datetime.now(UTC).replace(hour=13, minute=20, second=15, microsecond=0)
+    for i in range(50):
+        _point = Point("weather")\
+            .tag("location", "New York")\
+            .field("temperature", random.randint(-10, 30))\
+            .time(now - timedelta(days=i))
+        _points.append(_point)
+
+    write_api.write(bucket="my-bucket", record=_points)
+
+    query_api = client.query_api()
+
+    """
+    Query: using Flux from file
+    """
+    with open('query.flux', 'r') as file:
+        query = file.read()
+
+    tables = query_api.query(query)
+
+    for table in tables:
+        for record in table.records:
+            day_name = calendar.day_name[record["weekDay"]]
+            print(f'Temperature in {record["location"]} is {record["temperature"]}°C at {day_name}')
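
Note (not part of the patch): a minimal usage sketch of the context-manager support this series introduces. The URL, token, org, and bucket values below are placeholders; the ``InfluxDBClient``, ``client.write_api``, ``WriteOptions``, and ``Point`` calls are exactly the APIs added and exercised in the patches above.

.. code-block:: python

   from influxdb_client import InfluxDBClient, Point, WriteOptions

   # Placeholder connection settings - adjust to your deployment.
   url, token, org = "http://localhost:8086", "my-token", "my-org"

   with InfluxDBClient(url=url, token=token, org=org) as client:
       # A batching WriteApi buffers points in memory; __exit__ flushes and
       # disposes the buffer even if the block below raises an exception.
       with client.write_api(write_options=WriteOptions(batch_size=500)) as write_api:
           write_api.write(bucket="my-bucket",
                           record=Point("h2o_feet")
                           .tag("location", "coyote_creek")
                           .field("water_level", 4.2))
           # An exception raised here still triggers write_api.close()
           # and then client.close(), in that order.

Nesting the two ``with`` blocks mirrors the ownership order: the write API must be closed (flushed) before the client that owns the underlying HTTP pool is disposed.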