7 | 7 | import csv
8 | 8 | import logging
9 | 9 | from datetime import datetime
| 10 | +import pkg_resources
| 11 | +from distutils.version import LooseVersion
10 | 12 |
11 | 13 | import pandas as pd
12 | 14 | import numpy as np

19 | 21 |     import bigquery_client
20 | 22 |     import gflags as flags
21 | 23 |     _BQ_INSTALLED = True
| 24 | +
| 25 | +    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
| 26 | +    if LooseVersion(_BQ_VERSION) >= '2.0.17':
| 27 | +        _BQ_VALID_VERSION = True
| 28 | +    else:
| 29 | +        _BQ_VALID_VERSION = False
| 30 | +
22 | 31 | except ImportError:
23 | 32 |     _BQ_INSTALLED = False
24 | 33 |
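The hunk above adds a version gate next to the existing import guard: `pkg_resources` reports the installed `bigquery` distribution's version and `LooseVersion` compares it against the 2.0.17 minimum. A minimal standalone sketch of the same pattern (the `_BQ_VALID_VERSION = False` fallback in the `except` branch is my addition so the flag is always defined; the patch itself only reads it after checking `_BQ_INSTALLED`):

```python
# Sketch of the version gate: detect whether a new-enough bigquery client is installed.
import pkg_resources
from distutils.version import LooseVersion

try:
    import bigquery_client
    _BQ_INSTALLED = True

    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
    # LooseVersion compares components numerically, so '2.0.17' >= '2.0.9'
    # holds even though a plain string comparison would say otherwise.
    _BQ_VALID_VERSION = LooseVersion(_BQ_VERSION) >= LooseVersion('2.0.17')
except ImportError:
    _BQ_INSTALLED = False
    _BQ_VALID_VERSION = False  # not in the patch; added here so the flag always exists
```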
@@ -102,6 +111,9 @@ def _parse_entry(field_value, field_type):
102 | 111 |         field_value = np.datetime64(timestamp)
103 | 112 |     elif field_type == 'BOOLEAN':
104 | 113 |         field_value = field_value == 'true'
| 114 | +    # Note that results are unicode, so this will
| 115 | +    # fail for non-ASCII characters. This probably
| 116 | +    # functions differently in Python 3
105 | 117 |     else:
106 | 118 |         field_value = str(field_value)
107 | 119 |     return field_value
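For context, the new comment sits in `_parse_entry`, which coerces each raw API string into a Python/NumPy value based on the BigQuery field type; anything without a dedicated branch falls through to `str()`, which is where the non-ASCII caveat bites under Python 2. A rough sketch of that shape (the INTEGER/FLOAT branches are assumed from the surrounding function, not shown in this hunk):

```python
from datetime import datetime
import numpy as np

def parse_entry(field_value, field_type):
    # Illustrative stand-in for gbq._parse_entry, not the exact upstream code.
    if field_type == 'INTEGER':
        return int(field_value)
    elif field_type == 'FLOAT':
        return float(field_value)
    elif field_type == 'TIMESTAMP':
        # BigQuery returns timestamps as seconds-since-epoch strings
        timestamp = datetime.utcfromtimestamp(float(field_value))
        return np.datetime64(timestamp)
    elif field_type == 'BOOLEAN':
        return field_value == 'true'   # the API sends the strings 'true'/'false'
    # Fallback: keep the value as text. Under Python 2, str() on a unicode
    # value containing non-ASCII characters raises UnicodeEncodeError.
    return str(field_value)
```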
@@ -228,68 +240,76 @@ def _parse_data(client, job, index_col=None, col_order=None):
228 | 240 |     # Iterate over the result rows.
229 | 241 |     # Since Google's API now requires pagination of results,
230 | 242 |     # we do that here. The following is repurposed from
231 | | -    # bigquery_client.py :: Client.ReadTableRows()
| 243 | +    # bigquery_client.py :: Client._JobTableReader._ReadOnePage
| 244 | +
| 245 | +    # TODO: Enable Reading From Table, see Client._TableTableReader._ReadOnePage
232 | 246 |
233 | 247 |     # Initially, no page token is set
234 | 248 |     page_token = None
235 | 249 |
236 | | -    # Most of Google's client API's allow one to set total_rows in case
237 | | -    # the user only wants the first 'n' results from a query. Typically
238 | | -    # they set this to sys.maxint by default, but this caused problems
239 | | -    # during testing - specifically on OS X. It appears that at some
240 | | -    # point in bigquery_client.py, there is an attempt to cast this value
241 | | -    # to an unsigned integer. Depending on the python install,
242 | | -    # sys.maxint may exceed the limitations of unsigned integers.
243 | | -    #
244 | | -    # See:
245 | | -    # https://code.google.com/p/google-bigquery-tools/issues/detail?id=14
246 | | -
247 | | -    # This is hardcoded value for 32bit sys.maxint per
248 | | -    # the above note. Theoretically, we could simply use
249 | | -    # 100,000 (or whatever the current max page size is),
250 | | -    # but this is more flexible in the event of an API change
251 | | -    total_rows = 2147483647
252 | | -
253 | | -    # Keep track of rows read
254 | | -    row_count = 0
| 250 | +    # This number is the current max results per page
| 251 | +    max_rows = bigquery_client._MAX_ROWS_PER_REQUEST
| 252 | +
| 253 | +    # How many rows in result set? Initialize to max_rows
| 254 | +    total_rows = max_rows
| 255 | +
| 256 | +    # This is the starting row for a particular page...
| 257 | +    # is ignored if page_token is present, though
| 258 | +    # it may be useful if we wish to implement SQL like LIMITs
| 259 | +    # with minimums
| 260 | +    start_row = 0
255 | 261 |
256 | 262 |     # Keep our page DataFrames until the end when we
257 | 263 |     # concatenate them
258 | 264 |     dataframe_list = list()
259 | 265 |
260 | | -    # Iterate over all rows
261 | | -    while row_count < total_rows:
262 | | -        data = client.apiclient.tabledata().list(maxResults=total_rows - row_count,
263 | | -                                                 pageToken=page_token,
264 | | -                                                 **table_dict).execute()
| 266 | +    current_job = job['jobReference']
265 | 267 |
266 | | -        # If there are more results than will fit on a page,
267 | | -        # you will recieve a token for the next page.
268 | | -        page_token = data.get('pageToken', None)
| 268 | +    # Iterate over all rows
| 269 | +    while start_row < total_rows:
| 270 | +        # Setup the parameters for getQueryResults() API Call
| 271 | +        kwds = dict(current_job)
| 272 | +        kwds['maxResults'] = max_rows
| 273 | +        # Sets the timeout to 0 because we assume the table is already ready.
| 274 | +        # This is because our previous call to Query() is synchronous
| 275 | +        # and will block until it's actually done
| 276 | +        kwds['timeoutMs'] = 0
| 277 | +        # Use start row if there's no page_token ... in other words, the
| 278 | +        # user requested to start somewhere other than the beginning...
| 279 | +        # presently this is not a parameter to read_gbq(), but it will be
| 280 | +        # added eventually.
| 281 | +        if page_token:
| 282 | +            kwds['pageToken'] = page_token
| 283 | +        else:
| 284 | +            kwds['startIndex'] = start_row
| 285 | +        data = client.apiclient.jobs().getQueryResults(**kwds).execute()
| 286 | +        if not data['jobComplete']:
| 287 | +            raise BigqueryError('Job was not completed, or was invalid')
269 | 288 |
270 | 289 |         # How many rows are there across all pages?
271 | | -        total_rows = min(total_rows, int(data['totalRows']))  # Changed to use get(data[rows],0)
| 290 | +        # Note: This is presently the only reason we don't just use
| 291 | +        # _ReadOnePage() directly
| 292 | +        total_rows = int(data['totalRows'])
| 293 | +
| 294 | +        page_token = data.get('pageToken', None)
272 | 295 |         raw_page = data.get('rows', [])
273 | 296 |         page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)
274 | 297 |
275 | | -        row_count += len(page_array)
| 298 | +        start_row += len(raw_page)
276 | 299 |         if total_rows > 0:
277 | | -            completed = (100 * row_count) / total_rows
278 | | -            logger.info('Remaining Rows: ' + str(total_rows - row_count) + '(' + str(completed) + '% Complete)')
| 300 | +            completed = (100 * start_row) / total_rows
| 301 | +            logger.info('Remaining Rows: ' + str(total_rows - start_row) + '(' + str(completed) + '% Complete)')
279 | 302 |         else:
280 | 303 |             logger.info('No Rows')
281 | 304 |
282 | 305 |         dataframe_list.append(DataFrame(page_array))
283 | 306 |
284 | | -        # Handle any exceptions that might have occured
285 | | -        if not page_token and row_count != total_rows:
| 307 | +        # Did we get enough rows? Note: gbq.py stopped checking for this
| 308 | +        # but we felt it was still a good idea.
| 309 | +        if not page_token and not raw_page and start_row != total_rows:
286 | 310 |             raise bigquery_client.BigqueryInterfaceError(
287 | | -                'PageToken missing for %r' % (
288 | | -                    bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
289 | | -        if not raw_page and row_count != total_rows:
290 | | -            raise bigquery_client.BigqueryInterfaceError(
291 | | -                'Not enough rows returned by server for %r' % (
292 | | -                    bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
| 311 | +                ("Not enough rows returned by server. Expected: {0}" + \
| 312 | +                 " Rows, But Received {1}").format(total_rows, start_row))
293 | 313 |
294 | 314 |     # Build final dataframe
295 | 315 |     final_df = concat(dataframe_list, ignore_index=True)
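In short, the loop now pages through `jobs().getQueryResults()` (keyed by the query's `jobReference`) instead of `tabledata().list()`, taking `totalRows` from each response and resuming with either `pageToken` or `startIndex`. A condensed sketch of that control flow, with the row parsing factored into a hypothetical `parse_page()` callback (everything except the API calls shown in the patch is illustrative naming):

```python
from pandas import DataFrame, concat

def fetch_query_result(client, job, max_rows, parse_page):
    # Condensed version of the pagination loop above; assumes the query job
    # has already finished, hence timeoutMs=0.
    job_ref = job['jobReference']   # dict carrying projectId / jobId
    page_token = None
    start_row = 0
    total_rows = max_rows           # corrected from 'totalRows' after the first page
    frames = []

    while start_row < total_rows:
        kwds = dict(job_ref, maxResults=max_rows, timeoutMs=0)
        if page_token:
            kwds['pageToken'] = page_token    # resume where the last page ended
        else:
            kwds['startIndex'] = start_row    # only honoured when no token is set
        data = client.apiclient.jobs().getQueryResults(**kwds).execute()

        total_rows = int(data['totalRows'])
        page_token = data.get('pageToken', None)
        rows = data.get('rows', [])
        start_row += len(rows)
        frames.append(DataFrame(parse_page(rows)))

    return concat(frames, ignore_index=True)
```

Because `totalRows` is only known after the first response, seeding `total_rows` with the page size keeps the loop condition simple, and the patch's final `start_row != total_rows` check guards against a short read when no further `pageToken` arrives.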
@@ -355,6 +375,10 @@ def to_gbq(dataframe, destination_table, schema=None, col_order=None, if_exists=
355 | 375 |     else:
356 | 376 |         raise ImportError('Could not import Google BigQuery Client.')
357 | 377 |
| 378 | +    if not _BQ_VALID_VERSION:
| 379 | +        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
| 380 | +                          "support, current version " + _BQ_VERSION)
| 381 | +
358 | 382 |     ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP', 'RECORD']
359 | 383 |
360 | 384 |     if if_exists == 'replace' and schema is None:
@@ -456,6 +480,10 @@ def read_gbq(query, project_id = None, destination_table = None, index_col=None,
456 | 480 |     else:
457 | 481 |         raise ImportError('Could not import Google BigQuery Client.')
458 | 482 |
| 483 | +    if not _BQ_VALID_VERSION:
| 484 | +        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
| 485 | +                          "support, current version " + _BQ_VERSION)
| 486 | +
459 | 487 |     query_args = kwargs
460 | 488 |     query_args['project_id'] = project_id
461 | 489 |     query_args['query'] = query
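With both guards in place, `read_gbq` and `to_gbq` fail fast on an outdated client before doing any work. A possible call against this version of the module, assuming it is importable as `pandas.io.gbq` (the project id and query are placeholders):

```python
from pandas.io import gbq

# Raises ImportError('Could not import Google BigQuery Client.') when the
# bigquery package is missing, or the "requires bigquery >= 2.0.17" message
# above when the installed client is too old; otherwise returns a DataFrame.
df = gbq.read_gbq("SELECT word, word_count FROM [publicdata:samples.shakespeare] LIMIT 5",
                  project_id='my-project-id')
```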