@@ -132,88 +132,89 @@ def init_counter(counter, progress, queue):
     queue_ = queue


-"""
-Create multiprocess shared environment
-"""
-queue_ = multiprocessing.Manager().Queue()
-counter_ = Value('i', 0)
-progress_ = Value('i', 0)
-startTime = datetime.now()
-
-url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-# url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+if __name__ == "__main__":
+    """
+    Create multiprocess shared environment
+    """
+    queue_ = multiprocessing.Manager().Queue()
+    counter_ = Value('i', 0)
+    progress_ = Value('i', 0)
+    startTime = datetime.now()

-"""
-Open URL and for stream data
-"""
-response = urlopen(url)
-if response.headers:
-    content_length = response.headers['Content-length']
-io_wrapper = ProgressTextIOWrapper(response)
-io_wrapper.progress = progress_
+    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
+    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"

-"""
-Start writer as a new process
-"""
-writer = InfluxDBWriter(queue_)
-writer.start()
+    """
+    Open URL and for stream data
+    """
+    response = urlopen(url)
+    if response.headers:
+        content_length = response.headers['Content-length']
+    io_wrapper = ProgressTextIOWrapper(response)
+    io_wrapper.progress = progress_

-"""
-Create process pool for parallel encoding into LineProtocol
-"""
-cpu_count = multiprocessing.cpu_count()
-with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                            initargs=(counter_, progress_, queue_)) as executor:
     """
-    Converts incoming HTTP stream into sequence of LineProtocol
+    Start writer as a new process
     """
-    data = rx \
-        .from_iterable(DictReader(io_wrapper)) \
-        .pipe(ops.buffer_with_count(10_000),
-              # Parse 10_000 rows into LineProtocol on subprocess
-              ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+    writer = InfluxDBWriter(queue_)
+    writer.start()

     """
-    Write data into InfluxDB
+    Create process pool for parallel encoding into LineProtocol
     """
-    data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
+    cpu_count = multiprocessing.cpu_count()
+    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                initargs=(counter_, progress_, queue_)) as executor:
+        """
+        Converts incoming HTTP stream into sequence of LineProtocol
+        """
+        data = rx \
+            .from_iterable(DictReader(io_wrapper)) \
+            .pipe(ops.buffer_with_count(10_000),
+                  # Parse 10_000 rows into LineProtocol on subprocess
+                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+
+        """
+        Write data into InfluxDB
+        """
+        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))

-"""
-Terminate Writer
-"""
-queue_.put(None)
-queue_.join()
+    """
+    Terminate Writer
+    """
+    queue_.put(None)
+    queue_.join()

-print()
-print(f'Import finished in: {datetime.now() - startTime}')
-print()
+    print()
+    print(f'Import finished in: {datetime.now() - startTime}')
+    print()

-"""
-Querying 10 pickups from dispatching 'B00008'
-"""
-query = 'from(bucket:"my-bucket")' \
-        '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-        '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-        '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-        '|> rename(columns: {_time: "pickup_datetime"})' \
-        '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
+    """
+    Querying 10 pickups from dispatching 'B00008'
+    """
+    query = 'from(bucket:"my-bucket")' \
+            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+            '|> rename(columns: {_time: "pickup_datetime"})' \
+            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'

-client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-result = client.query_api().query(query=query)
+    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
+    result = client.query_api().query(query=query)

-"""
-Processing results
-"""
-print()
-print("=== Querying 10 pickups from dispatching 'B00008' ===")
-print()
-for table in result:
-    for record in table.records:
-        print(
-            f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+    """
+    Processing results
+    """
+    print()
+    print("=== Querying 10 pickups from dispatching 'B00008' ===")
+    print()
+    for table in result:
+        for record in table.records:
+            print(
+                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')

-"""
-Close client
-"""
-client.close()
+    """
+    Close client
+    """
+    client.close()
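Why this diff wraps everything in if __name__ == "__main__": ProcessPoolExecutor and multiprocessing.Manager may start their workers with the "spawn" start method (always on Windows, the default on macOS since Python 3.8). Spawned workers import the main module afresh, so without the guard the top-level code (the download, the shared queue, the writer process) would run again in every child and the script would fail or endlessly re-spawn. A minimal sketch of the pattern, with hypothetical names rather than code from this commit:

import multiprocessing


def square(n):
    return n * n


if __name__ == "__main__":
    # Only the parent runs this block; spawned workers re-import the module,
    # find no top-level work, and just execute the tasks submitted to them.
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))  # [0, 1, 4, 9]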
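The ingest pipeline being moved here is also worth a note: rows are buffered into batches of 10,000 and each batch is handed to a pool worker that renders it into LineProtocol. Stripped of RxPY, the same batch-and-submit shape looks roughly like this sketch, where parse_chunk and chunks are hypothetical stand-ins for the example's parse_rows and ops.buffer_with_count:

import concurrent.futures
from itertools import islice


def parse_chunk(rows):
    # Stand-in for parse_rows(): render one batch into line protocol strings.
    return [str(row) for row in rows]


def chunks(iterable, size):
    # Stand-in for ops.buffer_with_count(): yield fixed-size batches.
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(parse_chunk, batch)
                   for batch in chunks(range(25_000), 10_000)]
        for future in concurrent.futures.as_completed(futures):
            print(len(future.result()))  # 10000, 10000, 5000 in completion order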