diff --git a/doc/source/api.rst b/doc/source/api.rst index b1fe77c298d71..3445f9263101e 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -110,10 +110,7 @@ Google BigQuery read_gbq to_gbq - generate_bq_schema - create_table - delete_table - table_exists + .. currentmodule:: pandas diff --git a/doc/source/io.rst b/doc/source/io.rst index 6affbedad3ae2..ce8064b302c3b 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4005,14 +4005,10 @@ The key functions are: .. currentmodule:: pandas.io.gbq .. autosummary:: - :toctree: generated/ + :toctree: generated/ - read_gbq - to_gbq - generate_bq_schema - create_table - delete_table - table_exists + read_gbq + to_gbq .. currentmodule:: pandas @@ -4078,8 +4074,7 @@ Assume we want to write a DataFrame ``df`` into a BigQuery table using :func:`~p .. note:: - If the destination table does not exist, a new table will be created. The - destination dataset id must already exist in order for a new table to be created. + The destination table and destination dataset will automatically be created if they do not already exist. The ``if_exists`` argument can be used to dictate whether to ``'fail'``, ``'replace'`` or ``'append'`` if the destination table already exists. The default value is ``'fail'``. @@ -4146,19 +4141,13 @@ For example: often as the service seems to be changing and evolving. BiqQuery is best for analyzing large sets of data quickly, but it is not a direct replacement for a transactional database. - Creating BigQuery Tables '''''''''''''''''''''''' -As of 0.17.0, the gbq module has a function :func:`~pandas.io.gbq.create_table` which allows users -to create a table in BigQuery. The only requirement is that the dataset must already exist. -The schema may be generated from a pandas DataFrame using the :func:`~pandas.io.gbq.generate_bq_schema` function below. - -For example: - -.. code-block:: python +.. warning:: - gbq.create_table('my_dataset.my_table', schema, projectid) + As of 0.17, the function :func:`~pandas.io.gbq.generate_bq_schema` has been deprecated and will be + removed in a future version. As of 0.15.2, the gbq module has a function :func:`~pandas.io.gbq.generate_bq_schema` which will produce the dictionary representation schema of the specified pandas DataFrame. @@ -4174,31 +4163,6 @@ produce the dictionary representation schema of the specified pandas DataFrame. {'name': 'my_int64', 'type': 'INTEGER'}, {'name': 'my_string', 'type': 'STRING'}]} -Deleting BigQuery Tables -'''''''''''''''''''''''' - -As of 0.17.0, the gbq module has a function :func:`~pandas.io.gbq.delete_table` which allows users to delete a table -in Google BigQuery. - -For example: - -.. code-block:: python - - gbq.delete_table('my_dataset.my_table', projectid) - -The following function can be used to check whether a table exists prior to calling ``table_exists``: - -:func:`~pandas.io.gbq.table_exists`. - -The return value will be of type boolean. - -For example: - -.. code-block:: python - - In [12]: gbq.table_exists('my_dataset.my_table', projectid) - Out[12]: True - .. note:: If you delete and re-create a BigQuery table with the same name, but different table schema, diff --git a/doc/source/whatsnew/v0.17.0.txt b/doc/source/whatsnew/v0.17.0.txt index 3d4d113940dec..4df0f4a33d196 100644 --- a/doc/source/whatsnew/v0.17.0.txt +++ b/doc/source/whatsnew/v0.17.0.txt @@ -328,8 +328,8 @@ has been changed to make this keyword unnecessary - the change is shown below. Google BigQuery Enhancements ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - Added ability to automatically create a table using the :func:`pandas.io.gbq.to_gbq` function if destination table does not exist. (:issue:`8325`). +- Added ability to automatically create a dataset using the :func:`pandas.io.gbq.to_gbq` function if destination dataset does not exist. (:issue:`11121`). - Added ability to replace an existing table and schema when calling the :func:`pandas.io.gbq.to_gbq` function via the ``if_exists`` argument. See the :ref:`docs ` for more details (:issue:`8325`). -- Added the following functions to the gbq module: :func:`pandas.io.gbq.table_exists`, :func:`pandas.io.gbq.create_table`, and :func:`pandas.io.gbq.delete_table`. See the :ref:`docs ` for more details (:issue:`8325`). - ``InvalidColumnOrder`` and ``InvalidPageToken`` in the gbq module will raise ``ValueError`` instead of ``IOError``. .. _whatsnew_0170.enhancements.other: diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py index 37e7cb944814a..91eb11f5192af 100644 --- a/pandas/io/gbq.py +++ b/pandas/io/gbq.py @@ -1,7 +1,6 @@ from datetime import datetime import json import logging -import sys from time import sleep import uuid @@ -12,6 +11,7 @@ from pandas.core.api import DataFrame from pandas.tools.merge import concat from pandas.core.common import PandasError +from pandas.util.decorators import deprecate def _check_google_client_version(): @@ -41,6 +41,13 @@ class AccessDenied(PandasError, ValueError): pass +class DatasetCreationError(PandasError, ValueError): + """ + Raised when the create dataset method fails + """ + pass + + class GenericGBQException(PandasError, ValueError): """ Raised when an unrecognized Google API Error occurs. @@ -65,7 +72,6 @@ class InvalidPageToken(PandasError, ValueError): pass - class InvalidSchema(PandasError, ValueError): """ Raised when the provided DataFrame does @@ -85,7 +91,8 @@ class NotFoundException(PandasError, ValueError): class StreamingInsertError(PandasError, ValueError): """ Raised when BigQuery reports a streaming insert error. - For more information see `Streaming Data Into BigQuery `__ + For more information see `Streaming Data Into BigQuery + `__ """ @@ -137,7 +144,8 @@ def get_credentials(self): return credentials - def get_service(self, credentials): + @staticmethod + def get_service(credentials): import httplib2 from apiclient.discovery import build @@ -149,7 +157,8 @@ def get_service(self, credentials): return bigquery_service - def process_http_error(self, ex): + @staticmethod + def process_http_error(ex): # See `BigQuery Troubleshooting Errors `__ status = json.loads(ex.content)['error'] @@ -164,7 +173,8 @@ def process_http_error(self, ex): raise GenericGBQException(errors) - def process_insert_errors(self, insert_errors, verbose): + @staticmethod + def process_insert_errors(insert_errors, verbose): for insert_error in insert_errors: row = insert_error['index'] errors = insert_error.get('errors', None) @@ -201,7 +211,7 @@ def run_query(self, query, verbose=True): query_reply = job_collection.insert(projectId=self.project_id, body=job_data).execute() except AccessTokenRefreshError: raise AccessDenied("The credentials have been revoked or expired, please re-run the application " - "to re-authorize") + "to re-authorize") except HttpError as ex: self.process_http_error(ex) @@ -231,7 +241,9 @@ def run_query(self, query, verbose=True): page_token = query_reply.get('pageToken', None) if not page_token and current_row < total_rows: - raise InvalidPageToken("Required pageToken was missing. Recieved {0} of {1} rows".format(current_row, total_rows)) + raise InvalidPageToken( + "Required pageToken was missing. Received {0} of {1} rows".format(current_row, + total_rows)) elif page_token in seen_page_tokens: raise InvalidPageToken("A duplicate pageToken was returned") @@ -302,21 +314,6 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize, verbose): if verbose: print("\n") - def table_exists(self, dataset_id, table_id): - from apiclient.errors import HttpError - - try: - self.service.tables().get( - projectId=self.project_id, - datasetId=dataset_id, - tableId=table_id).execute() - return True - except HttpError as ex: - if ex.resp.status == 404: - return False - else: - self.process_http_error(ex) - def verify_schema(self, dataset_id, table_id, schema): from apiclient.errors import HttpError @@ -330,40 +327,6 @@ def verify_schema(self, dataset_id, table_id, schema): except HttpError as ex: self.process_http_error(ex) - def create_table(self, dataset_id, table_id, schema): - from apiclient.errors import HttpError - - body = { - 'schema': schema, - 'tableReference': { - 'tableId': table_id, - 'projectId': self.project_id, - 'datasetId': dataset_id - } - } - - try: - self.service.tables().insert( - projectId=self.project_id, - datasetId=dataset_id, - body=body - ).execute() - except HttpError as ex: - self.process_http_error(ex) - - def delete_table(self, dataset_id, table_id): - from apiclient.errors import HttpError - - try: - self.service.tables().delete( - datasetId=dataset_id, - projectId=self.project_id, - tableId=table_id - ).execute() - - except HttpError as ex: - self.process_http_error(ex) - def delete_and_recreate_table(self, dataset_id, table_id, table_schema, verbose): delay = 0 @@ -376,9 +339,9 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema, verbose) print('The existing table has a different schema. Please wait 2 minutes. See Google BigQuery issue #191') delay = 120 - self.delete_table(dataset_id, table_id) - self.create_table(dataset_id, table_id, table_schema) - + table = _Table(self.project_id, dataset_id) + table.delete(table_id) + table.create(table_id, table_schema) sleep(delay) @@ -530,10 +493,12 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, connector = GbqConnector(project_id, reauth=reauth) dataset_id, table_id = destination_table.rsplit('.', 1) - table_schema = generate_bq_schema(dataframe) + table = _Table(project_id, dataset_id, reauth=reauth) + + table_schema = _generate_bq_schema(dataframe) # If table exists, check if_exists parameter - if connector.table_exists(dataset_id, table_id): + if table.exists(table_id): if if_exists == 'fail': raise TableCreationError("Could not create the table because it already exists. " "Change the if_exists parameter to append or replace data.") @@ -543,12 +508,12 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, if not connector.verify_schema(dataset_id, table_id, table_schema): raise InvalidSchema("The schema of the destination table does not match") else: - connector.create_table(dataset_id, table_id, table_schema) + table.create(table_id, table_schema) connector.load_data(dataframe, dataset_id, table_id, chunksize, verbose) -def generate_bq_schema(df, default_type='STRING'): +def _generate_bq_schema(df, default_type='STRING'): """ Given a passed df, generate the associated Google BigQuery schema. Parameters @@ -576,75 +541,257 @@ def generate_bq_schema(df, default_type='STRING'): return {'fields': fields} +generate_bq_schema = deprecate('generate_bq_schema', _generate_bq_schema) -def table_exists(table, project_id): - """ Check if a table exists in Google BigQuery given a table and project id - .. versionadded:: 0.17.0 +class _Table(GbqConnector): - Parameters - ---------- - table : str - Name of table to be verified, in the form 'dataset.tablename' - project_id : str - Google BigQuery Account project ID. + def __init__(self, project_id, dataset_id, reauth=False): + from apiclient.errors import HttpError + self.test_google_api_imports() + self.project_id = project_id + self.reauth = reauth + self.credentials = self.get_credentials() + self.service = self.get_service(self.credentials) + self.http_error = HttpError + self.dataset_id = dataset_id - Returns - ------- - boolean - true if table exists, otherwise false - """ + def exists(self, table_id): + """ Check if a table exists in Google BigQuery - if '.' not in table: - raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId' ") + .. versionadded:: 0.17.0 + + Parameters + ---------- + table : str + Name of table to be verified + + Returns + ------- + boolean + true if table exists, otherwise false + """ + + try: + self.service.tables().get( + projectId=self.project_id, + datasetId=self.dataset_id, + tableId=table_id).execute() + return True + except self.http_error as ex: + if ex.resp.status == 404: + return False + else: + self.process_http_error(ex) - connector = GbqConnector(project_id) - dataset_id, table_id = table.rsplit('.', 1) + def create(self, table_id, schema): + """ Create a table in Google BigQuery given a table and schema - return connector.table_exists(dataset_id, table_id) + .. versionadded:: 0.17.0 + Parameters + ---------- + table : str + Name of table to be written + schema : str + Use the generate_bq_schema to generate your table schema from a dataframe. + """ -def create_table(table, schema, project_id): - """ Create a table in Google BigQuery given a table, schema and project id + if self.exists(table_id): + raise TableCreationError("The table could not be created because it already exists") - .. versionadded:: 0.17.0 + if not _Dataset(self.project_id).exists(self.dataset_id): + _Dataset(self.project_id).create(self.dataset_id) - Parameters - ---------- - table : str - Name of table to be written, in the form 'dataset.tablename' - schema : str - Use the generate_bq_schema to generate your table schema from a dataframe. - project_id : str - Google BigQuery Account project ID. - """ + body = { + 'schema': schema, + 'tableReference': { + 'tableId': table_id, + 'projectId': self.project_id, + 'datasetId': self.dataset_id + } + } + + try: + self.service.tables().insert( + projectId=self.project_id, + datasetId=self.dataset_id, + body=body).execute() + except self.http_error as ex: + self.process_http_error(ex) - if table_exists(table, project_id): - raise TableCreationError("The table could not be created because it already exists") + def delete(self, table_id): + """ Delete a table in Google BigQuery - connector = GbqConnector(project_id) - dataset_id, table_id = table.rsplit('.', 1) + .. versionadded:: 0.17.0 - return connector.create_table(dataset_id, table_id, schema) + Parameters + ---------- + table : str + Name of table to be deleted + """ + if not self.exists(table_id): + raise NotFoundException("Table does not exist") -def delete_table(table, project_id): - """ Delete a table in Google BigQuery given a table and project id + try: + self.service.tables().delete( + datasetId=self.dataset_id, + projectId=self.project_id, + tableId=table_id).execute() + except self.http_error as ex: + self.process_http_error(ex) - .. versionadded:: 0.17.0 - Parameters - ---------- - table : str - Name of table to be written, in the form 'dataset.tablename' - project_id : str - Google BigQuery Account project ID. - """ +class _Dataset(GbqConnector): + + def __init__(self, project_id, reauth=False): + from apiclient.errors import HttpError + self.test_google_api_imports() + self.project_id = project_id + self.reauth = reauth + self.credentials = self.get_credentials() + self.service = self.get_service(self.credentials) + self.http_error = HttpError + + def exists(self, dataset_id): + """ Check if a dataset exists in Google BigQuery + + .. versionadded:: 0.17.0 + + Parameters + ---------- + dataset_id : str + Name of dataset to be verified + + Returns + ------- + boolean + true if dataset exists, otherwise false + """ + + try: + self.service.datasets().get( + projectId=self.project_id, + datasetId=dataset_id).execute() + return True + except self.http_error as ex: + if ex.resp.status == 404: + return False + else: + self.process_http_error(ex) + + def datasets(self): + """ Return a list of datasets in Google BigQuery - if not table_exists(table, project_id): - raise NotFoundException("Table does not exist") + .. versionadded:: 0.17.0 - connector = GbqConnector(project_id) - dataset_id, table_id = table.rsplit('.', 1) + Parameters + ---------- + None + + Returns + ------- + list + List of datasets under the specific project + """ + + try: + list_dataset_response = self.service.datasets().list( + projectId=self.project_id).execute().get('datasets', None) + + if not list_dataset_response: + return [] + + dataset_list = list() + + for row_num, raw_row in enumerate(list_dataset_response): + dataset_list.append(raw_row['datasetReference']['datasetId']) + + return dataset_list + except self.http_error as ex: + self.process_http_error(ex) + + def create(self, dataset_id): + """ Create a dataset in Google BigQuery + + .. versionadded:: 0.17.0 + + Parameters + ---------- + dataset : str + Name of dataset to be written + """ + + if self.exists(dataset_id): + raise DatasetCreationError("The dataset could not be created because it already exists") + + body = { + 'datasetReference': { + 'projectId': self.project_id, + 'datasetId': dataset_id + } + } + + try: + self.service.datasets().insert( + projectId=self.project_id, + body=body).execute() + except self.http_error as ex: + self.process_http_error(ex) + + def delete(self, dataset_id): + """ Delete a dataset in Google BigQuery + + .. versionadded:: 0.17.0 + + Parameters + ---------- + dataset : str + Name of dataset to be deleted + """ + + if not self.exists(dataset_id): + raise NotFoundException("Dataset {0} does not exist".format(dataset_id)) + + try: + self.service.datasets().delete( + datasetId=dataset_id, + projectId=self.project_id).execute() + + except self.http_error as ex: + self.process_http_error(ex) + + def tables(self, dataset_id): + """ List tables in the specific dataset in Google BigQuery + + .. versionadded:: 0.17.0 + + Parameters + ---------- + dataset : str + Name of dataset to list tables for + + Returns + ------- + list + List of tables under the specific dataset + """ + + try: + list_table_response = self.service.tables().list( + projectId=self.project_id, + datasetId=dataset_id).execute().get('tables', None) + + if not list_table_response: + return [] + + table_list = list() + + for row_num, raw_row in enumerate(list_table_response): + table_list.append(raw_row['tableReference']['tableId']) + + return table_list + except self.http_error as ex: + self.process_http_error(ex) - return connector.delete_table(dataset_id, table_id) diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py index 990050b8ac544..2b3d226cd3e0b 100644 --- a/pandas/io/tests/test_gbq.py +++ b/pandas/io/tests/test_gbq.py @@ -1,12 +1,6 @@ -import ast from datetime import datetime -import json import nose -import os import pytz -import shutil -import subprocess -import sys import platform from time import sleep @@ -22,6 +16,9 @@ import pandas.util.testing as tm PROJECT_ID = None +DATASET_ID = 'pydata_pandas_bq_testing' +TABLE_ID = 'new_test' +DESTINATION_TABLE = "{0}.{1}".format(DATASET_ID + "1", TABLE_ID) VERSION = platform.python_version() @@ -32,14 +29,6 @@ _SETUPTOOLS_INSTALLED = False -def missing_bq(): - try: - subprocess.call(['bq', 'ls']) - return False - except OSError: - return True - - def _test_imports(): if not compat.PY3: @@ -102,14 +91,33 @@ def test_requirements(): raise nose.SkipTest(import_exception) +def clean_gbq_environment(): + dataset = gbq._Dataset(PROJECT_ID) + + for i in range(1, 10): + if DATASET_ID + str(i) in dataset.datasets(): + dataset_id = DATASET_ID + str(i) + table = gbq._Table(PROJECT_ID, dataset_id) + for j in range(1, 20): + if TABLE_ID + str(j) in dataset.tables(dataset_id): + table.delete(TABLE_ID + str(j)) + + dataset.delete(dataset_id) + + def make_mixed_dataframe_v2(test_size): # create df to test for all BQ datatypes except RECORD - bools = np.random.randint(2, size=(1,test_size)).astype(bool) + bools = np.random.randint(2, size=(1, test_size)).astype(bool) flts = np.random.randn(1, test_size) - ints = np.random.randint(1, 10, size=(1,test_size)) - strs = np.random.randint(1, 10, size=(1,test_size)).astype(str) + ints = np.random.randint(1, 10, size=(1, test_size)) + strs = np.random.randint(1, 10, size=(1, test_size)).astype(str) times = [datetime.now(pytz.timezone('US/Arizona')) for t in xrange(test_size)] - return DataFrame({'bools': bools[0], 'flts': flts[0], 'ints': ints[0], 'strs': strs[0], 'times': times[0]}, index=range(test_size)) + return DataFrame({'bools': bools[0], + 'flts': flts[0], + 'ints': ints[0], + 'strs': strs[0], + 'times': times[0]}, + index=range(test_size)) class TestGBQConnectorIntegration(tm.TestCase): @@ -194,15 +202,10 @@ def setUpClass(cls): # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests # described below. - test_requirements() - if not PROJECT_ID: raise nose.SkipTest("Cannot run integration tests without a project id") - if missing_bq(): - raise nose.SkipTest("Cannot run read_gbq tests without bq command line client") - - subprocess.call(['bq', 'mk', PROJECT_ID + ':pydata_pandas_bq_testing']) + test_requirements() def setUp(self): # - PER-TEST FIXTURES - @@ -213,13 +216,12 @@ def setUp(self): def tearDownClass(cls): # - GLOBAL CLASS FIXTURES - # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests. - subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing']) + pass def tearDown(self): # - PER-TEST FIXTURES - # put here any instructions you want to be run *AFTER* *EVERY* test is executed. - if gbq.table_exists('pydata_pandas_bq_testing.new_test', PROJECT_ID): - subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing.new_test']) + pass def test_should_properly_handle_valid_strings(self): query = 'SELECT "PI" as VALID_STRING' @@ -331,14 +333,17 @@ def test_bad_table_name(self): gbq.read_gbq("SELECT * FROM [publicdata:samples.nope]", project_id=PROJECT_ID) def test_download_dataset_larger_than_200k_rows(self): + test_size = 200005 # Test for known BigQuery bug in datasets larger than 100k rows # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results - df = gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] GROUP EACH BY id ORDER BY id ASC LIMIT 200005", project_id=PROJECT_ID) - self.assertEqual(len(df.drop_duplicates()), 200005) + df = gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] GROUP EACH BY id ORDER BY id ASC LIMIT {0}".format(test_size), + project_id=PROJECT_ID) + self.assertEqual(len(df.drop_duplicates()), test_size) def test_zero_rows(self): # Bug fix for https://github.com/pydata/pandas/issues/10273 - df = gbq.read_gbq("SELECT title, language FROM [publicdata:samples.wikipedia] where timestamp=-9999999", project_id=PROJECT_ID) + df = gbq.read_gbq("SELECT title, language FROM [publicdata:samples.wikipedia] where timestamp=-9999999", + project_id=PROJECT_ID) expected_result = DataFrame(columns=['title', 'language']) self.assert_frame_equal(df, expected_result) @@ -352,122 +357,118 @@ class TestToGBQIntegration(tm.TestCase): @classmethod def setUpClass(cls): # - GLOBAL CLASS FIXTURES - - # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests - # described below. - - test_requirements() + # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests + # described below. if not PROJECT_ID: raise nose.SkipTest("Cannot run integration tests without a project id") - if missing_bq(): - raise nose.SkipTest("Cannot run to_gbq tests without bq command line client") + test_requirements() + clean_gbq_environment() - subprocess.call(['bq', 'mk', PROJECT_ID + ':pydata_pandas_bq_testing']) + gbq._Dataset(PROJECT_ID).create(DATASET_ID + "1") def setUp(self): # - PER-TEST FIXTURES - - # put here any instruction you want to be run *BEFORE* *EVERY* test is executed. - pass + # put here any instruction you want to be run *BEFORE* *EVERY* test is executed. + + self.dataset = gbq._Dataset(PROJECT_ID) + self.table = gbq._Table(PROJECT_ID, DATASET_ID + "1") @classmethod def tearDownClass(cls): # - GLOBAL CLASS FIXTURES - # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests. - for i in range(1, 8): - if gbq.table_exists('pydata_pandas_bq_testing.new_test' + str(i), PROJECT_ID): - subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing.new_test' + str(i)]) - - subprocess.call(['bq', 'rm', '-f', PROJECT_ID + ':pydata_pandas_bq_testing']) + clean_gbq_environment() def tearDown(self): # - PER-TEST FIXTURES - - # put here any instructions you want to be run *AFTER* *EVERY* test is executed. + # put here any instructions you want to be run *AFTER* *EVERY* test is executed. pass def test_upload_data(self): - table_name = 'new_test1' + destination_table = DESTINATION_TABLE + "1" test_size = 1000001 df = make_mixed_dataframe_v2(test_size) - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000) sleep(60) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), + project_id=PROJECT_ID) self.assertEqual(result['NUM_ROWS'][0], test_size) def test_upload_data_if_table_exists_fail(self): - table_name = 'new_test2' + destination_table = DESTINATION_TABLE + "2" test_size = 10 df = make_mixed_dataframe_v2(test_size) - - gbq.create_table('pydata_pandas_bq_testing.' + table_name, gbq.generate_bq_schema(df), PROJECT_ID) + self.table.create(TABLE_ID + "2", gbq._generate_bq_schema(df)) # Test the default value of if_exists is 'fail' with tm.assertRaises(gbq.TableCreationError): - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID) + gbq.to_gbq(df, destination_table, PROJECT_ID) # Test the if_exists parameter with value 'fail' with tm.assertRaises(gbq.TableCreationError): - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='fail') + gbq.to_gbq(df, destination_table, PROJECT_ID, if_exists='fail') def test_upload_data_if_table_exists_append(self): - table_name = 'new_test3' + destination_table = DESTINATION_TABLE + "3" test_size = 10 df = make_mixed_dataframe_v2(test_size) df_different_schema = tm.makeMixedDataFrame() # Initialize table with sample data - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000) # Test the if_exists parameter with value 'append' - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='append') + gbq.to_gbq(df, destination_table, PROJECT_ID, if_exists='append') sleep(60) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), project_id=PROJECT_ID) self.assertEqual(result['NUM_ROWS'][0], test_size * 2) # Try inserting with a different schema, confirm failure with tm.assertRaises(gbq.InvalidSchema): - gbq.to_gbq(df_different_schema, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='append') + gbq.to_gbq(df_different_schema, destination_table, PROJECT_ID, if_exists='append') def test_upload_data_if_table_exists_replace(self): - table_name = 'new_test4' + destination_table = DESTINATION_TABLE + "4" test_size = 10 df = make_mixed_dataframe_v2(test_size) df_different_schema = tm.makeMixedDataFrame() # Initialize table with sample data - gbq.to_gbq(df, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, chunksize=10000) + gbq.to_gbq(df, destination_table, PROJECT_ID, chunksize=10000) # Test the if_exists parameter with the value 'replace'. - gbq.to_gbq(df_different_schema, "pydata_pandas_bq_testing." + table_name, PROJECT_ID, if_exists='replace') + gbq.to_gbq(df_different_schema, destination_table, PROJECT_ID, if_exists='replace') sleep(60) # <- Curses Google!!! - result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM pydata_pandas_bq_testing." + table_name, project_id=PROJECT_ID) + result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), project_id=PROJECT_ID) self.assertEqual(result['NUM_ROWS'][0], 5) def test_google_upload_errors_should_raise_exception(self): - table_name = 'new_test5' + destination_table = DESTINATION_TABLE + "5" test_timestamp = datetime.now(pytz.timezone('US/Arizona')) bad_df = DataFrame({'bools': [False, False], 'flts': [0.0, 1.0], 'ints': [0, '1'], 'strs': ['a', 1], 'times': [test_timestamp, test_timestamp]}, index=range(2)) with tm.assertRaises(gbq.StreamingInsertError): - gbq.to_gbq(bad_df, 'pydata_pandas_bq_testing.' + table_name, PROJECT_ID, verbose=True) + gbq.to_gbq(bad_df, destination_table, PROJECT_ID, verbose=True) - def test_generate_bq_schema(self): + def test_generate_schema(self): df = tm.makeMixedDataFrame() - schema = gbq.generate_bq_schema(df) + schema = gbq._generate_bq_schema(df) test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, @@ -476,40 +477,70 @@ def test_generate_bq_schema(self): self.assertEqual(schema, test_schema) - def test_create_bq_table(self): - table_name = 'new_test6' - + def test_create_table(self): + destination_table = TABLE_ID + "6" test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} - - gbq.create_table('pydata_pandas_bq_testing.' + table_name, test_schema, PROJECT_ID) - - self.assertTrue(gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), 'Expected table to exist') + self.table.create(destination_table, test_schema) + self.assertTrue(self.table.exists(destination_table), 'Expected table to exist') def test_table_does_not_exist(self): - table_name = 'new_test7' - self.assertTrue(not gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), - 'Expected table not to exist') - - def test_delete_bq_table(self): - table_name = 'new_test8' + self.assertTrue(not self.table.exists(TABLE_ID + "7"), 'Expected table not to exist') + def test_delete_table(self): + destination_table = TABLE_ID + "8" test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} + self.table.create(destination_table, test_schema) + self.table.delete(destination_table) + self.assertTrue(not self.table.exists(destination_table), 'Expected table not to exist') - gbq.create_table('pydata_pandas_bq_testing.' + table_name, test_schema, PROJECT_ID) - - gbq.delete_table('pydata_pandas_bq_testing.' + table_name, PROJECT_ID) - - self.assertTrue(not gbq.table_exists('pydata_pandas_bq_testing.' + table_name, PROJECT_ID), - 'Expected table not to exist') - - def test_upload_data_dataset_not_found(self): - test_size = 10 - df = make_mixed_dataframe_v2(test_size) - - with tm.assertRaises(gbq.GenericGBQException): - gbq.create_table('pydata_pandas_bq_testing2.new_test', gbq.generate_bq_schema(df), PROJECT_ID) + def test_list_table(self): + destination_table = TABLE_ID + "9" + test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} + self.table.create(destination_table, test_schema) + self.assertTrue(destination_table in self.dataset.tables(DATASET_ID + "1"), + 'Expected table list to contain table {0}'.format(destination_table)) + + def test_list_dataset(self): + dataset_id = DATASET_ID + "1" + self.assertTrue(dataset_id in self.dataset.datasets(), + 'Expected dataset list to contain dataset {0}'.format(dataset_id)) + + def test_list_table_zero_results(self): + dataset_id = DATASET_ID + "2" + self.dataset.create(dataset_id) + table_list = gbq._Dataset(PROJECT_ID).tables(dataset_id) + self.assertEqual(len(table_list), 0, 'Expected gbq.list_table() to return 0') + + def test_create_dataset(self): + dataset_id = DATASET_ID + "3" + self.dataset.create(dataset_id) + self.assertTrue(dataset_id in self.dataset.datasets(), 'Expected dataset to exist') + + def test_delete_dataset(self): + dataset_id = DATASET_ID + "4" + self.dataset.create(dataset_id) + self.dataset.delete(dataset_id) + self.assertTrue(dataset_id not in self.dataset.datasets(), 'Expected dataset not to exist') + + def test_dataset_exists(self): + dataset_id = DATASET_ID + "5" + self.dataset.create(dataset_id) + self.assertTrue(self.dataset.exists(dataset_id), 'Expected dataset to exist') + + def create_table_data_dataset_does_not_exist(self): + dataset_id = DATASET_ID + "6" + table_id = TABLE_ID + "1" + table_with_new_dataset = gbq._Table(PROJECT_ID, dataset_id) + df = make_mixed_dataframe_v2(10) + table_with_new_dataset.create(table_id, gbq._generate_bq_schema(df)) + self.assertTrue(self.dataset.exists(dataset_id), 'Expected dataset to exist') + self.assertTrue(table_with_new_dataset.exists(table_id), 'Expected dataset to exist') + + def test_dataset_does_not_exist(self): + self.assertTrue(not self.dataset.exists(DATASET_ID + "_not_found"), 'Expected dataset not to exist') if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],