diff --git a/doc/source/io.rst b/doc/source/io.rst
index e301e353071d9..2b9be8e2850fc 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4093,6 +4093,34 @@ The key functions are:
.. _io.bigquery_reader:
+
+Authentication
+''''''''''''''
+
+Authentication is possible with either user account credentials or service account credentials.
+
+Authenticating with user account credentials is as simple as following the prompts in a browser window,
+which will be opened for you automatically. You will be authenticated to the specified
+``BigQuery`` account via Google's ``OAuth2`` mechanism. Additional information on the
+authentication mechanism can be found `here `__.
+
+Authentication with service account credentials is possible via the ``private_key`` parameter. This method
+is particularly useful when working on remote servers (e.g. a Jupyter/IPython notebook on a remote host).
+Remote authentication with user account credentials is not currently supported in pandas.
+Additional information on service accounts can be found
+`here `__.
+
+.. note::
+
+ The ``private_key`` parameter can be set to either the file path of the service account key
+ in JSON format, or the JSON contents of the service account key.
+
+.. note::
+
+ A private key can be obtained from the Google developers console by clicking
+ `here <https://console.developers.google.com/permissions/serviceaccounts>`__. Use the JSON key type.
+
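+For example, a query using service account credentials might look like the following
+(``projectid``, the key file path and ``service_account_key_contents`` are placeholders);
+:func:`~pandas.io.gbq.to_gbq` accepts the same ``private_key`` parameter:
+
+.. code-block:: python
+
+   # private_key as a path to the service account key file
+   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
+                            projectid,
+                            private_key='/path/to/service_account_key.json')
+
+   # private_key as the JSON contents of the service account key file
+   data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
+                            projectid,
+                            private_key=service_account_key_contents)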
+
Querying
''''''''
@@ -4107,12 +4135,6 @@ into a DataFrame using the :func:`~pandas.io.gbq.read_gbq` function.
data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table', projectid)
-You will then be authenticated to the specified BigQuery account
-via Google's Oauth2 mechanism. In general, this is as simple as following the
-prompts in a browser window which will be opened for you. Should the browser not
-be available, or fail to launch, a code will be provided to complete the process
-manually. Additional information on the authentication mechanism can be found
-`here `__.
You can define which column from BigQuery to use as an index in the
destination DataFrame as well as a preferred column order as follows:
@@ -4125,7 +4147,7 @@ destination DataFrame as well as a preferred column order as follows:
.. note::
- You can find your project id in the `BigQuery management console `__.
+ You can find your project id in the `Google developers console `__.
.. note::
@@ -4134,6 +4156,7 @@ destination DataFrame as well as a preferred column order as follows:
.. _io.bigquery_writer:
+
Writing DataFrames
''''''''''''''''''
diff --git a/doc/source/whatsnew/v0.18.0.txt b/doc/source/whatsnew/v0.18.0.txt
index ccdc48bc1dbbb..877aaf597ab54 100644
--- a/doc/source/whatsnew/v0.18.0.txt
+++ b/doc/source/whatsnew/v0.18.0.txt
@@ -152,6 +152,8 @@ Other enhancements
- ``Series`` gained an ``is_unique`` attribute (:issue:`11946`)
- ``DataFrame.quantile`` and ``Series.quantile`` now accept ``interpolation`` keyword (:issue:`10174`).
- ``DataFrame.select_dtypes`` now allows the ``np.float16`` typecode (:issue:`11990`)
+- Added Google ``BigQuery`` service account authentication support, which enables authentication on remote servers (:issue:`11881`). For further details see :ref:`here `
+
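+  A minimal example (the project id and the key file path are placeholders):
+
+  .. code-block:: python
+
+     df = pd.read_gbq('SELECT * FROM test_dataset.test_table',
+                      project_id='my-project-id',
+                      private_key='/path/to/service_account_key.json')
+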
.. _whatsnew_0180.enhancements.rounding:
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 907da619b1875..8cf73caeac32e 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -859,7 +859,7 @@ def to_dict(self, orient='dict'):
raise ValueError("orient '%s' not understood" % orient)
def to_gbq(self, destination_table, project_id, chunksize=10000,
- verbose=True, reauth=False, if_exists='fail'):
+ verbose=True, reauth=False, if_exists='fail', private_key=None):
"""Write a DataFrame to a Google BigQuery table.
THIS IS AN EXPERIMENTAL LIBRARY
@@ -883,6 +883,10 @@ def to_gbq(self, destination_table, project_id, chunksize=10000,
'fail': If table exists, do nothing.
'replace': If table exists, drop it, recreate it, and insert data.
'append': If table exists, insert data. Create if does not exist.
+ private_key : str (optional)
+ Service account private key in JSON format. Can be file path
+ or string contents. This is useful for remote server
+        authentication (e.g. a Jupyter/IPython notebook on a remote host)
.. versionadded:: 0.17.0
"""
@@ -890,7 +894,7 @@ def to_gbq(self, destination_table, project_id, chunksize=10000,
from pandas.io import gbq
return gbq.to_gbq(self, destination_table, project_id=project_id,
chunksize=chunksize, verbose=verbose, reauth=reauth,
- if_exists=if_exists)
+ if_exists=if_exists, private_key=private_key)
@classmethod
def from_records(cls, data, index=None, exclude=None, columns=None,
diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index 4bf46f199c34a..9fd99a49aa92f 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -4,6 +4,8 @@
import logging
from time import sleep
import uuid
+import time
+import sys
import numpy as np
@@ -39,10 +41,33 @@ def _check_google_client_version():
.format(google_api_minimum_version,
_GOOGLE_API_CLIENT_VERSION))
+
+def _test_google_api_imports():
+
+ try:
+        import httplib2  # noqa
+        from apiclient.discovery import build  # noqa
+        from apiclient.errors import HttpError  # noqa
+        from oauth2client.client import AccessTokenRefreshError  # noqa
+        from oauth2client.client import OAuth2WebServerFlow  # noqa
+        from oauth2client.client import SignedJwtAssertionCredentials  # noqa
+        from oauth2client.file import Storage  # noqa
+        from oauth2client.tools import run_flow, argparser  # noqa
+ except ImportError as e:
+ raise ImportError("Missing module required for Google BigQuery "
+ "support: {0}".format(str(e)))
+
logger = logging.getLogger('pandas.io.gbq')
logger.setLevel(logging.ERROR)
+class InvalidPrivateKeyFormat(PandasError, ValueError):
+ """
+ Raised when provided private key has invalid format.
+ """
+ pass
+
+
class AccessDenied(PandasError, ValueError):
"""
Raised when invalid credentials are provided, or tokens have expired.
@@ -114,39 +139,35 @@ class TableCreationError(PandasError, ValueError):
class GbqConnector(object):
+ scope = 'https://www.googleapis.com/auth/bigquery'
- def __init__(self, project_id, reauth=False):
- self.test_google_api_imports()
+ def __init__(self, project_id, reauth=False, verbose=False,
+ private_key=None):
+ _check_google_client_version()
+ _test_google_api_imports()
self.project_id = project_id
self.reauth = reauth
+ self.verbose = verbose
+ self.private_key = private_key
self.credentials = self.get_credentials()
- self.service = self.get_service(self.credentials)
-
- def test_google_api_imports(self):
- try:
- import httplib2 # noqa
- from apiclient.discovery import build # noqa
- from apiclient.errors import HttpError # noqa
- from oauth2client.client import AccessTokenRefreshError # noqa
- from oauth2client.client import OAuth2WebServerFlow # noqa
- from oauth2client.file import Storage # noqa
- from oauth2client.tools import run_flow, argparser # noqa
- except ImportError as e:
- raise ImportError("Missing module required for Google BigQuery "
- "support: {0}".format(str(e)))
+ self.service = self.get_service()
def get_credentials(self):
+ if self.private_key:
+ return self.get_service_account_credentials()
+ else:
+ return self.get_user_account_credentials()
+
+ def get_user_account_credentials(self):
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.file import Storage
from oauth2client.tools import run_flow, argparser
- _check_google_client_version()
-
flow = OAuth2WebServerFlow(
client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
'.apps.googleusercontent.com'),
client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
- scope='https://www.googleapis.com/auth/bigquery',
+ scope=self.scope,
redirect_uri='urn:ietf:wg:oauth:2.0:oob')
storage = Storage('bigquery_credentials.dat')
@@ -157,15 +178,71 @@ def get_credentials(self):
return credentials
+ def get_service_account_credentials(self):
+ from oauth2client.client import SignedJwtAssertionCredentials
+ from os.path import isfile
+
+ try:
+ if isfile(self.private_key):
+ with open(self.private_key) as f:
+ json_key = json.loads(f.read())
+ else:
+ # ugly hack: 'private_key' field has new lines inside,
+ # they break json parser, but we need to preserve them
+ json_key = json.loads(self.private_key.replace('\n', ' '))
+ json_key['private_key'] = json_key['private_key'].replace(
+ ' ', '\n')
+
+ if compat.PY3:
+ json_key['private_key'] = bytes(
+ json_key['private_key'], 'UTF-8')
+
+ return SignedJwtAssertionCredentials(
+ json_key['client_email'],
+ json_key['private_key'],
+ self.scope,
+ )
+ except (KeyError, ValueError, TypeError, AttributeError):
+ raise InvalidPrivateKeyFormat(
+ "Private key is missing or invalid. It should be service "
+ "account private key JSON (file path or string contents) "
+ "with at least two keys: 'client_email' and 'private_key'. "
+ "Can be obtained from: https://console.developers.google."
+ "com/permissions/serviceaccounts")
+
+ def _print(self, msg, end='\n'):
+ if self.verbose:
+ sys.stdout.write(msg + end)
+ sys.stdout.flush()
+
+ def _start_timer(self):
+ self.start = time.time()
+
+ def get_elapsed_seconds(self):
+ return round(time.time() - self.start, 2)
+
+ def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.',
+ overlong=7):
+ sec = self.get_elapsed_seconds()
+ if sec > overlong:
+ self._print('{} {} {}'.format(prefix, sec, postfix))
+
+ # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
@staticmethod
- def get_service(credentials):
+ def sizeof_fmt(num, suffix='b'):
+ fmt = "%3.1f %s%s"
+ for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']:
+ if abs(num) < 1024.0:
+ return fmt % (num, unit, suffix)
+ num /= 1024.0
+ return fmt % (num, 'Y', suffix)
+
+ def get_service(self):
import httplib2
from apiclient.discovery import build
- _check_google_client_version()
-
http = httplib2.Http()
- http = credentials.authorize(http)
+ http = self.credentials.authorize(http)
bigquery_service = build('bigquery', 'v2', http=http)
return bigquery_service
@@ -188,8 +265,7 @@ def process_http_error(ex):
raise GenericGBQException(errors)
- @staticmethod
- def process_insert_errors(insert_errors, verbose):
+ def process_insert_errors(self, insert_errors):
for insert_error in insert_errors:
row = insert_error['index']
errors = insert_error.get('errors', None)
@@ -202,8 +278,8 @@ def process_insert_errors(insert_errors, verbose):
.format(row, reason, location, message))
# Report all error messages if verbose is set
- if verbose:
- print(error_message)
+ if self.verbose:
+ self._print(error_message)
else:
raise StreamingInsertError(error_message +
'\nEnable verbose logging to '
@@ -211,7 +287,7 @@ def process_insert_errors(insert_errors, verbose):
raise StreamingInsertError
- def run_query(self, query, verbose=True):
+ def run_query(self, query):
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
@@ -228,21 +304,27 @@ def run_query(self, query, verbose=True):
}
}
+ self._start_timer()
try:
+ self._print('Requesting query... ', end="")
query_reply = job_collection.insert(
projectId=self.project_id, body=job_data).execute()
- except AccessTokenRefreshError:
- raise AccessDenied("The credentials have been revoked or expired, "
- "please re-run the application "
- "to re-authorize")
+ self._print('ok.\nQuery running...')
+ except (AccessTokenRefreshError, ValueError):
+ if self.private_key:
+ raise AccessDenied(
+ "The service account credentials are not valid")
+ else:
+ raise AccessDenied(
+ "The credentials have been revoked or expired, "
+ "please re-run the application to re-authorize")
except HttpError as ex:
self.process_http_error(ex)
job_reference = query_reply['jobReference']
while not query_reply.get('jobComplete', False):
- if verbose:
- print('Waiting for job to complete...')
+ self.print_elapsed_seconds(' Elapsed', 's. Waiting...')
try:
query_reply = job_collection.getQueryResults(
projectId=job_reference['projectId'],
@@ -250,6 +332,17 @@ def run_query(self, query, verbose=True):
except HttpError as ex:
self.process_http_error(ex)
+ if self.verbose:
+ if query_reply['cacheHit']:
+ self._print('Query done.\nCache hit.\n')
+ else:
+ bytes_processed = int(query_reply.get(
+ 'totalBytesProcessed', '0'))
+ self._print('Query done.\nProcessed: {}\n'.format(
+ self.sizeof_fmt(bytes_processed)))
+
+ self._print('Retrieving results...')
+
total_rows = int(query_reply['totalRows'])
result_pages = list()
seen_page_tokens = list()
@@ -262,6 +355,15 @@ def run_query(self, query, verbose=True):
page = query_reply['rows']
result_pages.append(page)
current_row += len(page)
+
+ self.print_elapsed_seconds(
+ ' Got page: {}; {}% done. Elapsed'.format(
+ len(result_pages),
+ round(100.0 * current_row / total_rows)))
+
+ if current_row == total_rows:
+ break
+
page_token = query_reply.get('pageToken', None)
if not page_token and current_row < total_rows:
@@ -285,18 +387,21 @@ def run_query(self, query, verbose=True):
if current_row < total_rows:
raise InvalidPageToken()
+ # print basic query stats
+ self._print('Got {} rows.\n'.format(total_rows))
+
return schema, result_pages
- def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
+ def load_data(self, dataframe, dataset_id, table_id, chunksize):
from apiclient.errors import HttpError
job_id = uuid.uuid4().hex
rows = []
remaining_rows = len(dataframe)
- if verbose:
+ if self.verbose:
total_rows = remaining_rows
- print("\n\n")
+ self._print("\n\n")
for index, row in dataframe.reset_index(drop=True).iterrows():
row_dict = dict()
@@ -308,9 +413,8 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
remaining_rows -= 1
if (len(rows) % chunksize == 0) or (remaining_rows == 0):
- if verbose:
- print("\rStreaming Insert is {0}% Complete".format(
- ((total_rows - remaining_rows) * 100) / total_rows))
+ self._print("\rStreaming Insert is {0}% Complete".format(
+ ((total_rows - remaining_rows) * 100) / total_rows))
body = {'rows': rows}
@@ -335,13 +439,12 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose):
insert_errors = response.get('insertErrors', None)
if insert_errors:
- self.process_insert_errors(insert_errors, verbose)
+ self.process_insert_errors(insert_errors)
sleep(1) # Maintains the inserts "per second" rate per API
rows = []
- if verbose:
- print("\n")
+ self._print("\n")
def verify_schema(self, dataset_id, table_id, schema):
from apiclient.errors import HttpError
@@ -356,8 +459,7 @@ def verify_schema(self, dataset_id, table_id, schema):
except HttpError as ex:
self.process_http_error(ex)
- def delete_and_recreate_table(self, dataset_id, table_id,
- table_schema, verbose):
+ def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
delay = 0
# Changes to table schema may take up to 2 minutes as of May 2015 See
@@ -367,12 +469,12 @@ def delete_and_recreate_table(self, dataset_id, table_id,
# be a 120 second delay
if not self.verify_schema(dataset_id, table_id, table_schema):
- if verbose:
- print('The existing table has a different schema. '
- 'Please wait 2 minutes. See Google BigQuery issue #191')
+ self._print('The existing table has a different schema. Please '
+ 'wait 2 minutes. See Google BigQuery issue #191')
delay = 120
- table = _Table(self.project_id, dataset_id)
+ table = _Table(self.project_id, dataset_id,
+ private_key=self.private_key)
table.delete(table_id)
table.create(table_id, table_schema)
sleep(delay)
@@ -418,7 +520,7 @@ def _parse_entry(field_value, field_type):
def read_gbq(query, project_id=None, index_col=None, col_order=None,
- reauth=False, verbose=True):
+ reauth=False, verbose=True, private_key=None):
"""Load data from Google BigQuery.
THIS IS AN EXPERIMENTAL LIBRARY
@@ -446,6 +548,10 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
if multiple accounts are used.
verbose : boolean (default True)
Verbose output
+ private_key : str (optional)
+ Service account private key in JSON format. Can be file path
+ or string contents. This is useful for remote server
+        authentication (e.g. a Jupyter/IPython notebook on a remote host)
Returns
-------
@@ -457,8 +563,9 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
if not project_id:
raise TypeError("Missing required parameter: project_id")
- connector = GbqConnector(project_id, reauth=reauth)
- schema, pages = connector.run_query(query, verbose=verbose)
+ connector = GbqConnector(project_id, reauth=reauth, verbose=verbose,
+ private_key=private_key)
+ schema, pages = connector.run_query(query)
dataframe_list = []
while len(pages) > 0:
page = pages.pop()
@@ -492,11 +599,18 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
# if there are no NaN's. This is presently due to a
# limitation of numpy in handling missing data.
final_df._data = final_df._data.downcast(dtypes='infer')
+
+ connector.print_elapsed_seconds(
+ 'Total time taken',
+ datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'),
+ 0
+ )
+
return final_df
def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
- verbose=True, reauth=False, if_exists='fail'):
+ verbose=True, reauth=False, if_exists='fail', private_key=None):
"""Write a DataFrame to a Google BigQuery table.
THIS IS AN EXPERIMENTAL LIBRARY
@@ -520,6 +634,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
'fail': If table exists, do nothing.
'replace': If table exists, drop it, recreate it, and insert data.
'append': If table exists, insert data. Create if does not exist.
+ private_key : str (optional)
+ Service account private key in JSON format. Can be file path
+ or string contents. This is useful for remote server
+        authentication (e.g. a Jupyter/IPython notebook on a remote host)
"""
if if_exists not in ('fail', 'replace', 'append'):
@@ -529,10 +647,12 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
raise NotFoundException(
"Invalid Table Name. Should be of the form 'datasetId.tableId' ")
- connector = GbqConnector(project_id, reauth=reauth)
+ connector = GbqConnector(project_id, reauth=reauth, verbose=verbose,
+ private_key=private_key)
dataset_id, table_id = destination_table.rsplit('.', 1)
- table = _Table(project_id, dataset_id, reauth=reauth)
+ table = _Table(project_id, dataset_id, reauth=reauth,
+ private_key=private_key)
table_schema = _generate_bq_schema(dataframe)
@@ -545,7 +665,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
"append or replace data.")
elif if_exists == 'replace':
connector.delete_and_recreate_table(
- dataset_id, table_id, table_schema, verbose)
+ dataset_id, table_id, table_schema)
elif if_exists == 'append':
if not connector.verify_schema(dataset_id, table_id, table_schema):
raise InvalidSchema("Please verify that the column order, "
@@ -555,7 +675,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
else:
table.create(table_id, table_schema)
- connector.load_data(dataframe, dataset_id, table_id, chunksize, verbose)
+ connector.load_data(dataframe, dataset_id, table_id, chunksize)
def generate_bq_schema(df, default_type='STRING'):
@@ -597,15 +717,12 @@ def _generate_bq_schema(df, default_type='STRING'):
class _Table(GbqConnector):
- def __init__(self, project_id, dataset_id, reauth=False):
+ def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
+ private_key=None):
from apiclient.errors import HttpError
- self.test_google_api_imports()
- self.project_id = project_id
- self.reauth = reauth
- self.credentials = self.get_credentials()
- self.service = self.get_service(self.credentials)
self.http_error = HttpError
self.dataset_id = dataset_id
+ super(_Table, self).__init__(project_id, reauth, verbose, private_key)
def exists(self, table_id):
""" Check if a table exists in Google BigQuery
@@ -653,8 +770,10 @@ def create(self, table_id, schema):
raise TableCreationError(
"The table could not be created because it already exists")
- if not _Dataset(self.project_id).exists(self.dataset_id):
- _Dataset(self.project_id).create(self.dataset_id)
+ if not _Dataset(self.project_id,
+ private_key=self.private_key).exists(self.dataset_id):
+ _Dataset(self.project_id,
+ private_key=self.private_key).create(self.dataset_id)
body = {
'schema': schema,
@@ -698,14 +817,11 @@ def delete(self, table_id):
class _Dataset(GbqConnector):
- def __init__(self, project_id, reauth=False):
+ def __init__(self, project_id, reauth=False, verbose=False,
+ private_key=None):
from apiclient.errors import HttpError
- self.test_google_api_imports()
- self.project_id = project_id
- self.reauth = reauth
- self.credentials = self.get_credentials()
- self.service = self.get_service(self.credentials)
self.http_error = HttpError
+        super(_Dataset, self).__init__(project_id, reauth, verbose,
+                                       private_key)
def exists(self, dataset_id):
""" Check if a dataset exists in Google BigQuery
diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py
index 88a1e3e0a5cc3..4c3d19aad4d09 100644
--- a/pandas/io/tests/test_gbq.py
+++ b/pandas/io/tests/test_gbq.py
@@ -1,3 +1,4 @@
+import re
from datetime import datetime
import nose
import pytz
@@ -16,6 +17,9 @@
import pandas.util.testing as tm
PROJECT_ID = None
+PRIVATE_KEY_JSON_PATH = None
+PRIVATE_KEY_JSON_CONTENTS = None
+
DATASET_ID = 'pydata_pandas_bq_testing'
TABLE_ID = 'new_test'
DESTINATION_TABLE = "{0}.{1}".format(DATASET_ID + "1", TABLE_ID)
@@ -28,6 +32,24 @@
_HTTPLIB2_INSTALLED = False
_SETUPTOOLS_INSTALLED = False
+def _skip_if_no_project_id():
+ if not PROJECT_ID:
+ raise nose.SkipTest(
+ "Cannot run integration tests without a project id")
+
+def _skip_if_no_private_key_path():
+ if not PRIVATE_KEY_JSON_PATH:
+ raise nose.SkipTest("Cannot run integration tests without a "
+ "private key json file path")
+
+def _skip_if_no_private_key_contents():
+ if not PRIVATE_KEY_JSON_CONTENTS:
+ raise nose.SkipTest("Cannot run integration tests without a "
+ "private key json contents")
+
def _test_imports():
global _GOOGLE_API_CLIENT_INSTALLED, _GOOGLE_API_CLIENT_VALID_VERSION, \
@@ -49,8 +71,9 @@ def _test_imports():
from apiclient.discovery import build # noqa
from apiclient.errors import HttpError # noqa
- from oauth2client.client import OAuth2WebServerFlow # noqa
- from oauth2client.client import AccessTokenRefreshError # noqa
+ from oauth2client.client import OAuth2WebServerFlow
+ from oauth2client.client import AccessTokenRefreshError
+        from oauth2client.client import SignedJwtAssertionCredentials  # noqa
from oauth2client.file import Storage # noqa
from oauth2client.tools import run_flow # noqa
@@ -96,13 +119,13 @@ def test_requirements():
raise nose.SkipTest(import_exception)
-def clean_gbq_environment():
- dataset = gbq._Dataset(PROJECT_ID)
+def clean_gbq_environment(private_key=None):
+ dataset = gbq._Dataset(PROJECT_ID, private_key=private_key)
for i in range(1, 10):
if DATASET_ID + str(i) in dataset.datasets():
dataset_id = DATASET_ID + str(i)
- table = gbq._Table(PROJECT_ID, dataset_id)
+ table = gbq._Table(PROJECT_ID, dataset_id, private_key=private_key)
for j in range(1, 20):
if TABLE_ID + str(j) in dataset.tables(dataset_id):
table.delete(TABLE_ID + str(j))
@@ -138,9 +161,7 @@ class TestGBQConnectorIntegration(tm.TestCase):
def setUp(self):
test_requirements()
- if not PROJECT_ID:
- raise nose.SkipTest(
- "Cannot run integration tests without a project id")
+ _skip_if_no_project_id()
self.sut = gbq.GbqConnector(PROJECT_ID)
@@ -153,8 +174,38 @@ def test_should_be_able_to_get_valid_credentials(self):
self.assertFalse(credentials.invalid, 'Returned credentials invalid')
def test_should_be_able_to_get_a_bigquery_service(self):
+ bigquery_service = self.sut.get_service()
+ self.assertTrue(bigquery_service is not None, 'No service returned')
+
+ def test_should_be_able_to_get_schema_from_query(self):
+ schema, pages = self.sut.run_query('SELECT 1')
+ self.assertTrue(schema is not None)
+
+ def test_should_be_able_to_get_results_from_query(self):
+ schema, pages = self.sut.run_query('SELECT 1')
+ self.assertTrue(pages is not None)
+
+
+class TestGBQConnectorServiceAccountKeyPathIntegration(tm.TestCase):
+ def setUp(self):
+ test_requirements()
+
+ _skip_if_no_project_id()
+ _skip_if_no_private_key_path()
+
+ self.sut = gbq.GbqConnector(PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_PATH)
+
+ def test_should_be_able_to_make_a_connector(self):
+ self.assertTrue(self.sut is not None,
+ 'Could not create a GbqConnector')
+
+ def test_should_be_able_to_get_valid_credentials(self):
credentials = self.sut.get_credentials()
- bigquery_service = self.sut.get_service(credentials)
+ self.assertFalse(credentials.invalid, 'Returned credentials invalid')
+
+ def test_should_be_able_to_get_a_bigquery_service(self):
+ bigquery_service = self.sut.get_service()
self.assertTrue(bigquery_service is not None, 'No service returned')
def test_should_be_able_to_get_schema_from_query(self):
@@ -166,8 +217,38 @@ def test_should_be_able_to_get_results_from_query(self):
self.assertTrue(pages is not None)
-class TestReadGBQUnitTests(tm.TestCase):
+class TestGBQConnectorServiceAccountKeyContentsIntegration(tm.TestCase):
+ def setUp(self):
+ test_requirements()
+
+ _skip_if_no_project_id()
+ _skip_if_no_private_key_contents()
+
+ self.sut = gbq.GbqConnector(PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_CONTENTS)
+
+ def test_should_be_able_to_make_a_connector(self):
+ self.assertTrue(self.sut is not None,
+                        'Could not create a GbqConnector')
+
+    def test_should_be_able_to_get_valid_credentials(self):
+ credentials = self.sut.get_credentials()
+ self.assertFalse(credentials.invalid, 'Returned credentials invalid')
+
+ def test_should_be_able_to_get_a_bigquery_service(self):
+ bigquery_service = self.sut.get_service()
+ self.assertTrue(bigquery_service is not None, 'No service returned')
+
+ def test_should_be_able_to_get_schema_from_query(self):
+ schema, pages = self.sut.run_query('SELECT 1')
+ self.assertTrue(schema is not None)
+
+ def test_should_be_able_to_get_results_from_query(self):
+ schema, pages = self.sut.run_query('SELECT 1')
+ self.assertTrue(pages is not None)
+
+
+class GBQUnitTests(tm.TestCase):
def setUp(self):
test_requirements()
@@ -212,6 +293,34 @@ def test_that_parse_data_works_properly(self):
correct_output = DataFrame({'VALID_STRING': ['PI']})
tm.assert_frame_equal(test_output, correct_output)
+ def test_read_gbq_with_invalid_private_key_json_should_fail(self):
+ with tm.assertRaises(gbq.InvalidPrivateKeyFormat):
+ gbq.read_gbq('SELECT 1', project_id='x', private_key='y')
+
+ def test_read_gbq_with_empty_private_key_json_should_fail(self):
+ with tm.assertRaises(gbq.InvalidPrivateKeyFormat):
+ gbq.read_gbq('SELECT 1', project_id='x', private_key='{}')
+
+ def test_read_gbq_with_private_key_json_wrong_types_should_fail(self):
+ with tm.assertRaises(gbq.InvalidPrivateKeyFormat):
+ gbq.read_gbq(
+ 'SELECT 1', project_id='x',
+ private_key='{ "client_email" : 1, "private_key" : True }')
+
+ def test_read_gbq_with_empty_private_key_file_should_fail(self):
+ with tm.ensure_clean() as empty_file_path:
+ with tm.assertRaises(gbq.InvalidPrivateKeyFormat):
+ gbq.read_gbq('SELECT 1', project_id='x',
+ private_key=empty_file_path)
+
+ def test_read_gbq_with_corrupted_private_key_json_should_fail(self):
+ _skip_if_no_private_key_contents()
+
+ with tm.assertRaises(gbq.InvalidPrivateKeyFormat):
+ gbq.read_gbq(
+ 'SELECT 1', project_id='x',
+ private_key=re.sub('[a-z]', '9', PRIVATE_KEY_JSON_CONTENTS))
+
class TestReadGBQIntegration(tm.TestCase):
@@ -221,9 +330,7 @@ def setUpClass(cls):
# put here any instruction you want to execute only *ONCE* *BEFORE*
# executing *ALL* tests described below.
- if not PROJECT_ID:
- raise nose.SkipTest(
- "Cannot run integration tests without a project id")
+ _skip_if_no_project_id()
test_requirements()
@@ -246,6 +353,20 @@ def tearDown(self):
# executed.
pass
+ def test_should_read_as_service_account_with_key_path(self):
+ _skip_if_no_private_key_path()
+ query = 'SELECT "PI" as VALID_STRING'
+ df = gbq.read_gbq(query, project_id=PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_PATH)
+ tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']}))
+
+ def test_should_read_as_service_account_with_key_contents(self):
+ _skip_if_no_private_key_contents()
+ query = 'SELECT "PI" as VALID_STRING'
+ df = gbq.read_gbq(query, project_id=PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_CONTENTS)
+ tm.assert_frame_equal(df, DataFrame({'VALID_STRING': ['PI']}))
+
def test_should_properly_handle_valid_strings(self):
query = 'SELECT "PI" as VALID_STRING'
df = gbq.read_gbq(query, project_id=PROJECT_ID)
@@ -384,11 +505,13 @@ def test_download_dataset_larger_than_200k_rows(self):
def test_zero_rows(self):
# Bug fix for https://github.com/pydata/pandas/issues/10273
- df = gbq.read_gbq("SELECT title, language FROM "
- "[publicdata:samples.wikipedia] where "
- "timestamp=-9999999",
+ df = gbq.read_gbq("SELECT title, id "
+ "FROM [publicdata:samples.wikipedia] "
+ "WHERE timestamp=-9999999",
project_id=PROJECT_ID)
- expected_result = DataFrame(columns=['title', 'language'])
+ page_array = np.zeros(
+ (0,), dtype=[('title', object), ('id', np.dtype(float))])
+ expected_result = DataFrame(page_array, columns=['title', 'id'])
self.assert_frame_equal(df, expected_result)
@@ -405,9 +528,7 @@ def setUpClass(cls):
# put here any instruction you want to execute only *ONCE* *BEFORE*
# executing *ALL* tests described below.
- if not PROJECT_ID:
- raise nose.SkipTest(
- "Cannot run integration tests without a project id")
+ _skip_if_no_project_id()
test_requirements()
clean_gbq_environment()
@@ -439,12 +560,12 @@ def tearDown(self):
def test_upload_data(self):
destination_table = DESTINATION_TABLE + "1"
- test_size = 1000001
+ test_size = 20001
df = make_mixed_dataframe_v2(test_size)
gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000)
- sleep(60) # <- Curses Google!!!
+ sleep(30) # <- Curses Google!!!
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}"
.format(destination_table),
@@ -479,7 +600,7 @@ def test_upload_data_if_table_exists_append(self):
# Test the if_exists parameter with value 'append'
gbq.to_gbq(df, destination_table, PROJECT_ID, if_exists='append')
- sleep(60) # <- Curses Google!!!
+ sleep(30) # <- Curses Google!!!
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}"
.format(destination_table),
@@ -505,7 +626,7 @@ def test_upload_data_if_table_exists_replace(self):
gbq.to_gbq(df_different_schema, destination_table,
PROJECT_ID, if_exists='replace')
- sleep(60) # <- Curses Google!!!
+ sleep(30) # <- Curses Google!!!
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}"
.format(destination_table),
@@ -619,6 +740,123 @@ def test_dataset_does_not_exist(self):
self.assertTrue(not self.dataset.exists(
DATASET_ID + "_not_found"), 'Expected dataset not to exist')
+
+class TestToGBQIntegrationServiceAccountKeyPath(tm.TestCase):
+ # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
+ # As a workaround to this issue, each test should use a unique table name.
+ # Make sure to modify the for loop range in the tearDownClass when a new
+ # test is added
+ # See `Issue 191
+ # `__
+
+ @classmethod
+ def setUpClass(cls):
+ # - GLOBAL CLASS FIXTURES -
+ # put here any instruction you want to execute only *ONCE* *BEFORE*
+ # executing *ALL* tests described below.
+
+ _skip_if_no_project_id()
+ _skip_if_no_private_key_path()
+
+ test_requirements()
+ clean_gbq_environment(PRIVATE_KEY_JSON_PATH)
+
+ def setUp(self):
+ # - PER-TEST FIXTURES -
+ # put here any instruction you want to be run *BEFORE* *EVERY* test
+ # is executed.
+ pass
+
+ @classmethod
+ def tearDownClass(cls):
+ # - GLOBAL CLASS FIXTURES -
+ # put here any instruction you want to execute only *ONCE* *AFTER*
+ # executing all tests.
+
+ clean_gbq_environment(PRIVATE_KEY_JSON_PATH)
+
+ def tearDown(self):
+ # - PER-TEST FIXTURES -
+ # put here any instructions you want to be run *AFTER* *EVERY* test
+ # is executed.
+ pass
+
+ def test_upload_data_as_service_account_with_key_path(self):
+ destination_table = DESTINATION_TABLE + "11"
+
+ test_size = 10
+ df = make_mixed_dataframe_v2(test_size)
+
+ gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000,
+ private_key=PRIVATE_KEY_JSON_PATH)
+
+ sleep(30) # <- Curses Google!!!
+
+ result = gbq.read_gbq(
+ "SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table),
+ project_id=PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_PATH)
+
+ self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+
+class TestToGBQIntegrationServiceAccountKeyContents(tm.TestCase):
+ # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
+ # As a workaround to this issue, each test should use a unique table name.
+ # Make sure to modify the for loop range in the tearDownClass when a new
+ # test is added
+ # See `Issue 191
+ # `__
+
+ @classmethod
+ def setUpClass(cls):
+ # - GLOBAL CLASS FIXTURES -
+ # put here any instruction you want to execute only *ONCE* *BEFORE*
+ # executing *ALL* tests described below.
+
+ _skip_if_no_project_id()
+ _skip_if_no_private_key_contents()
+
+ test_requirements()
+ clean_gbq_environment(PRIVATE_KEY_JSON_CONTENTS)
+
+ def setUp(self):
+ # - PER-TEST FIXTURES -
+ # put here any instruction you want to be run *BEFORE* *EVERY* test
+ # is executed.
+ pass
+
+ @classmethod
+ def tearDownClass(cls):
+ # - GLOBAL CLASS FIXTURES -
+ # put here any instruction you want to execute only *ONCE* *AFTER*
+ # executing all tests.
+
+ clean_gbq_environment(PRIVATE_KEY_JSON_CONTENTS)
+
+ def tearDown(self):
+ # - PER-TEST FIXTURES -
+ # put here any instructions you want to be run *AFTER* *EVERY* test
+ # is executed.
+ pass
+
+ def test_upload_data_as_service_account_with_key_contents(self):
+ destination_table = DESTINATION_TABLE + "12"
+
+ test_size = 10
+ df = make_mixed_dataframe_v2(test_size)
+
+ gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000,
+ private_key=PRIVATE_KEY_JSON_CONTENTS)
+
+ sleep(30) # <- Curses Google!!!
+
+ result = gbq.read_gbq(
+ "SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table),
+ project_id=PROJECT_ID,
+ private_key=PRIVATE_KEY_JSON_CONTENTS)
+ self.assertEqual(result['NUM_ROWS'][0], test_size)
+
if __name__ == '__main__':
nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],
exit=False)