Commit b9eed84

Author: Jacob Schaer
Fixed pandas.io.gbq Paging Bug
1 parent ac1609e commit b9eed84

File tree: 5 files changed, +91 -61 lines changed


ci/requirements-2.6.txt (+1 -1)

@@ -4,4 +4,4 @@ python-dateutil==1.5
 pytz==2013b
 http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
 html5lib==1.0b2
-bigquery==2.0.15
+bigquery==2.0.17

ci/requirements-2.7.txt (+1 -1)

@@ -18,4 +18,4 @@ MySQL-python==1.2.4
 scipy==0.10.0
 beautifulsoup4==4.2.1
 statsmodels==0.5.0
-bigquery==2.0.15
+bigquery==2.0.17

ci/requirements-2.7_LOCALE.txt (+1 -1)

@@ -16,4 +16,4 @@ lxml==3.2.1
 scipy==0.10.0
 beautifulsoup4==4.2.1
 statsmodels==0.5.0
-bigquery==2.0.15
+bigquery==2.0.17

pandas/io/gbq.py (+68 -40)
@@ -7,6 +7,8 @@
 import csv
 import logging
 from datetime import datetime
+import pkg_resources
+from distutils.version import LooseVersion

 import pandas as pd
 import numpy as np
@@ -19,6 +21,13 @@
     import bigquery_client
     import gflags as flags
     _BQ_INSTALLED = True
+
+    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
+    if LooseVersion(_BQ_VERSION) >= '2.0.17':
+        _BQ_VALID_VERSION = True
+    else:
+        _BQ_VALID_VERSION = False
+
 except ImportError:
     _BQ_INSTALLED = False
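Taken together, the two hunks above amount to the following import-time gate (a condensed, standalone sketch using the same names as the diff; nothing here goes beyond what the hunks show):

    import pkg_resources
    from distutils.version import LooseVersion

    try:
        import bigquery_client  # shipped with the 'bigquery' (bq.py) package
        _BQ_INSTALLED = True
        # Paging through jobs().getQueryResults() relies on fixes in bigquery 2.0.17,
        # so older installs are flagged here and rejected later in read_gbq()/to_gbq().
        _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
        _BQ_VALID_VERSION = LooseVersion(_BQ_VERSION) >= LooseVersion('2.0.17')
    except ImportError:
        _BQ_INSTALLED = False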

@@ -102,6 +111,9 @@ def _parse_entry(field_value, field_type):
         field_value = np.datetime64(timestamp)
     elif field_type == 'BOOLEAN':
         field_value = field_value == 'true'
+    # Note that results are unicode, so this will
+    # fail for non-ASCII characters.. this probably
+    # functions differently in Python 3
     else:
         field_value = str(field_value)
     return field_value
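For context on the comment added above, the conversion _parse_entry performs looks roughly like this (a simplified sketch, not the committed function body; the INTEGER/FLOAT and TIMESTAMP branches are paraphrased from the surrounding module and may differ in detail):

    from datetime import datetime
    import numpy as np

    def parse_entry_sketch(field_value, field_type):
        # BigQuery returns every cell as a (unicode) string; convert by declared type.
        if field_type in ('INTEGER', 'FLOAT'):
            return float(field_value)
        elif field_type == 'TIMESTAMP':
            # Timestamps arrive as seconds since the epoch.
            return np.datetime64(datetime.utcfromtimestamp(float(field_value)))
        elif field_type == 'BOOLEAN':
            return field_value == 'true'
        # str() on a unicode value is what can fail for non-ASCII characters
        # on Python 2, as the new comment warns.
        return str(field_value)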
@@ -228,68 +240,76 @@ def _parse_data(client, job, index_col=None, col_order=None):
     # Iterate over the result rows.
     # Since Google's API now requires pagination of results,
     # we do that here. The following is repurposed from
-    # bigquery_client.py :: Client.ReadTableRows()
+    # bigquery_client.py :: Client._JobTableReader._ReadOnePage
+
+    # TODO: Enable Reading From Table, see Client._TableTableReader._ReadOnePage

     # Initially, no page token is set
     page_token = None

-    # Most of Google's client API's allow one to set total_rows in case
-    # the user only wants the first 'n' results from a query. Typically
-    # they set this to sys.maxint by default, but this caused problems
-    # during testing - specifically on OS X. It appears that at some
-    # point in bigquery_client.py, there is an attempt to cast this value
-    # to an unsigned integer. Depending on the python install,
-    # sys.maxint may exceed the limitations of unsigned integers.
-    #
-    # See:
-    # https://code.google.com/p/google-bigquery-tools/issues/detail?id=14
-
-    # This is hardcoded value for 32bit sys.maxint per
-    # the above note. Theoretically, we could simply use
-    # 100,000 (or whatever the current max page size is),
-    # but this is more flexible in the event of an API change
-    total_rows = 2147483647
-
-    # Keep track of rows read
-    row_count = 0
+    # This number is the current max results per page
+    max_rows = bigquery_client._MAX_ROWS_PER_REQUEST
+
+    # How many rows in result set? Initialize to max_rows
+    total_rows = max_rows
+
+    # This is the starting row for a particular page...
+    # is ignored if page_token is present, though
+    # it may be useful if we wish to implement SQL like LIMITs
+    # with minimums
+    start_row = 0

     # Keep our page DataFrames until the end when we
     # concatentate them
     dataframe_list = list()

-    # Iterate over all rows
-    while row_count < total_rows:
-        data = client.apiclient.tabledata().list(maxResults=total_rows - row_count,
-                                                 pageToken=page_token,
-                                                 **table_dict).execute()
+    current_job = job['jobReference']

-        # If there are more results than will fit on a page,
-        # you will recieve a token for the next page.
-        page_token = data.get('pageToken', None)
+    # Iterate over all rows
+    while start_row < total_rows:
+        # Setup the parameters for getQueryResults() API Call
+        kwds = dict(current_job)
+        kwds['maxResults'] = max_rows
+        # Sets the timeout to 0 because we assume the table is already ready.
+        # This is because our previous call to Query() is synchronous
+        # and will block until it's actually done
+        kwds['timeoutMs'] = 0
+        # Use start row if there's no page_token ... in other words, the
+        # user requested to start somewhere other than the beginning...
+        # presently this is not a parameter to read_gbq(), but it will be
+        # added eventually.
+        if page_token:
+            kwds['pageToken'] = page_token
+        else:
+            kwds['startIndex'] = start_row
+        data = client.apiclient.jobs().getQueryResults(**kwds).execute()
+        if not data['jobComplete']:
+            raise BigqueryError('Job was not completed, or was invalid')

         # How many rows are there across all pages?
-        total_rows = min(total_rows, int(data['totalRows'])) # Changed to use get(data[rows],0)
+        # Note: This is presently the only reason we don't just use
+        # _ReadOnePage() directly
+        total_rows = int(data['totalRows'])
+
+        page_token = data.get('pageToken', None)
         raw_page = data.get('rows', [])
         page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)

-        row_count += len(page_array)
+        start_row += len(raw_page)
         if total_rows > 0:
-            completed = (100 * row_count) / total_rows
-            logger.info('Remaining Rows: ' + str(total_rows - row_count) + '(' + str(completed) + '% Complete)')
+            completed = (100 * start_row) / total_rows
+            logger.info('Remaining Rows: ' + str(total_rows - start_row) + '(' + str(completed) + '% Complete)')
         else:
             logger.info('No Rows')

         dataframe_list.append(DataFrame(page_array))

-    # Handle any exceptions that might have occured
-    if not page_token and row_count != total_rows:
+    # Did we get enough rows? Note: gbq.py stopped checking for this
+    # but we felt it was still a good idea.
+    if not page_token and not raw_page and start_row != total_rows:
         raise bigquery_client.BigqueryInterfaceError(
-            'PageToken missing for %r' % (
-                bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
-    if not raw_page and row_count != total_rows:
-        raise bigquery_client.BigqueryInterfaceError(
-            'Not enough rows returned by server for %r' % (
-                bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
+            ("Not enough rows returned by server. Expected: {0}" + \
+             " Rows, But Recieved {1}").format(total_rows, start_row))

     # Build final dataframe
     final_df = concat(dataframe_list, ignore_index=True)
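Stripped of the DataFrame bookkeeping, the paging pattern this hunk introduces can be sketched as a small generator. This is an illustration only: it assumes `client` is an authenticated bigquery_client client exposing `apiclient`, `job` is the dict returned by the earlier synchronous Query() call, and `max_rows` is the per-page limit; the committed code inlines the equivalent logic in _parse_data() instead.

    def iter_result_pages(client, job, max_rows):
        """Yield the raw 'rows' payload of each page of a finished query job."""
        page_token = None
        start_row = 0
        total_rows = max_rows                      # refined from the first response
        while start_row < total_rows:
            kwds = dict(job['jobReference'])       # projectId / jobId identify the job
            kwds['maxResults'] = max_rows
            kwds['timeoutMs'] = 0                  # the query already ran synchronously
            if page_token:
                kwds['pageToken'] = page_token     # resume where the last page ended
            else:
                kwds['startIndex'] = start_row     # only used before a token exists
            data = client.apiclient.jobs().getQueryResults(**kwds).execute()
            if not data['jobComplete']:
                raise RuntimeError('Job was not completed, or was invalid')
            total_rows = int(data['totalRows'])    # true row count, known after page one
            page_token = data.get('pageToken', None)
            raw_page = data.get('rows', [])
            start_row += len(raw_page)
            yield raw_page
            # Bail out rather than loop forever if the server stops returning rows
            # before the promised total is reached.
            if not page_token and not raw_page and start_row != total_rows:
                raise RuntimeError('Not enough rows returned by server')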
@@ -355,6 +375,10 @@ def to_gbq(dataframe, destination_table, schema=None, col_order=None, if_exists=
     else:
         raise ImportError('Could not import Google BigQuery Client.')

+    if not _BQ_VALID_VERSION:
+        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
+                          "support, current version " + _BQ_VERSION)
+
     ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP', 'RECORD']

     if if_exists == 'replace' and schema is None:
@@ -456,6 +480,10 @@ def read_gbq(query, project_id = None, destination_table = None, index_col=None,
     else:
         raise ImportError('Could not import Google BigQuery Client.')

+    if not _BQ_VALID_VERSION:
+        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
+                          "support, current version " + _BQ_VERSION)
+
     query_args = kwargs
     query_args['project_id'] = project_id
     query_args['query'] = query
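From the caller's side nothing changes: read_gbq() still returns a single DataFrame and the paging above stays invisible. A minimal usage sketch (the project id is a placeholder; the query mirrors the test re-enabled below):

    from pandas.io import gbq

    # Result sets larger than one API page (100,000 rows per request at the
    # time of this commit) are now stitched together transparently.
    df = gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT 100005",
                      project_id='my-project-id')
    assert len(df) == 100005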

pandas/io/tests/test_gbq.py (+20 -18)
@@ -40,20 +40,21 @@ def GetTableSchema(self,table_dict):
 # Fake Google BigQuery API Client
 class FakeApiClient:
     def __init__(self):
-        self._tabledata = FakeTableData()
+        self._fakejobs = FakeJobs()


-    def tabledata(self):
-        return self._tabledata
+    def jobs(self):
+        return self._fakejobs

-class FakeTableData:
+class FakeJobs:
     def __init__(self):
-        self._list = FakeList()
+        self._fakequeryresults = FakeResults()

-    def list(self,maxResults = None, pageToken = None, **table_dict):
-        return self._list
+    def getQueryResults(self, job_id=None, project_id=None,
+                        max_results=None, timeout_ms=None, **kwargs):
+        return self._fakequeryresults

-class FakeList:
+class FakeResults:
     def execute(self):
         return {'rows': [ {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]},
                           {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'attended'}, {'v': '1'}]},
@@ -68,7 +69,8 @@ def execute(self):
                         ],
                 'kind': 'bigquery#tableDataList',
                 'etag': '"4PTsVxg68bQkQs1RJ1Ndewqkgg4/hoRHzb4qfhJAIa2mEewC-jhs9Bg"',
-                'totalRows': '10'}
+                'totalRows': '10',
+                'jobComplete' : True}

 ####################################################################################
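The fakes above only have to satisfy the attribute chain the new paging code exercises, client.apiclient.jobs().getQueryResults(...).execute(). Condensed to that shape (simplified from the test file, with a single-row payload instead of ten):

    class FakeResults:
        def execute(self):
            # One already-complete page, shaped like a getQueryResults() response.
            return {'rows': [{'f': [{'v': 'othello'}, {'v': '1603'}]}],
                    'totalRows': '1',
                    'jobComplete': True}

    class FakeJobs:
        def getQueryResults(self, **kwargs):
            return FakeResults()

    class FakeApiClient:
        def jobs(self):
            return FakeJobs()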

@@ -225,16 +227,16 @@ def test_column_order_plus_index(self):
         correct_frame_small = DataFrame(correct_frame_small)[col_order]
         tm.assert_index_equal(result_frame.columns, correct_frame_small.columns)

-    # @with_connectivity_check
-    # def test_download_dataset_larger_than_100k_rows(self):
-    #     # Test for known BigQuery bug in datasets larger than 100k rows
-    #     # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
-    #     if not os.path.exists(self.bq_token):
-    #         raise nose.SkipTest('Skipped because authentication information is not available.')
+    @with_connectivity_check
+    def test_download_dataset_larger_than_100k_rows(self):
+        # Test for known BigQuery bug in datasets larger than 100k rows
+        # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
+        if not os.path.exists(self.bq_token):
+            raise nose.SkipTest('Skipped because authentication information is not available.')

-    #     client = gbq._authenticate()
-    #     a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
-    #     self.assertTrue(len(a) == 100005)
+        client = gbq._authenticate()
+        a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
+        self.assertTrue(len(a) == 100005)

     @with_connectivity_check
     def test_download_all_data_types(self):
