Gbq upgrade #11876

Closed
wants to merge 2 commits into from
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 112 additions & 97 deletions pandas/io/gbq.py
@@ -6,29 +6,25 @@
import uuid

import numpy as np
# import pkg_resources

from distutils.version import LooseVersion
from pandas import compat
from pandas.core.api import DataFrame
from pandas.tools.merge import concat
from pandas.core.common import PandasError

from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from apiclient.errors import HttpError

def _check_google_client_version():
if compat.PY3:
raise NotImplementedError("Google's libraries do not support Python 3 yet")
from oauth2client.client import AccessTokenRefreshError

try:
import pkg_resources
import httplib2

except ImportError:
raise ImportError('Could not import pkg_resources (setuptools).')

_GOOGLE_API_CLIENT_VERSION = pkg_resources.get_distribution('google-api-python-client').version

if LooseVersion(_GOOGLE_API_CLIENT_VERSION) < '1.2.0':
raise ImportError("pandas requires google-api-python-client >= 1.2.0 for Google "
"BigQuery support, current version " + _GOOGLE_API_CLIENT_VERSION)
# Libraries added by P.Chr
from hurry.filesize import size, si
import time

logger = logging.getLogger('pandas.io.gbq')
logger.setLevel(logging.ERROR)
@@ -40,12 +36,6 @@ class InvalidPageToken(PandasError, IOError):
"""
pass

class InvalidQueryException(PandasError, IOError):
"""
Raised when a malformed query is given to read_gbq.
"""
pass

class AccessDeniedException(PandasError, IOError):
"""
Raised when invalid credentials are provided, or tokens have expired.
@@ -79,70 +69,51 @@ class InvalidColumnOrder(PandasError, IOError):
"""
pass

class GbqConnector(object):

class GbqConnector:
def __init__(self, project_id, reauth=False):

self.project_id = project_id
self.reauth = reauth
self.credentials = self.get_credentials()
self.service = self.get_service(self.credentials)

def get_credentials(self):
try:
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.file import Storage
from oauth2client.tools import run_flow, argparser

except ImportError:
raise ImportError('Could not import Google API Client.')
self.scope = 'https://www.googleapis.com/auth/bigquery'
self.json_key_path = '{}/.google_api_oauth2_credentials/service_accounts/{}.json'.format(os.environ['HOME'], project_id)
self.service = self.get_service()

_check_google_client_version()
def _get_credentials(self):
from oauth2client.client import SignedJwtAssertionCredentials

flow = OAuth2WebServerFlow(client_id='495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd.apps.googleusercontent.com',
client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
scope='https://www.googleapis.com/auth/bigquery',
redirect_uri='urn:ietf:wg:oauth:2.0:oob')
with open(self.json_key_path) as f:
json_key = json.load(f)
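# Note: a service-account JSON key as downloaded from the Google Developers
# Console is assumed here; it contains (among other fields) the 'client_email'
# and 'private_key' entries used below.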

storage = Storage('bigquery_credentials.dat')
credentials = storage.get()

if credentials is None or credentials.invalid or self.reauth:
credentials = run_flow(flow, storage, argparser.parse_args([]))

return credentials

def get_service(self, credentials):
try:
import httplib2

except ImportError:
raise ImportError("pandas requires httplib2 for Google BigQuery support")
return SignedJwtAssertionCredentials(
json_key['client_email'],
bytes(json_key['private_key'], 'UTF-8'),
self.scope,
)

def get_service(self):
try:
from apiclient.discovery import build

except ImportError:
raise ImportError('Could not import Google API Client.')

_check_google_client_version()

http = httplib2.Http()
http = credentials.authorize(http)
bigquery_service = build('bigquery', 'v2', http=http)

return bigquery_service
credentials = self._get_credentials()
http = httplib2.Http()
http = credentials.authorize(http)
bigquery_service = build('bigquery', 'v2', http=http)

return bigquery_service
except (FileNotFoundError, ValueError, KeyError, NotImplementedError):
raise NotFoundException("There's problem with service account credentials for project: {}. Please "
"provide valid JSON key file to {}\n".format(self.project_id, self.json_key_path))

def _start_timer(self):
self.start = time.monotonic()
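# time.monotonic() is unaffected by system clock adjustments, so it is
# safer than time.time() for measuring elapsed intervals.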

def get_elapsed_seconds(self):
return round(time.monotonic() - self.start, 2)

def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.', overlong=7):
sec = self.get_elapsed_seconds()
if sec > overlong:
print('{} {} {}'.format(prefix, sec, postfix))
sys.stdout.flush()

def run_query(self, query):
try:
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError

except ImportError:
raise ImportError('Could not import Google API Client.')

_check_google_client_version()

job_collection = self.service.jobs()
job_data = {
'configuration': {
@@ -153,10 +124,13 @@ def run_query(self, query):
}
}

self._start_timer()
try:
query_reply = job_collection.insert(projectId=self.project_id,
body=job_data).execute()
status = query_reply['status']
print('Query requested...')
sys.stdout.flush()
except AccessTokenRefreshError:
raise AccessDeniedException("The credentials have been revoked or expired, please re-run"
"the application to re-authorize")
@@ -168,24 +142,36 @@ def run_query(self, query):

if errors:
reasons = [error['reason'] for error in errors]
messages = [error['message'] for error in errors]
if 'accessDenied' in reasons:
raise AccessDeniedException
if 'invalidQuery' in reasons:
raise InvalidQueryException
raise AccessDeniedException(messages)
if 'notFound' in reasons:
raise NotFoundException
raise NotFoundException(messages)
if 'termsOfServiceNotAccepted' in reasons:
raise TermsOfServiceNotAcceptedException
raise TermsOfServiceNotAcceptedException(messages)
else:
raise UnknownGBQException(errors)
raise UnknownGBQException(status)

job_reference = query_reply['jobReference']

while not query_reply.get('jobComplete', False):
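# getQueryResults on an unfinished job returns jobComplete=False rather
# than raising, so we keep polling until it flips to True.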
print('Job not yet complete...')
self.print_elapsed_seconds(' Elapsed', 's. Waiting...')
query_reply = job_collection.getQueryResults(
projectId=job_reference['projectId'],
jobId=job_reference['jobId']).execute()
jobId=job_reference['jobId'],
).execute()


bytes_processed = int(query_reply.get('totalBytesProcessed', '0'))
if query_reply['cacheHit']:
print('Query done.\nCached result.\nPrice: 0 $.\n')
else:
# Charges are rounded to the nearest MB, with a minimum of 10 MB of data processed per table referenced by the query.
mbytes = max(round(bytes_processed / 1e6), 10)  # minimum charge is 10 MB
price = round(mbytes * 5 / 1e6, 4)  # $5 per TB
print('Query done.\nProcessed: {}.\nPrice: {} $.\n'.format(size(bytes_processed, system=si), price))
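# Worked example (hypothetical figures): 2,345,000,000 bytes processed
# -> max(round(2345.0), 10) = 2345 MB -> round(2345 * 5 / 1e6, 4) = 0.0117 $.
# A tiny 3 MB query is still billed at the 10 MB minimum.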

print('Retrieving results...')

total_rows = int(query_reply['totalRows'])
result_pages = list()
@@ -199,23 +185,38 @@ def run_query(self, query):
page = query_reply['rows']
result_pages.append(page)
current_row += len(page)
page_token = query_reply.get('pageToken', None)

if not page_token and current_row < total_rows:
raise InvalidPageToken("Required pageToken was missing. Recieved {0} of {1} rows".format(current_row,total_rows))
self.print_elapsed_seconds(' Got page: {}; {}% done. Elapsed'.format(
len(result_pages),
round(100.0 * current_row / total_rows),
))

if current_row == total_rows:
break

page_token = query_reply.get('pageToken', None)

if not page_token:
raise InvalidPageToken("Required pageToken was missing. Recieved {0} of {1} rows"
.format(current_row, total_rows))
elif page_token in seen_page_tokens:
raise InvalidPageToken("A duplicate pageToken was returned")

seen_page_tokens.append(page_token)

query_reply = job_collection.getQueryResults(
projectId = job_reference['projectId'],
jobId = job_reference['jobId'],
pageToken = page_token).execute()
pageToken = page_token,
).execute()

if current_row < total_rows:
raise InvalidPageToken("Received {0} of {1} rows".format(current_row, total_rows))

# print basic query stats
print('Got all {} rows.'.format(total_rows))
self.print_elapsed_seconds('Query + retrieving time taken', 's.\n\nConstructing DataFrame...')

return schema, result_pages

def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
@@ -232,7 +233,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
row_dict = dict()
row_dict['json'] = json.loads(row.to_json(force_ascii = False,
date_unit = 's',
date_format = 'iso'))
date_format = 'iso',
))
row_dict['insertId'] = job_id + str(index)
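# The insertId set above lets BigQuery de-duplicate rows on a
# best-effort basis if a streaming insert is retried.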
rows.append(row_dict)
remaining_rows -= 1
@@ -247,7 +249,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
projectId = self.project_id,
datasetId = dataset_id,
tableId = table_id,
body = body).execute()
body = body,
).execute()
if 'insertErrors' in response:
raise UnknownGBQException(response)

@@ -260,43 +263,46 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):

def _parse_data(schema, rows):
# see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing
dtype_map = {'INTEGER': np.dtype(float),
dtype_map = {
'INTEGER': np.dtype(int),
'FLOAT': np.dtype(float),
'TIMESTAMP': 'M8[ns]'} # This seems to be buggy without
# nanosecond indicator
'TIMESTAMP': 'M8[ns]', # This seems to be buggy without nanosecond indicator
}

fields = schema['fields']
col_types = [field['type'] for field in fields]
col_names = [field['name'].encode('ascii', 'ignore') for field in fields]
col_names = [field['name'] for field in fields]
col_dtypes = [dtype_map.get(field['type'], object) for field in fields]
page_array = np.zeros((len(rows),),
dtype=zip(col_names, col_dtypes))
page_array = np.zeros(shape=len(rows), dtype=list(zip(col_names, col_dtypes)))

for row_num, raw_row in enumerate(rows):
entries = raw_row.get('f', [])
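# BigQuery's REST API encodes each row as {'f': [{'v': <value>}, ...]},
# one {'v': ...} entry per column, with values serialized as strings
# (or None for NULL).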
for col_num, field_type in enumerate(col_types):
field_value = _parse_entry(entries[col_num].get('v', ''),
field_type)
field_value = _parse_entry(entries[col_num].get('v', ''), field_type)
page_array[row_num][col_num] = field_value

return DataFrame(page_array)

def _parse_entry(field_value, field_type):
if field_value is None or field_value == 'null':
return None
if field_type == 'INTEGER' or field_type == 'FLOAT':
if field_type == 'INTEGER':
return int(field_value)
elif field_type == 'FLOAT':
return float(field_value)
elif field_type == 'TIMESTAMP':
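# BigQuery returns TIMESTAMP values as seconds since the Unix epoch,
# serialized as a string (sometimes in scientific notation).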
timestamp = datetime.utcfromtimestamp(float(field_value))
return np.datetime64(timestamp)
# return int(timestamp)
elif field_type == 'BOOLEAN':
return field_value == 'true'
return field_value


def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False):
"""Load data from Google BigQuery.

Adapted to Python 3 and upgraded by P.Chromiec for the model team @ RTBHouse

THIS IS AN EXPERIMENTAL LIBRARY

The main method a user calls to execute a Query in Google BigQuery and read results
@@ -327,12 +333,17 @@

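Examples
--------
A minimal sketch, assuming a hypothetical project id with a matching
service-account JSON key file in place:

>>> df = read_gbq("SELECT name, age FROM dataset.users LIMIT 10",
...               project_id='my-project')
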
"""

# TODO: first check index_col and col_order with dry_run mode

if not project_id:
raise TypeError("Missing required parameter: project_id")

connector = GbqConnector(project_id, reauth=reauth)
schema, pages = connector.run_query(query)

if not pages:
return DataFrame()

dataframe_list = []
while len(pages) > 0:
page = pages.pop()
@@ -363,6 +374,10 @@ def read_gbq(query, project_id = None, index_col=None, col_order=None, reauth=Fa
# if there are no NaN's. This is presently due to a
# limitation of numpy in handling missing data.
final_df._data = final_df._data.downcast(dtypes='infer')

connector.print_elapsed_seconds('Total time taken', 's.', 0)
print(datetime.now().strftime("Finished at %Y-%m-%d %H:%M:%S."))

return final_df

def to_gbq(dataframe, destination_table, project_id=None, chunksize=10000,