From e035747be76a258879ec2dddff3606ced6dc2c08 Mon Sep 17 00:00:00 2001
From: chromy
Date: Tue, 2 Jun 2015 22:18:52 +0200
Subject: [PATCH 1/2] gbq: service account credentials support + optional verbosity
---
 doc/source/io.rst               |  36 ++++-
 doc/source/whatsnew/v0.18.0.txt |   3 +
 pandas/core/frame.py            |   8 +-
 pandas/io/gbq.py                | 258 +++++++++++++++++++++--------
 pandas/io/tests/test_gbq.py     | 276 ++++++++++++++++++++++++++++++--
 5 files changed, 486 insertions(+), 95 deletions(-)

diff --git a/doc/source/io.rst b/doc/source/io.rst
index e301e353071d9..34839412a5fd5 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4093,6 +4093,33 @@ The key functions are:
 
 .. _io.bigquery_reader:
 
+
+Authentication
+''''''''''''''
+
+Authentication is possible with either user account credentials or service account credentials.
+
+Authenticating with user account credentials is as simple as following the prompts in a browser window,
+which will be opened automatically for you. You will be authenticated to the specified
+``BigQuery`` account via Google's ``Oauth2`` mechanism. Additional information on the
+authentication mechanism can be found `here `__.
+
+Authentication with service account credentials is possible via the `'private_key'` parameter. This method
+is particularly useful when working on remote servers (e.g. a Jupyter/IPython notebook on a remote host).
+Remote authentication using user account credentials is not currently supported in pandas.
+Additional information on service accounts can be found
+`here `__.
+
+.. note::
+
+The `'private_key'` parameter can be set to either the file path of the service account key in JSON format, or
+key contents of the service account key in JSON format.
+
+.. note::
+
+A private key can be obtained from the Google developers console by clicking `here `__. Use JSON key type.
+
+
 Querying
 ''''''''
 
@@ -4107,12 +4134,6 @@ into a DataFrame using the :func:`~pandas.io.gbq.read_gbq` function.
 
     data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', projectid)
 
-You will then be authenticated to the specified BigQuery account
-via Google's Oauth2 mechanism. In general, this is as simple as following the
-prompts in a browser window which will be opened for you. Should the browser not
-be available, or fail to launch, a code will be provided to complete the process
-manually. Additional information on the authentication mechanism can be found
-`here `__.
 
 You can define which column from BigQuery to use as an index in the
 destination DataFrame as well as a preferred column order as follows:
@@ -4125,7 +4146,7 @@ destination DataFrame as well as a preferred column order as follows:
 
 .. note::
 
-    You can find your project id in the `BigQuery management console `__.
+    You can find your project id in the `Google developers console `__.
 
 .. note::
 
@@ -4134,6 +4155,7 @@ destination DataFrame as well as a preferred column order as follows:
 
 .. _io.bigquery_writer:
 
+
 Writing DataFrames
 ''''''''''''''''''
 
diff --git a/doc/source/whatsnew/v0.18.0.txt b/doc/source/whatsnew/v0.18.0.txt
index ccdc48bc1dbbb..9f623ec90c75e 100644
--- a/doc/source/whatsnew/v0.18.0.txt
+++ b/doc/source/whatsnew/v0.18.0.txt
@@ -152,6 +152,9 @@ Other enhancements
 - ``Series`` gained an ``is_unique`` attribute (:issue:`11946`)
 - ``DataFrame.quantile`` and ``Series.quantile`` now accept ``interpolation`` keyword (:issue:`10174`).
- ``DataFrame.select_dtypes`` now allows the ``np.float16`` typecode (:issue:`11990`) +- Added Google ``BigQuery`` service account authentication support, which enables authentication on remote servers. (:issue:`11881`) + For further details see :ref:`here ` + .. _whatsnew_0180.enhancements.rounding: diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 907da619b1875..8cf73caeac32e 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -859,7 +859,7 @@ def to_dict(self, orient='dict'): raise ValueError("orient '%s' not understood" % orient) def to_gbq(self, destination_table, project_id, chunksize=10000, - verbose=True, reauth=False, if_exists='fail'): + verbose=True, reauth=False, if_exists='fail', private_key=None): """Write a DataFrame to a Google BigQuery table. THIS IS AN EXPERIMENTAL LIBRARY @@ -883,6 +883,10 @@ def to_gbq(self, destination_table, project_id, chunksize=10000, 'fail': If table exists, do nothing. 'replace': If table exists, drop it, recreate it, and insert data. 'append': If table exists, insert data. Create if does not exist. + private_key : str (optional) + Service account private key in JSON format. Can be file path + or string contents. This is useful for remote server + authentication (eg. jupyter iPython notebook on remote host) .. versionadded:: 0.17.0 """ @@ -890,7 +894,7 @@ def to_gbq(self, destination_table, project_id, chunksize=10000, from pandas.io import gbq return gbq.to_gbq(self, destination_table, project_id=project_id, chunksize=chunksize, verbose=verbose, reauth=reauth, - if_exists=if_exists) + if_exists=if_exists, private_key=private_key) @classmethod def from_records(cls, data, index=None, exclude=None, columns=None, diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py index 4bf46f199c34a..136bc041decec 100644 --- a/pandas/io/gbq.py +++ b/pandas/io/gbq.py @@ -4,6 +4,8 @@ import logging from time import sleep import uuid +import time +import sys import numpy as np @@ -39,10 +41,33 @@ def _check_google_client_version(): .format(google_api_minimum_version, _GOOGLE_API_CLIENT_VERSION)) + +def _test_google_api_imports(): + + try: + import httplib2 + from apiclient.discovery import build + from apiclient.errors import HttpError + from oauth2client.client import AccessTokenRefreshError + from oauth2client.client import OAuth2WebServerFlow + from oauth2client.client import SignedJwtAssertionCredentials + from oauth2client.file import Storage + from oauth2client.tools import run_flow, argparser + except ImportError as e: + raise ImportError("Missing module required for Google BigQuery " + "support: {0}".format(str(e))) + logger = logging.getLogger('pandas.io.gbq') logger.setLevel(logging.ERROR) +class InvalidPrivateKeyFormat(PandasError, ValueError): + """ + Raised when provided private key has invalid format. + """ + pass + + class AccessDenied(PandasError, ValueError): """ Raised when invalid credentials are provided, or tokens have expired. 
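A minimal usage sketch of the ``private_key`` argument introduced above (the project id, destination table, query and key path are illustrative placeholders, not values taken from this patch):

.. code-block:: python

   import pandas as pd

   # Service account key created in the Google developers console (JSON key
   # type); the raw JSON contents can be passed instead of a file path.
   key_path = '/path/to/service_account_key.json'

   # No browser-based OAuth2 flow is started, so this also works on a remote
   # server (e.g. a Jupyter/IPython notebook host).
   df = pd.read_gbq('SELECT 1 AS x', project_id='my-project',
                    private_key=key_path)

   # DataFrame.to_gbq gains the same keyword.
   df.to_gbq('my_dataset.my_table', 'my-project', private_key=key_path)
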
@@ -114,39 +139,35 @@ class TableCreationError(PandasError, ValueError): class GbqConnector(object): + scope = 'https://www.googleapis.com/auth/bigquery' - def __init__(self, project_id, reauth=False): - self.test_google_api_imports() + def __init__(self, project_id, reauth=False, verbose=False, + private_key=None): + _check_google_client_version() + _test_google_api_imports() self.project_id = project_id self.reauth = reauth + self.verbose = verbose + self.private_key = private_key self.credentials = self.get_credentials() - self.service = self.get_service(self.credentials) - - def test_google_api_imports(self): - try: - import httplib2 # noqa - from apiclient.discovery import build # noqa - from apiclient.errors import HttpError # noqa - from oauth2client.client import AccessTokenRefreshError # noqa - from oauth2client.client import OAuth2WebServerFlow # noqa - from oauth2client.file import Storage # noqa - from oauth2client.tools import run_flow, argparser # noqa - except ImportError as e: - raise ImportError("Missing module required for Google BigQuery " - "support: {0}".format(str(e))) + self.service = self.get_service() def get_credentials(self): + if self.private_key: + return self.get_service_account_credentials() + else: + return self.get_user_account_credentials() + + def get_user_account_credentials(self): from oauth2client.client import OAuth2WebServerFlow from oauth2client.file import Storage from oauth2client.tools import run_flow, argparser - _check_google_client_version() - flow = OAuth2WebServerFlow( client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd' '.apps.googleusercontent.com'), client_secret='kOc9wMptUtxkcIFbtZCcrEAc', - scope='https://www.googleapis.com/auth/bigquery', + scope=self.scope, redirect_uri='urn:ietf:wg:oauth:2.0:oob') storage = Storage('bigquery_credentials.dat') @@ -157,15 +178,71 @@ def get_credentials(self): return credentials + def get_service_account_credentials(self): + from oauth2client.client import SignedJwtAssertionCredentials + from os.path import isfile + + try: + if isfile(self.private_key): + with open(self.private_key) as f: + json_key = json.loads(f.read()) + else: + # ugly hack: 'private_key' field has new lines inside, + # they break json parser, but we need to preserve them + json_key = json.loads(self.private_key.replace('\n', ' ')) + json_key['private_key'] = json_key['private_key'].replace( + ' ', '\n') + + if compat.PY3: + json_key['private_key'] = bytes( + json_key['private_key'], 'UTF-8') + + return SignedJwtAssertionCredentials( + json_key['client_email'], + json_key['private_key'], + self.scope, + ) + except (KeyError, ValueError, TypeError, AttributeError): + raise InvalidPrivateKeyFormat( + "Private key is missing or invalid. It should be service " + "account private key JSON (file path or string contents) " + "with at least two keys: 'client_email' and 'private_key'. " + "Can be obtained from: https://console.developers.google." 
+ "com/permissions/serviceaccounts") + + def _print(self, msg, end='\n'): + if self.verbose: + sys.stdout.write(msg + end) + sys.stdout.flush() + + def _start_timer(self): + self.start = time.time() + + def get_elapsed_seconds(self): + return round(time.time() - self.start, 2) + + def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.', + overlong=7): + sec = self.get_elapsed_seconds() + if sec > overlong: + self._print('{} {} {}'.format(prefix, sec, postfix)) + + # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size @staticmethod - def get_service(credentials): + def sizeof_fmt(num, suffix='b'): + fmt = "%3.1f %s%s" + for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: + if abs(num) < 1024.0: + return fmt % (num, unit, suffix) + num /= 1024.0 + return fmt % (num, 'Y', suffix) + + def get_service(self): import httplib2 from apiclient.discovery import build - _check_google_client_version() - http = httplib2.Http() - http = credentials.authorize(http) + http = self.credentials.authorize(http) bigquery_service = build('bigquery', 'v2', http=http) return bigquery_service @@ -188,8 +265,7 @@ def process_http_error(ex): raise GenericGBQException(errors) - @staticmethod - def process_insert_errors(insert_errors, verbose): + def process_insert_errors(self, insert_errors): for insert_error in insert_errors: row = insert_error['index'] errors = insert_error.get('errors', None) @@ -202,8 +278,8 @@ def process_insert_errors(insert_errors, verbose): .format(row, reason, location, message)) # Report all error messages if verbose is set - if verbose: - print(error_message) + if self.verbose: + self._print(error_message) else: raise StreamingInsertError(error_message + '\nEnable verbose logging to ' @@ -211,7 +287,7 @@ def process_insert_errors(insert_errors, verbose): raise StreamingInsertError - def run_query(self, query, verbose=True): + def run_query(self, query): from apiclient.errors import HttpError from oauth2client.client import AccessTokenRefreshError @@ -228,21 +304,27 @@ def run_query(self, query, verbose=True): } } + self._start_timer() try: + self._print('Requesting query... ', end="") query_reply = job_collection.insert( projectId=self.project_id, body=job_data).execute() - except AccessTokenRefreshError: - raise AccessDenied("The credentials have been revoked or expired, " - "please re-run the application " - "to re-authorize") + self._print('ok.\nQuery running...') + except (AccessTokenRefreshError, ValueError): + if self.private_key: + raise AccessDenied( + "The service account credentials are not valid") + else: + raise AccessDenied( + "The credentials have been revoked or expired, " + "please re-run the application to re-authorize") except HttpError as ex: self.process_http_error(ex) job_reference = query_reply['jobReference'] while not query_reply.get('jobComplete', False): - if verbose: - print('Waiting for job to complete...') + self.print_elapsed_seconds(' Elapsed', 's. 
Waiting...') try: query_reply = job_collection.getQueryResults( projectId=job_reference['projectId'], @@ -250,6 +332,17 @@ def run_query(self, query, verbose=True): except HttpError as ex: self.process_http_error(ex) + if self.verbose: + if query_reply['cacheHit']: + self._print('Query done.\nCache hit.\n') + else: + bytes_processed = int(query_reply.get( + 'totalBytesProcessed', '0')) + self._print('Query done.\nProcessed: {}\n'.format( + self.sizeof_fmt(bytes_processed))) + + self._print('Retrieving results...') + total_rows = int(query_reply['totalRows']) result_pages = list() seen_page_tokens = list() @@ -262,6 +355,15 @@ def run_query(self, query, verbose=True): page = query_reply['rows'] result_pages.append(page) current_row += len(page) + + self.print_elapsed_seconds( + ' Got page: {}; {}% done. Elapsed'.format( + len(result_pages), + round(100.0 * current_row / total_rows))) + + if current_row == total_rows: + break + page_token = query_reply.get('pageToken', None) if not page_token and current_row < total_rows: @@ -285,18 +387,21 @@ def run_query(self, query, verbose=True): if current_row < total_rows: raise InvalidPageToken() + # print basic query stats + self._print('Got {} rows.\n'.format(total_rows)) + return schema, result_pages - def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose): + def load_data(self, dataframe, dataset_id, table_id, chunksize): from apiclient.errors import HttpError job_id = uuid.uuid4().hex rows = [] remaining_rows = len(dataframe) - if verbose: + if self.verbose: total_rows = remaining_rows - print("\n\n") + self._print("\n\n") for index, row in dataframe.reset_index(drop=True).iterrows(): row_dict = dict() @@ -308,9 +413,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose): remaining_rows -= 1 if (len(rows) % chunksize == 0) or (remaining_rows == 0): - if verbose: - print("\rStreaming Insert is {0}% Complete".format( - ((total_rows - remaining_rows) * 100) / total_rows)) + self._print("\rStreaming Insert is {0}% Complete".format( + ((total_rows - remaining_rows) * 100) / total_rows)) body = {'rows': rows} @@ -335,13 +439,12 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose): insert_errors = response.get('insertErrors', None) if insert_errors: - self.process_insert_errors(insert_errors, verbose) + self.process_insert_errors(insert_errors) sleep(1) # Maintains the inserts "per second" rate per API rows = [] - if verbose: - print("\n") + self._print("\n") def verify_schema(self, dataset_id, table_id, schema): from apiclient.errors import HttpError @@ -356,8 +459,7 @@ def verify_schema(self, dataset_id, table_id, schema): except HttpError as ex: self.process_http_error(ex) - def delete_and_recreate_table(self, dataset_id, table_id, - table_schema, verbose): + def delete_and_recreate_table(self, dataset_id, table_id, table_schema): delay = 0 # Changes to table schema may take up to 2 minutes as of May 2015 See @@ -367,12 +469,12 @@ def delete_and_recreate_table(self, dataset_id, table_id, # be a 120 second delay if not self.verify_schema(dataset_id, table_id, table_schema): - if verbose: - print('The existing table has a different schema. ' - 'Please wait 2 minutes. See Google BigQuery issue #191') + self._print('The existing table has a different schema. Please ' + 'wait 2 minutes. 
See Google BigQuery issue #191') delay = 120 - table = _Table(self.project_id, dataset_id) + table = _Table(self.project_id, dataset_id, + private_key=self.private_key) table.delete(table_id) table.create(table_id, table_schema) sleep(delay) @@ -418,7 +520,7 @@ def _parse_entry(field_value, field_type): def read_gbq(query, project_id=None, index_col=None, col_order=None, - reauth=False, verbose=True): + reauth=False, verbose=True, private_key=None): """Load data from Google BigQuery. THIS IS AN EXPERIMENTAL LIBRARY @@ -446,6 +548,10 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, if multiple accounts are used. verbose : boolean (default True) Verbose output + private_key : str (optional) + Service account private key in JSON format. Can be file path + or string contents. This is useful for remote server + authentication (eg. jupyter iPython notebook on remote host) Returns ------- @@ -457,8 +563,9 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, if not project_id: raise TypeError("Missing required parameter: project_id") - connector = GbqConnector(project_id, reauth=reauth) - schema, pages = connector.run_query(query, verbose=verbose) + connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, + private_key=private_key) + schema, pages = connector.run_query(query) dataframe_list = [] while len(pages) > 0: page = pages.pop() @@ -492,11 +599,18 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, # if there are no NaN's. This is presently due to a # limitation of numpy in handling missing data. final_df._data = final_df._data.downcast(dtypes='infer') + + connector.print_elapsed_seconds( + 'Total time taken', + datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'), + 0 + ) + return final_df def to_gbq(dataframe, destination_table, project_id, chunksize=10000, - verbose=True, reauth=False, if_exists='fail'): + verbose=True, reauth=False, if_exists='fail', private_key=None): """Write a DataFrame to a Google BigQuery table. THIS IS AN EXPERIMENTAL LIBRARY @@ -520,6 +634,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, 'fail': If table exists, do nothing. 'replace': If table exists, drop it, recreate it, and insert data. 'append': If table exists, insert data. Create if does not exist. + private_key : str (optional) + Service account private key in JSON format. Can be file path + or string contents. This is useful for remote server + authentication (eg. jupyter iPython notebook on remote host) """ if if_exists not in ('fail', 'replace', 'append'): @@ -529,10 +647,12 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, raise NotFoundException( "Invalid Table Name. 
Should be of the form 'datasetId.tableId' ") - connector = GbqConnector(project_id, reauth=reauth) + connector = GbqConnector(project_id, reauth=reauth, verbose=verbose, + private_key=private_key) dataset_id, table_id = destination_table.rsplit('.', 1) - table = _Table(project_id, dataset_id, reauth=reauth) + table = _Table(project_id, dataset_id, reauth=reauth, + private_key=private_key) table_schema = _generate_bq_schema(dataframe) @@ -545,7 +665,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, "append or replace data.") elif if_exists == 'replace': connector.delete_and_recreate_table( - dataset_id, table_id, table_schema, verbose) + dataset_id, table_id, table_schema) elif if_exists == 'append': if not connector.verify_schema(dataset_id, table_id, table_schema): raise InvalidSchema("Please verify that the column order, " @@ -555,7 +675,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, else: table.create(table_id, table_schema) - connector.load_data(dataframe, dataset_id, table_id, chunksize, verbose) + connector.load_data(dataframe, dataset_id, table_id, chunksize) def generate_bq_schema(df, default_type='STRING'): @@ -597,15 +717,12 @@ def _generate_bq_schema(df, default_type='STRING'): class _Table(GbqConnector): - def __init__(self, project_id, dataset_id, reauth=False): + def __init__(self, project_id, dataset_id, reauth=False, verbose=False, + private_key=None): from apiclient.errors import HttpError - self.test_google_api_imports() - self.project_id = project_id - self.reauth = reauth - self.credentials = self.get_credentials() - self.service = self.get_service(self.credentials) self.http_error = HttpError self.dataset_id = dataset_id + GbqConnector.__init__(self, project_id, reauth, verbose, private_key) def exists(self, table_id): """ Check if a table exists in Google BigQuery @@ -653,8 +770,10 @@ def create(self, table_id, schema): raise TableCreationError( "The table could not be created because it already exists") - if not _Dataset(self.project_id).exists(self.dataset_id): - _Dataset(self.project_id).create(self.dataset_id) + if not _Dataset(self.project_id, + private_key=self.private_key).exists(self.dataset_id): + _Dataset(self.project_id, + private_key=self.private_key).create(self.dataset_id) body = { 'schema': schema, @@ -698,14 +817,11 @@ def delete(self, table_id): class _Dataset(GbqConnector): - def __init__(self, project_id, reauth=False): + def __init__(self, project_id, reauth=False, verbose=False, + private_key=None): from apiclient.errors import HttpError - self.test_google_api_imports() - self.project_id = project_id - self.reauth = reauth - self.credentials = self.get_credentials() - self.service = self.get_service(self.credentials) self.http_error = HttpError + GbqConnector.__init__(self, project_id, reauth, verbose, private_key) def exists(self, dataset_id): """ Check if a dataset exists in Google BigQuery diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py index 88a1e3e0a5cc3..6a37c5ea6ef7a 100644 --- a/pandas/io/tests/test_gbq.py +++ b/pandas/io/tests/test_gbq.py @@ -16,6 +16,9 @@ import pandas.util.testing as tm PROJECT_ID = None +PRIVATE_KEY_JSON_PATH = None +PRIVATE_KEY_JSON_CONTENTS = None + DATASET_ID = 'pydata_pandas_bq_testing' TABLE_ID = 'new_test' DESTINATION_TABLE = "{0}.{1}".format(DATASET_ID + "1", TABLE_ID) @@ -49,8 +52,9 @@ def _test_imports(): from apiclient.discovery import build # noqa from apiclient.errors import HttpError # noqa - from oauth2client.client import 
OAuth2WebServerFlow # noqa - from oauth2client.client import AccessTokenRefreshError # noqa + from oauth2client.client import OAuth2WebServerFlow + from oauth2client.client import AccessTokenRefreshError + from oauth2client.client import SignedJwtAssertionCredentials from oauth2client.file import Storage # noqa from oauth2client.tools import run_flow # noqa @@ -96,13 +100,13 @@ def test_requirements(): raise nose.SkipTest(import_exception) -def clean_gbq_environment(): - dataset = gbq._Dataset(PROJECT_ID) +def clean_gbq_environment(private_key=None): + dataset = gbq._Dataset(PROJECT_ID, private_key=private_key) for i in range(1, 10): if DATASET_ID + str(i) in dataset.datasets(): dataset_id = DATASET_ID + str(i) - table = gbq._Table(PROJECT_ID, dataset_id) + table = gbq._Table(PROJECT_ID, dataset_id, private_key=private_key) for j in range(1, 20): if TABLE_ID + str(j) in dataset.tables(dataset_id): table.delete(TABLE_ID + str(j)) @@ -153,8 +157,39 @@ def test_should_be_able_to_get_valid_credentials(self): self.assertFalse(credentials.invalid, 'Returned credentials invalid') def test_should_be_able_to_get_a_bigquery_service(self): + bigquery_service = self.sut.get_service() + self.assertTrue(bigquery_service is not None, 'No service returned') + + def test_should_be_able_to_get_schema_from_query(self): + schema, pages = self.sut.run_query('SELECT 1') + self.assertTrue(schema is not None) + + def test_should_be_able_to_get_results_from_query(self): + schema, pages = self.sut.run_query('SELECT 1') + self.assertTrue(pages is not None) + + +class TestGBQConnectorServiceAccountKeyPathIntegration(tm.TestCase): + def setUp(self): + test_requirements() + + if not PROJECT_ID or not PRIVATE_KEY_JSON_PATH: + raise nose.SkipTest("Cannot run integration tests without " + "a project id and private key json path") + + self.sut = gbq.GbqConnector(PROJECT_ID, + private_key=PRIVATE_KEY_JSON_PATH) + + def test_should_be_able_to_make_a_connector(self): + self.assertTrue(self.sut is not None, + 'Could not create a GbqConnector') + + def test_should_be_able_to_get_valid_credentials(self): credentials = self.sut.get_credentials() - bigquery_service = self.sut.get_service(credentials) + self.assertFalse(credentials.invalid, 'Returned credentials invalid') + + def test_should_be_able_to_get_a_bigquery_service(self): + bigquery_service = self.sut.get_service() self.assertTrue(bigquery_service is not None, 'No service returned') def test_should_be_able_to_get_schema_from_query(self): @@ -166,8 +201,39 @@ def test_should_be_able_to_get_results_from_query(self): self.assertTrue(pages is not None) -class TestReadGBQUnitTests(tm.TestCase): +class TestGBQConnectorServiceAccountKeyContentsIntegration(tm.TestCase): + def setUp(self): + test_requirements() + + if not PROJECT_ID or not PRIVATE_KEY_JSON_CONTENTS: + raise nose.SkipTest("Cannot run integration tests without " + "a project id and private key json contents") + + self.sut = gbq.GbqConnector(PROJECT_ID, + private_key=PRIVATE_KEY_JSON_CONTENTS) + + def test_should_be_able_to_make_a_connector(self): + self.assertTrue(self.sut is not None, + 'Could not create a GbqConnector') + + def test_should_be_able_to_get_valid_credentials(self): + credentials = self.sut.get_credentials() + self.assertFalse(credentials.invalid, 'Returned credentials invalid') + def test_should_be_able_to_get_a_bigquery_service(self): + bigquery_service = self.sut.get_service() + self.assertTrue(bigquery_service is not None, 'No service returned') + + def 
test_should_be_able_to_get_schema_from_query(self): + schema, pages = self.sut.run_query('SELECT 1') + self.assertTrue(schema is not None) + + def test_should_be_able_to_get_results_from_query(self): + schema, pages = self.sut.run_query('SELECT 1') + self.assertTrue(pages is not None) + + +class GBQUnitTests(tm.TestCase): def setUp(self): test_requirements() @@ -212,6 +278,41 @@ def test_that_parse_data_works_properly(self): correct_output = DataFrame({'VALID_STRING': ['PI']}) tm.assert_frame_equal(test_output, correct_output) + def test_read_gbq_with_invalid_private_key_json_should_fail(self): + with tm.assertRaises(gbq.InvalidPrivateKeyFormat): + gbq.read_gbq('SELECT 1', project_id='x', private_key='y') + + def test_read_gbq_with_empty_private_key_json_should_fail(self): + with tm.assertRaises(gbq.InvalidPrivateKeyFormat): + gbq.read_gbq('SELECT 1', project_id='x', private_key='{}') + + def test_read_gbq_with_private_key_json_wrong_types_should_fail(self): + with tm.assertRaises(gbq.InvalidPrivateKeyFormat): + gbq.read_gbq( + 'SELECT 1', project_id='x', + private_key='{ "client_email" : 1, "private_key" : True }') + + def test_read_gbq_with_empty_private_key_file_should_fail(self): + from tempfile import mkstemp + from os import remove + _, empty_file = mkstemp() + try: + with tm.assertRaises(gbq.InvalidPrivateKeyFormat): + gbq.read_gbq('SELECT 1', project_id='x', + private_key=empty_file) + finally: + remove(empty_file) + + def test_read_gbq_with_corrupted_private_key_json_should_fail(self): + if not PRIVATE_KEY_JSON_CONTENTS: + raise nose.SkipTest("Cannot run without private key json content") + + import re + with tm.assertRaises(gbq.InvalidPrivateKeyFormat): + gbq.read_gbq( + 'SELECT 1', project_id='x', + private_key=re.sub('[a-z]', '9', PRIVATE_KEY_JSON_CONTENTS)) + class TestReadGBQIntegration(tm.TestCase): @@ -246,6 +347,24 @@ def tearDown(self): # executed. 
pass + def test_should_read_as_service_account_with_key_path(self): + if not PRIVATE_KEY_JSON_PATH: + raise nose.SkipTest("Cannot run integration tests without a " + "private key json path") + query = 'SELECT "PI" as VALID_STRING' + df = gbq.read_gbq(query, project_id=PROJECT_ID, + private_key=PRIVATE_KEY_JSON_PATH) + tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']})) + + def test_should_read_as_service_account_with_key_contents(self): + if not PRIVATE_KEY_JSON_CONTENTS: + raise nose.SkipTest("Cannot run integration tests without a " + "private key json contents") + query = 'SELECT "PI" as VALID_STRING' + df = gbq.read_gbq(query, project_id=PROJECT_ID, + private_key=PRIVATE_KEY_JSON_CONTENTS) + tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']})) + def test_should_properly_handle_valid_strings(self): query = 'SELECT "PI" as VALID_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID) @@ -384,11 +503,13 @@ def test_download_dataset_larger_than_200k_rows(self): def test_zero_rows(self): # Bug fix for https://github.com/pydata/pandas/issues/10273 - df = gbq.read_gbq("SELECT title, language FROM " - "[publicdata:samples.wikipedia] where " - "timestamp=-9999999", + df = gbq.read_gbq("SELECT title, id " + "FROM [publicdata:samples.wikipedia] " + "WHERE timestamp=-9999999", project_id=PROJECT_ID) - expected_result = DataFrame(columns=['title', 'language']) + page_array = np.zeros( + (0,), dtype=[('title', object), ('id', np.dtype(float))]) + expected_result = DataFrame(page_array, columns=['title', 'id']) self.assert_frame_equal(df, expected_result) @@ -439,12 +560,12 @@ def tearDown(self): def test_upload_data(self): destination_table = DESTINATION_TABLE + "1" - test_size = 1000001 + test_size = 20001 df = make_mixed_dataframe_v2(test_size) gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000) - sleep(60) # <- Curses Google!!! + sleep(30) # <- Curses Google!!! result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}" .format(destination_table), @@ -479,7 +600,7 @@ def test_upload_data_if_table_exists_append(self): # Test the if_exists parameter with value 'append' gbq.to_gbq(df, destination_table, PROJECT_ID, if_exists='append') - sleep(60) # <- Curses Google!!! + sleep(30) # <- Curses Google!!! result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}" .format(destination_table), @@ -505,7 +626,7 @@ def test_upload_data_if_table_exists_replace(self): gbq.to_gbq(df_different_schema, destination_table, PROJECT_ID, if_exists='replace') - sleep(60) # <- Curses Google!!! + sleep(30) # <- Curses Google!!! result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}" .format(destination_table), @@ -619,6 +740,131 @@ def test_dataset_does_not_exist(self): self.assertTrue(not self.dataset.exists( DATASET_ID + "_not_found"), 'Expected dataset not to exist') + +class TestToGBQIntegrationServiceAccountKeyPath(tm.TestCase): + # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 + # As a workaround to this issue, each test should use a unique table name. + # Make sure to modify the for loop range in the tearDownClass when a new + # test is added + # See `Issue 191 + # `__ + + @classmethod + def setUpClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* + # executing *ALL* tests described below. 
+ + if not PROJECT_ID: + raise nose.SkipTest( + "Cannot run integration tests without a project id") + if not PRIVATE_KEY_JSON_PATH: + raise nose.SkipTest( + "Cannot run integration tests without private key json path") + + test_requirements() + clean_gbq_environment(PRIVATE_KEY_JSON_PATH) + + def setUp(self): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test + # is executed. + pass + + @classmethod + def tearDownClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* + # executing all tests. + + clean_gbq_environment(PRIVATE_KEY_JSON_PATH) + + def tearDown(self): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test + # is executed. + pass + + def test_upload_data_as_service_account_with_key_path(self): + destination_table = DESTINATION_TABLE + "11" + + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + + gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000, + private_key=PRIVATE_KEY_JSON_PATH) + + sleep(30) # <- Curses Google!!! + + result = gbq.read_gbq( + "SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), + project_id=PROJECT_ID, + private_key=PRIVATE_KEY_JSON_PATH) + + self.assertEqual(result['NUM_ROWS'][0], test_size) + + +class TestToGBQIntegrationServiceAccountKeyContents(tm.TestCase): + # Changes to BigQuery table schema may take up to 2 minutes as of May 2015 + # As a workaround to this issue, each test should use a unique table name. + # Make sure to modify the for loop range in the tearDownClass when a new + # test is added + # See `Issue 191 + # `__ + + @classmethod + def setUpClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* + # executing *ALL* tests described below. + + if not PROJECT_ID: + raise nose.SkipTest( + "Cannot run integration tests without a project id") + if not PRIVATE_KEY_JSON_CONTENTS: + raise nose.SkipTest("Cannot run integration tests without " + "private key json contents") + + test_requirements() + clean_gbq_environment(PRIVATE_KEY_JSON_CONTENTS) + + def setUp(self): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test + # is executed. + pass + + @classmethod + def tearDownClass(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* + # executing all tests. + + clean_gbq_environment(PRIVATE_KEY_JSON_CONTENTS) + + def tearDown(self): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test + # is executed. + pass + + def test_upload_data_as_service_account_with_key_contents(self): + destination_table = DESTINATION_TABLE + "12" + + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + + gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000, + private_key=PRIVATE_KEY_JSON_CONTENTS) + + sleep(30) # <- Curses Google!!! 
+ + result = gbq.read_gbq( + "SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), + project_id=PROJECT_ID, + private_key=PRIVATE_KEY_JSON_CONTENTS) + self.assertEqual(result['NUM_ROWS'][0], test_size) + if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'], exit=False) From b3e80d867ca03f4830d83acf4ec6f0f590559130 Mon Sep 17 00:00:00 2001 From: tworec Date: Mon, 1 Feb 2016 11:18:25 +0100 Subject: [PATCH 2/2] style fixes --- doc/source/io.rst | 7 +-- doc/source/whatsnew/v0.18.0.txt | 3 +- pandas/io/gbq.py | 4 +- pandas/io/tests/test_gbq.py | 78 +++++++++++++++------------------ 4 files changed, 42 insertions(+), 50 deletions(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index 34839412a5fd5..2b9be8e2850fc 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4112,12 +4112,13 @@ Additional information on service accounts can be found .. note:: -The `'private_key'` parameter can be set to either the file path of the service account key in JSON format, or -key contents of the service account key in JSON format. + The `'private_key'` parameter can be set to either the file path of the service account key + in JSON format, or key contents of the service account key in JSON format. .. note:: -A private key can be obtained from the Google developers console by clicking `here `__. Use JSON key type. + A private key can be obtained from the Google developers console by clicking + `here `__. Use JSON key type. Querying diff --git a/doc/source/whatsnew/v0.18.0.txt b/doc/source/whatsnew/v0.18.0.txt index 9f623ec90c75e..877aaf597ab54 100644 --- a/doc/source/whatsnew/v0.18.0.txt +++ b/doc/source/whatsnew/v0.18.0.txt @@ -152,8 +152,7 @@ Other enhancements - ``Series`` gained an ``is_unique`` attribute (:issue:`11946`) - ``DataFrame.quantile`` and ``Series.quantile`` now accept ``interpolation`` keyword (:issue:`10174`). - ``DataFrame.select_dtypes`` now allows the ``np.float16`` typecode (:issue:`11990`) -- Added Google ``BigQuery`` service account authentication support, which enables authentication on remote servers. (:issue:`11881`) - For further details see :ref:`here ` +- Added Google ``BigQuery`` service account authentication support, which enables authentication on remote servers. (:issue:`11881`). For further details see :ref:`here ` .. 
_whatsnew_0180.enhancements.rounding: diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py index 136bc041decec..9fd99a49aa92f 100644 --- a/pandas/io/gbq.py +++ b/pandas/io/gbq.py @@ -722,7 +722,7 @@ def __init__(self, project_id, dataset_id, reauth=False, verbose=False, from apiclient.errors import HttpError self.http_error = HttpError self.dataset_id = dataset_id - GbqConnector.__init__(self, project_id, reauth, verbose, private_key) + super(_Table, self).__init__(project_id, reauth, verbose, private_key) def exists(self, table_id): """ Check if a table exists in Google BigQuery @@ -821,7 +821,7 @@ def __init__(self, project_id, reauth=False, verbose=False, private_key=None): from apiclient.errors import HttpError self.http_error = HttpError - GbqConnector.__init__(self, project_id, reauth, verbose, private_key) + super(_Dataset, self).__init__(project_id, reauth, verbose, private_key) def exists(self, dataset_id): """ Check if a dataset exists in Google BigQuery diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py index 6a37c5ea6ef7a..4c3d19aad4d09 100644 --- a/pandas/io/tests/test_gbq.py +++ b/pandas/io/tests/test_gbq.py @@ -1,3 +1,4 @@ +import re from datetime import datetime import nose import pytz @@ -31,6 +32,24 @@ _HTTPLIB2_INSTALLED = False _SETUPTOOLS_INSTALLED = False +def _skip_if_no_project_id(): + if not PROJECT_ID: + raise nose.SkipTest( + "Cannot run integration tests without a project id") + +def _skip_if_no_private_key_path(): + if not PRIVATE_KEY_JSON_PATH: + raise nose.SkipTest("Cannot run integration tests without a " + "private key json file path") + +def _skip_if_no_private_key_contents(): + if not PRIVATE_KEY_JSON_CONTENTS: + raise nose.SkipTest("Cannot run integration tests without a " + "private key json contents") + + _skip_if_no_project_id() + _skip_if_no_private_key_path() + _skip_if_no_private_key_contents() def _test_imports(): global _GOOGLE_API_CLIENT_INSTALLED, _GOOGLE_API_CLIENT_VALID_VERSION, \ @@ -142,9 +161,7 @@ class TestGBQConnectorIntegration(tm.TestCase): def setUp(self): test_requirements() - if not PROJECT_ID: - raise nose.SkipTest( - "Cannot run integration tests without a project id") + _skip_if_no_project_id() self.sut = gbq.GbqConnector(PROJECT_ID) @@ -173,9 +190,8 @@ class TestGBQConnectorServiceAccountKeyPathIntegration(tm.TestCase): def setUp(self): test_requirements() - if not PROJECT_ID or not PRIVATE_KEY_JSON_PATH: - raise nose.SkipTest("Cannot run integration tests without " - "a project id and private key json path") + _skip_if_no_project_id() + _skip_if_no_private_key_path() self.sut = gbq.GbqConnector(PROJECT_ID, private_key=PRIVATE_KEY_JSON_PATH) @@ -205,9 +221,8 @@ class TestGBQConnectorServiceAccountKeyContentsIntegration(tm.TestCase): def setUp(self): test_requirements() - if not PROJECT_ID or not PRIVATE_KEY_JSON_CONTENTS: - raise nose.SkipTest("Cannot run integration tests without " - "a project id and private key json contents") + _skip_if_no_project_id() + _skip_if_no_private_key_contents() self.sut = gbq.GbqConnector(PROJECT_ID, private_key=PRIVATE_KEY_JSON_CONTENTS) @@ -293,21 +308,14 @@ def test_read_gbq_with_private_key_json_wrong_types_should_fail(self): private_key='{ "client_email" : 1, "private_key" : True }') def test_read_gbq_with_empty_private_key_file_should_fail(self): - from tempfile import mkstemp - from os import remove - _, empty_file = mkstemp() - try: + with tm.ensure_clean() as empty_file_path: with tm.assertRaises(gbq.InvalidPrivateKeyFormat): gbq.read_gbq('SELECT 1', project_id='x', - 
private_key=empty_file) - finally: - remove(empty_file) + private_key=empty_file_path) def test_read_gbq_with_corrupted_private_key_json_should_fail(self): - if not PRIVATE_KEY_JSON_CONTENTS: - raise nose.SkipTest("Cannot run without private key json content") + _skip_if_no_private_key_contents() - import re with tm.assertRaises(gbq.InvalidPrivateKeyFormat): gbq.read_gbq( 'SELECT 1', project_id='x', @@ -322,9 +330,7 @@ def setUpClass(cls): # put here any instruction you want to execute only *ONCE* *BEFORE* # executing *ALL* tests described below. - if not PROJECT_ID: - raise nose.SkipTest( - "Cannot run integration tests without a project id") + _skip_if_no_project_id() test_requirements() @@ -348,18 +354,14 @@ def tearDown(self): pass def test_should_read_as_service_account_with_key_path(self): - if not PRIVATE_KEY_JSON_PATH: - raise nose.SkipTest("Cannot run integration tests without a " - "private key json path") + _skip_if_no_private_key_path() query = 'SELECT "PI" as VALID_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID, private_key=PRIVATE_KEY_JSON_PATH) tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']})) def test_should_read_as_service_account_with_key_contents(self): - if not PRIVATE_KEY_JSON_CONTENTS: - raise nose.SkipTest("Cannot run integration tests without a " - "private key json contents") + _skip_if_no_private_key_contents() query = 'SELECT "PI" as VALID_STRING' df = gbq.read_gbq(query, project_id=PROJECT_ID, private_key=PRIVATE_KEY_JSON_CONTENTS) @@ -526,9 +528,7 @@ def setUpClass(cls): # put here any instruction you want to execute only *ONCE* *BEFORE* # executing *ALL* tests described below. - if not PROJECT_ID: - raise nose.SkipTest( - "Cannot run integration tests without a project id") + _skip_if_no_project_id() test_requirements() clean_gbq_environment() @@ -755,12 +755,8 @@ def setUpClass(cls): # put here any instruction you want to execute only *ONCE* *BEFORE* # executing *ALL* tests described below. - if not PROJECT_ID: - raise nose.SkipTest( - "Cannot run integration tests without a project id") - if not PRIVATE_KEY_JSON_PATH: - raise nose.SkipTest( - "Cannot run integration tests without private key json path") + _skip_if_no_project_id() + _skip_if_no_private_key_path() test_requirements() clean_gbq_environment(PRIVATE_KEY_JSON_PATH) @@ -818,12 +814,8 @@ def setUpClass(cls): # put here any instruction you want to execute only *ONCE* *BEFORE* # executing *ALL* tests described below. - if not PROJECT_ID: - raise nose.SkipTest( - "Cannot run integration tests without a project id") - if not PRIVATE_KEY_JSON_CONTENTS: - raise nose.SkipTest("Cannot run integration tests without " - "private key json contents") + _skip_if_no_project_id() + _skip_if_no_private_key_contents() test_requirements() clean_gbq_environment(PRIVATE_KEY_JSON_CONTENTS)
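The integration tests above are parameterized by module-level constants rather than command-line options, so a local run is expected to look roughly like the sketch below (the project id and key path are placeholders; anything left as ``None`` makes the corresponding tests raise ``nose.SkipTest`` through the ``_skip_if_no_*`` helpers):

.. code-block:: python

   # Near the top of pandas/io/tests/test_gbq.py, fill in the constants that
   # the skip helpers and connectors read:
   PROJECT_ID = 'my-gcp-project'                # BigQuery-enabled project
   PRIVATE_KEY_JSON_PATH = '/path/to/key.json'  # service account key file

   # The "contents" variants exercise passing the key as a JSON string.
   with open(PRIVATE_KEY_JSON_PATH) as f:
       PRIVATE_KEY_JSON_CONTENTS = f.read()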