From 52d4d6a197494acd3fa5b95359cf5aa89872f69f Mon Sep 17 00:00:00 2001
From: Jakub Bednar
Date: Mon, 12 Aug 2024 14:24:15 +0200
Subject: [PATCH 1/2] fix: multiprocessing example

---
 examples/import_data_set_multiprocessing.py | 122 ++++++++++----------
 1 file changed, 61 insertions(+), 61 deletions(-)

diff --git a/examples/import_data_set_multiprocessing.py b/examples/import_data_set_multiprocessing.py
index 60de64c5..a813fe51 100644
--- a/examples/import_data_set_multiprocessing.py
+++ b/examples/import_data_set_multiprocessing.py
@@ -4,6 +4,7 @@
 https://github.com/toddwschneider/nyc-taxi-data
 """
 import concurrent.futures
+import gzip
 import io
 import multiprocessing
 from collections import OrderedDict
@@ -92,10 +93,10 @@ def parse_row(row: OrderedDict):
 
     return Point("taxi-trip-data") \
         .tag("dispatching_base_num", row['dispatching_base_num']) \
-        .tag("PULocationID", row['PULocationID']) \
-        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("PULocationID", row['PUlocationID']) \
+        .tag("DOLocationID", row['DOlocationID']) \
         .tag("SR_Flag", row['SR_Flag']) \
-        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .field("dropoff_datetime", row['dropOff_datetime']) \
         .time(row['pickup_datetime']) \
         .to_line_protocol()
@@ -141,8 +142,7 @@ def init_counter(counter, progress, queue):
     progress_ = Value('i', 0)
     startTime = datetime.now()
 
-    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"
 
     """
     Open URL and for stream data
@@ -150,71 +150,71 @@ def init_counter(counter, progress, queue):
     response = urlopen(url)
     if response.headers:
         content_length = response.headers['Content-length']
-    io_wrapper = ProgressTextIOWrapper(response)
-    io_wrapper.progress = progress_
 
     """
-    Start writer as a new process
+    Open GZIP stream
     """
-    writer = InfluxDBWriter(queue_)
-    writer.start()
+    with gzip.open(response, 'rb') as stream:
+        io_wrapper = ProgressTextIOWrapper(stream, encoding='utf-8')
+        io_wrapper.progress = progress_
 
-    """
-    Create process pool for parallel encoding into LineProtocol
-    """
-    cpu_count = multiprocessing.cpu_count()
-    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                                initargs=(counter_, progress_, queue_)) as executor:
         """
-        Converts incoming HTTP stream into sequence of LineProtocol
+        Start writer as a new process
         """
-        data = rx \
-            .from_iterable(DictReader(io_wrapper)) \
-            .pipe(ops.buffer_with_count(10_000),
-                  # Parse 10_000 rows into LineProtocol on subprocess
-                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+        writer = InfluxDBWriter(queue_)
+        writer.start()
 
         """
-        Write data into InfluxDB
+        Create process pool for parallel encoding into LineProtocol
         """
-        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
-
-    """
-    Terminate Writer
-    """
-    queue_.put(None)
-    queue_.join()
+        cpu_count = multiprocessing.cpu_count()
+        with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                    initargs=(counter_, progress_, queue_)) as executor:
+            """
+            Converts incoming HTTP stream into sequence of LineProtocol
+            """
+            data = rx \
+                .from_iterable(DictReader(io_wrapper)) \
+                .pipe(ops.buffer_with_count(10_000),
+                      # Parse 10_000 rows into LineProtocol on subprocess
+                      ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+            """
+            Write data into InfluxDB
+            """
+            data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
 
-    print()
-    print(f'Import finished in: {datetime.now() - startTime}')
-    print()
-
-    """
-    Querying 10 pickups from dispatching 'B00008'
-    """
-    query = 'from(bucket:"my-bucket")' \
-            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-            '|> rename(columns: {_time: "pickup_datetime"})' \
-            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
-
-    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-    result = client.query_api().query(query=query)
+        """
+        Terminate Writer
+        """
+        queue_.put(None)
+        queue_.join()
 
-    """
-    Processing results
-    """
-    print()
-    print("=== Querying 10 pickups from dispatching 'B00008' ===")
-    print()
-    for table in result:
-        for record in table.records:
-            print(
-                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+        print()
+        print(f'Import finished in: {datetime.now() - startTime}')
+        print()
+
+        """
+        Querying 10 pickups from dispatching 'B00008'
+        """
+        query = 'from(bucket:"my-bucket")' \
+                '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+                '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+                '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+                '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+                '|> rename(columns: {_time: "pickup_datetime"})' \
+                '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+
+        with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
+            result = client.query_api().query(query=query)
+
+            """
+            Processing results
+            """
+            print()
+            print("=== Querying 10 pickups from dispatching 'B00008' ===")
+            print()
+            for table in result:
+                for record in table.records:
+                    print(
+                        f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
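To try the new data source from PATCH 1/2 on its own, the essential pattern is decompressing the gzipped CSV directly off the HTTP response instead of downloading it first. A minimal sketch, assuming only that the column names match the ones the patch switches to (`PUlocationID`, `dropOff_datetime`, ...); the five-row peek is purely illustrative:

```python
import gzip
import io
from csv import DictReader
from urllib.request import urlopen

url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"

# gzip.open() accepts an existing file object, so the HTTP response body
# is decompressed on the fly without ever touching the local disk
response = urlopen(url)
with gzip.open(response, 'rb') as stream:
    reader = DictReader(io.TextIOWrapper(stream, encoding='utf-8'))
    for i, row in enumerate(reader):
        print(row['dispatching_base_num'], row['pickup_datetime'], row['dropOff_datetime'])
        if i >= 4:  # peek at the first five rows only
            break
```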
From e1990e4a6507a86a437924039fece5790c7656d2 Mon Sep 17 00:00:00 2001
From: Jakub Bednar
Date: Mon, 12 Aug 2024 14:39:57 +0200
Subject: [PATCH 2/2] docs: update CHANGELOG.md

---
 CHANGELOG.md                                | 3 +++
 examples/import_data_set_multiprocessing.py | 7 ++++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9dab2d69..923317ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 ## 1.46.0 [unreleased]
 
+### Examples:
+1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data
+
 ## 1.45.0 [2024-08-12]
 
 ### Bug Fixes
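The pipeline that PATCH 1/2 re-indents under the gzip block is worth seeing in isolation: `buffer_with_count` groups incoming rows into batches, and `flat_map` resolves the `Future` returned by `executor.submit()`, so the subscriber receives parsed batches rather than `Future` objects. A standalone sketch under RxPY 3; `parse_batch` and the integer stream are illustrative stand-ins for the example's `parse_rows` and CSV rows:

```python
import concurrent.futures
import multiprocessing

import rx
from rx import operators as ops


def parse_batch(rows):
    # stand-in for parse_rows(): pretend each row is encoded into line protocol
    return [f"taxi-trip-data value={row}" for row in rows]


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as executor:
        rx.from_iterable(range(100_000)).pipe(
            ops.buffer_with_count(10_000),
            # flat_map awaits the Future from executor.submit(), so the
            # subscriber below sees lists of parsed rows, not Futures
            ops.flat_map(lambda rows: executor.submit(parse_batch, rows)),
        ).subscribe(
            on_next=lambda batch: print(f'parsed {len(batch)} rows'),
            on_error=lambda ex: print(f'Unexpected error: {ex}'),
        )
```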
diff --git a/examples/import_data_set_multiprocessing.py b/examples/import_data_set_multiprocessing.py
index a813fe51..b20b6174 100644
--- a/examples/import_data_set_multiprocessing.py
+++ b/examples/import_data_set_multiprocessing.py
@@ -114,7 +114,7 @@ def parse_rows(rows, total_size):
     counter_.value += len(_parsed_rows)
     if counter_.value % 10_000 == 0:
         print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %'
-                                .format(100 * float(progress_.value) / float(int(total_size))) if total_size else ""))
+                                .format(float(progress_.value) / float(int(total_size))) if total_size else ""))
     pass
 
     queue_.put(_parsed_rows)
@@ -148,8 +148,9 @@ def init_counter(counter, progress, queue):
     Open URL and for stream data
     """
     response = urlopen(url)
-    if response.headers:
-        content_length = response.headers['Content-length']
+    # we can't get content length from response because the gzip stream content length is unknown
+    # so we set it to this value, just for progress display
+    content_length = 23143223
 
     """
     Open GZIP stream
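A note on the progress read-out touched by the last hunk: `progress_` counts decompressed characters while the hard-coded `content_length` is the compressed size, so after dropping the `100 *` factor the plain ratio only reads like a percentage to the extent that this file compresses at very roughly 100:1; the patch itself flags the number as being just for progress display. A quick illustration of the arithmetic; the decompressed figure is an assumption for the example, not a number from the patch:

```python
content_length = 23143223          # compressed .csv.gz size hard-coded by the patch
decompressed_read = 1_150_000_000  # assumed: roughly half of a ~2.3 GB CSV already read

# same format string as the example's parse_rows()
print('{0:8}{1}'.format(11_500_000,
                        ' - {0:.2f} %'.format(float(decompressed_read) / float(int(content_length)))))
# prints: 11500000 - 49.69 %
```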