diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index f1fcc822adeaf..80df487fee441 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -6,6 +6,7 @@
 import uuid
 import numpy as np
+# import pkg_resources  # unused since this patch drops the version check
 from distutils.version import LooseVersion
 from pandas import compat
@@ -13,22 +14,17 @@
 from pandas.tools.merge import concat
 from pandas.core.common import PandasError
 
+from apiclient.discovery import build
+from apiclient.http import MediaFileUpload
+from apiclient.errors import HttpError
-def _check_google_client_version():
-    if compat.PY3:
-        raise NotImplementedError("Google's libraries do not support Python 3 yet")
+from oauth2client.client import AccessTokenRefreshError
 
-    try:
-        import pkg_resources
+import httplib2
 
-    except ImportError:
-        raise ImportError('Could not import pkg_resources (setuptools).')
-
-    _GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution('google-api-python-client').version
-
-    if LooseVersion(_GOOGLE_API_CLIENT_VERSION) < '1.2.0':
-        raise ImportError("pandas requires google-api-python-client >= 1.2.0 for Google "
-                          "BigQuery support, current version " + _GOOGLE_API_CLIENT_VERSION)
+# additional libraries added by P.Chr
+from hurry.filesize import size, si
+import time
 
 logger = logging.getLogger('pandas.io.gbq')
 logger.setLevel(logging.ERROR)
@@ -40,12 +36,6 @@ class InvalidPageToken(PandasError, IOError):
     """
     pass
 
-class InvalidQueryException(PandasError, IOError):
-    """
-    Raised when a malformed query is given to read_gbq.
-    """
-    pass
-
 class AccessDeniedException(PandasError, IOError):
     """
     Raised when invalid credentials are provided, or tokens have expired.
@@ -79,70 +69,51 @@ class InvalidColumnOrder(PandasError, IOError):
     """
     pass
 
-class GbqConnector(object):
-
+class GbqConnector:
     def __init__(self, project_id, reauth=False):
-
         self.project_id = project_id
         self.reauth = reauth
-        self.credentials = self.get_credentials()
-        self.service = self.get_service(self.credentials)
-
-    def get_credentials(self):
-        try:
-            from oauth2client.client import OAuth2WebServerFlow
-            from oauth2client.file import Storage
-            from oauth2client.tools import run_flow, argparser
-
-        except ImportError:
-            raise ImportError('Could not import Google API Client.')
+        self.scope = 'https://www.googleapis.com/auth/bigquery'
+        self.json_key_path = '{}/.google_api_oauth2_credentials/service_accounts/{}.json'.format(os.environ['HOME'], project_id)
+        self.service = self.get_service()
 
-        _check_google_client_version()
+    def _get_credentials(self):
+        from oauth2client.client import SignedJwtAssertionCredentials
 
-        flow = OAuth2WebServerFlow(client_id='495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd.apps.googleusercontent.com',
-                                   client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
-                                   scope='https://www.googleapis.com/auth/bigquery',
-                                   redirect_uri='urn:ietf:wg:oauth:2.0:oob')
+        with open(self.json_key_path) as f:
+            json_key = json.load(f)
 
-        storage = Storage('bigquery_credentials.dat')
-        credentials = storage.get()
-
-        if credentials is None or credentials.invalid or self.reauth:
-            credentials = run_flow(flow, storage, argparser.parse_args([]))
-
-        return credentials
-
-    def get_service(self, credentials):
-        try:
-            import httplib2
-
-        except ImportError:
-            raise ImportError("pandas requires httplib2 for Google BigQuery support")
+        return SignedJwtAssertionCredentials(
+            json_key['client_email'],
+            bytes(json_key['private_key'], 'UTF-8'),
+            self.scope,
+        )
 
+    def get_service(self):
         try:
-            from apiclient.discovery import build
-
-        except ImportError:
-            raise ImportError('Could not import Google API Client.')
-
-        _check_google_client_version()
-
-        http = httplib2.Http()
-        http = credentials.authorize(http)
-        bigquery_service = build('bigquery', 'v2', http=http)
-
-        return bigquery_service
+            credentials = self._get_credentials()
+            http = httplib2.Http()
+            http = credentials.authorize(http)
+            bigquery_service = build('bigquery', 'v2', http=http)
+
+            return bigquery_service
+        except (FileNotFoundError, ValueError, KeyError, NotImplementedError):
+            raise NotFoundException("There's a problem with the service account credentials for project: {}. "
+                                    "Please provide a valid JSON key file at {}\n".format(self.project_id, self.json_key_path))
+
+    def _start_timer(self):
+        self.start = time.monotonic()
+
+    def get_elapsed_seconds(self):
+        return round(time.monotonic() - self.start, 2)
+
+    def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.', overlong=7):
+        sec = self.get_elapsed_seconds()
+        if sec > overlong:
+            print('{} {} {}'.format(prefix, sec, postfix))
+            sys.stdout.flush()
 
     def run_query(self, query):
-        try:
-            from apiclient.errors import HttpError
-            from oauth2client.client import AccessTokenRefreshError
-
-        except ImportError:
-            raise ImportError('Could not import Google API Client.')
-
-        _check_google_client_version()
-
         job_collection = self.service.jobs()
         job_data = {
             'configuration': {
@@ -153,10 +124,13 @@ def run_query(self, query):
             }
         }
 
+        self._start_timer()
         try:
             query_reply = job_collection.insert(projectId=self.project_id,
                                                 body=job_data).execute()
             status = query_reply['status']
+            print('Query requested...')
+            sys.stdout.flush()
         except AccessTokenRefreshError:
             raise AccessDeniedException("The credentials have been revoked or expired, please re-run"
                                         "the application to re-authorize")
@@ -168,24 +142,36 @@ def run_query(self, query):
 
         if errors:
             reasons = [error['reason'] for error in errors]
+            messages = [error['message'] for error in errors]
             if 'accessDenied' in reasons:
-                raise AccessDeniedException
-            if 'invalidQuery' in reasons:
-                raise InvalidQueryException
+                raise AccessDeniedException(messages)
             if 'notFound' in reasons:
-                raise NotFoundException
+                raise NotFoundException(messages)
             if 'termsOfServiceNotAccepted' in reasons:
-                raise TermsOfServiceNotAcceptedException
+                raise TermsOfServiceNotAcceptedException(messages)
             else:
-                raise UnknownGBQException(errors)
+                raise UnknownGBQException(status)
 
         job_reference = query_reply['jobReference']
 
         while(not query_reply.get('jobComplete', False)):
-            print('Job not yet complete...')
+            self.print_elapsed_seconds('  Elapsed', 's. Waiting...')
             query_reply = job_collection.getQueryResults(
                 projectId=job_reference['projectId'],
-                jobId=job_reference['jobId']).execute()
+                jobId=job_reference['jobId'],
+            ).execute()
+
+        bytes_processed = int(query_reply.get('totalBytesProcessed', '0'))
+        if query_reply['cacheHit']:
+            print('Query done.\nCached result.\nPrice: 0 $.\n')
+        else:
+            # Charges are rounded to the nearest MB, with a minimum
+            # 10 MB data processed per table referenced by the query.
+            mbytes = max(round(bytes_processed / 1e6), 10)  # minimum charge is 10 MB
+            price = round(mbytes * 5 / 1e6, 4)  # $5 per TB
+            print('Query done.\nProcessed: {}.\nPrice: {} $.\n'.format(size(bytes_processed, system=si), price))
+
+        print('Retrieving results...')
 
         total_rows = int(query_reply['totalRows'])
         result_pages = list()
@@ -199,23 +185,38 @@ def run_query(self, query):
             page = query_reply['rows']
             result_pages.append(page)
             current_row += len(page)
-            page_token = query_reply.get('pageToken', None)
 
-            if not page_token and current_row < total_rows:
-                raise InvalidPageToken("Required pageToken was missing. Recieved {0} of {1} rows".format(current_row,total_rows))
+            self.print_elapsed_seconds('  Got page: {}; {}% done. Elapsed'.format(
+                len(result_pages),
+                round(100.0 * current_row / total_rows),
+            ))
 
+            if current_row == total_rows:
+                break
+
+            page_token = query_reply.get('pageToken', None)
+
+            if not page_token:
+                raise InvalidPageToken("Required pageToken was missing. Received {0} of {1} rows"
+                                       .format(current_row, total_rows))
             elif page_token in seen_page_tokens:
                 raise InvalidPageToken("A duplicate pageToken was returned")
 
             seen_page_tokens.append(page_token)
+
             query_reply = job_collection.getQueryResults(
                 projectId = job_reference['projectId'],
                 jobId = job_reference['jobId'],
-                pageToken = page_token).execute()
+                pageToken = page_token,
+            ).execute()
 
         if (current_row < total_rows):
             raise InvalidPageToken()
 
+        # print basic query stats
+        print('Got all {} rows.'.format(total_rows))
+        self.print_elapsed_seconds('Query + retrieving time taken', 's.\n\nConstructing DataFrame...')
+
         return schema, result_pages
 
     def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
@@ -232,7 +233,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
             row_dict = dict()
             row_dict['json'] = json.loads(row.to_json(force_ascii = False,
                                                       date_unit = 's',
-                                                      date_format = 'iso'))
+                                                      date_format = 'iso',
+                                                      ))
             row_dict['insertId'] = job_id + str(index)
             rows.append(row_dict)
             remaining_rows -= 1
@@ -247,7 +249,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
                 projectId = self.project_id,
                 datasetId = dataset_id,
                 tableId = table_id,
-                body = body).execute()
+                body = body,
+            ).execute()
 
             if 'insertErrors' in response:
                 raise UnknownGBQException(response)
@@ -260,23 +263,22 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
 
 def _parse_data(schema, rows):
     # see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing
-    dtype_map = {'INTEGER': np.dtype(float),
+    dtype_map = {
+        'INTEGER': np.dtype(int),
                  'FLOAT': np.dtype(float),
-                 'TIMESTAMP': 'M8[ns]'}     # This seems to be buggy without
-                                            # nanosecond indicator
+        'TIMESTAMP': 'M8[ns]',  # This seems to be buggy without nanosecond indicator
+    }
 
     fields = schema['fields']
     col_types = [field['type'] for field in fields]
-    col_names = [field['name'].encode('ascii', 'ignore') for field in fields]
+    col_names = [field['name'] for field in fields]
    col_dtypes = [dtype_map.get(field['type'], object) for field in fields]
-    page_array = np.zeros((len(rows),),
-                          dtype=zip(col_names, col_dtypes))
+    page_array = np.zeros(shape=len(rows), dtype=list(zip(col_names, col_dtypes)))
 
     for row_num, raw_row in enumerate(rows):
         entries = raw_row.get('f', [])
         for col_num, field_type in enumerate(col_types):
-            field_value = _parse_entry(entries[col_num].get('v', ''),
-                                       field_type)
+            field_value = _parse_entry(entries[col_num].get('v', ''), field_type)
             page_array[row_num][col_num] = field_value
 
     return DataFrame(page_array)
 
@@ -284,19 +286,23 @@ def _parse_data(schema, rows):
 def _parse_entry(field_value, field_type):
     if field_value is None or field_value == 'null':
         return None
-    if field_type == 'INTEGER' or field_type == 'FLOAT':
+    if field_type == 'INTEGER':
+        return int(field_value)
+    elif field_type == 'FLOAT':
         return float(field_value)
     elif field_type == 'TIMESTAMP':
         timestamp = datetime.utcfromtimestamp(float(field_value))
         return np.datetime64(timestamp)
     elif field_type == 'BOOLEAN':
         return field_value == 'true'
     return field_value
 
-
 def read_gbq(query, project_id = None, index_col=None, col_order=None, reauth=False):
     """Load data from Google BigQuery.
 
+    Adapted to Python 3 and extended by P.Chromiec for the model team @ RTBHouse
+
     THIS IS AN EXPERIMENTAL LIBRARY
 
     The main method a user calls to execute a Query in Google BigQuery and read results
@@ -327,12 +333,17 @@ def read_gbq(query, project_id = None, index_col=None, col_order=None, reauth=Fa
 
     """
 
+    # TODO: first validate index_col and col_order with a dry_run query
     if not project_id:
         raise TypeError("Missing required parameter: project_id")
 
     connector = GbqConnector(project_id, reauth = reauth)
     schema, pages = connector.run_query(query)
+
+    if not pages:
+        return DataFrame()
+
     dataframe_list = []
     while len(pages) > 0:
         page = pages.pop()
@@ -363,6 +374,10 @@ def read_gbq(query, project_id = None, index_col=None, col_order=None, reauth=Fa
     # if there are no NaN's. This is presently due to a
     # limitation of numpy in handling missing data.
     final_df._data = final_df._data.downcast(dtypes='infer')
+
+    connector.print_elapsed_seconds('Total time taken', 's.', 0)
+    print(datetime.now().strftime('Finished at %Y-%m-%d %H:%M:%S.'))
+
     return final_df
 
 def to_gbq(dataframe, destination_table, project_id=None, chunksize=10000,
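
Note on usage: with this patch, authentication no longer goes through pandas' interactive OAuth2 flow. GbqConnector instead builds SignedJwtAssertionCredentials from a service-account JSON key it reads off disk, so read_gbq can run unattended. A minimal usage sketch under that assumption (the project id and query are placeholders; the key file must already exist at the path the connector derives from $HOME):

    # Assumes a service-account JSON key (containing at least the
    # 'client_email' and 'private_key' fields) was saved beforehand to:
    #   $HOME/.google_api_oauth2_credentials/service_accounts/my-project.json
    from pandas.io import gbq

    df = gbq.read_gbq(
        'SELECT word, word_count FROM [publicdata:samples.shakespeare] LIMIT 10',
        project_id='my-project',  # hypothetical; must match the key file name
    )
    print(df.head())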
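
The broad except clause in get_service appears deliberate: besides a missing key file (FileNotFoundError) or a malformed one (ValueError/KeyError), oauth2client's SignedJwtAssertionCredentials raises NotImplementedError when no crypto backend such as PyCrypto or pyOpenSSL is installed, so all of these surface through the same NotFoundException message.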
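
For reference, the price estimate printed by run_query mirrors BigQuery's on-demand pricing of $5 per TB processed, rounded to the nearest MB with a 10 MB minimum, with cached results free. A worked example of the same arithmetic:

    bytes_processed = 3200000000                    # say BigQuery reports 3.2 GB
    mbytes = max(round(bytes_processed / 1e6), 10)  # 3200 MB, above the 10 MB floor
    price = round(mbytes * 5 / 1e6, 4)              # at $5 per TB
    print(price)                                    # 0.016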