diff --git a/doc/source/io.rst b/doc/source/io.rst
index 9def8be621aed..cf1ae24366fa8 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -4091,7 +4091,7 @@ destination DataFrame as well as a preferred column order as follows:

    data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                             index_col='index_column_name',
-                            col_order=['col1', 'col2', 'col3'], projectid)
+                            col_order=['col1', 'col2', 'col3'], projectid=projectid)

 .. note::

@@ -4102,6 +4102,46 @@ destination DataFrame as well as a preferred column order as follows:

     You can toggle the verbose output via the ``verbose`` flag which defaults to ``True``.

+You can send the query results directly to a table in BigQuery by setting the ``destination_table`` argument.
+
+For example,
+
+.. code-block:: python
+
+   pd.read_gbq('SELECT * FROM test_dataset.test_table',
+               project_id=projectid,
+               destination_table='my_dataset.my_table')
+
+.. note::
+
+    When the ``destination_table`` argument is set, an empty DataFrame will be returned.
+
+.. note::
+
+    The destination table and destination dataset will automatically be created if they do not already exist.
+
+The ``if_exists`` argument determines the behaviour when the destination table already
+exists: ``'fail'``, ``'replace'`` or ``'append'``. The default value is ``'fail'``.
+
+For example, assume that ``if_exists`` is set to ``'fail'``. The following snippet will raise
+a ``TableCreationError`` if the destination table already exists.
+
+.. code-block:: python
+
+   pd.read_gbq('SELECT * FROM test_dataset.test_table',
+               project_id=projectid,
+               destination_table='my_dataset.my_table',
+               if_exists='fail')
+
+.. note::
+
+    If you plan to run a query that may return large results, you can set the ``allow_large_results``
+    argument, which defaults to ``False``. Setting ``allow_large_results`` to ``True`` sets the
+    ``'allowLargeResults'`` option in the BigQuery job configuration.
+
+    Queries that return large results will take longer to execute, even if the result set is small,
+    and are subject to additional limitations in BigQuery.
+
 Writing DataFrames
 ''''''''''''''''''
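The two new options compose. For instance, to overwrite an existing table with a potentially
large result set, ``if_exists='replace'`` can be combined with ``allow_large_results=True``;
a minimal sketch, reusing the ``projectid`` variable from the examples above:

.. code-block:: python

   pd.read_gbq('SELECT * FROM test_dataset.test_table',
               project_id=projectid,
               destination_table='my_dataset.my_table',
               if_exists='replace',       # overwrite any existing table data
               allow_large_results=True)  # set 'allowLargeResults' in the job config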
diff --git a/doc/source/whatsnew/v0.17.0.txt b/doc/source/whatsnew/v0.17.0.txt
index 9990d2bd1c78d..9facde9e82c88 100644
--- a/doc/source/whatsnew/v0.17.0.txt
+++ b/doc/source/whatsnew/v0.17.0.txt
@@ -333,6 +333,8 @@ Google BigQuery Enhancements
 - ``InvalidColumnOrder`` and ``InvalidPageToken`` in the gbq module will raise ``ValueError`` instead of ``IOError``.
 - The ``generate_bq_schema()`` function is now deprecated and will be removed in a future version (:issue:`11121`)
 - Update the gbq module to support Python 3 (:issue:`11094`).
+- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to redirect the query results to a destination table via the ``destination_table`` parameter. See the :ref:`docs <io.bigquery>` for more details (:issue:`11209`)
+- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to set the ``'allowLargeResults'`` option in the BigQuery job configuration via the ``allow_large_results`` parameter (:issue:`11209`)

 .. _whatsnew_0170.enhancements.other:
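Because the rows are written to the destination table rather than streamed back to the
client, a redirected query returns a DataFrame with no rows. A sketch of a hypothetical
call (``'my-project'`` and the table names here are placeholders):

.. code-block:: python

   import pandas as pd

   # Results land in my_dataset.my_table; nothing is streamed back.
   df = pd.read_gbq('SELECT id FROM [publicdata:samples.wikipedia] LIMIT 10',
                    project_id='my-project',
                    destination_table='my_dataset.my_table')
   assert df.empty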
diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index e9568db06f391..d19b6ee456f81 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -195,7 +195,7 @@ def process_insert_errors(insert_errors, verbose):

         raise StreamingInsertError

-    def run_query(self, query, verbose=True):
+    def run_query(self, query, verbose=True, destination_table=None, if_exists='fail', allow_large_results=False):
         from apiclient.errors import HttpError
         from oauth2client.client import AccessTokenRefreshError

@@ -211,6 +211,48 @@ def run_query(self, query, verbose=True):
             }
         }

+        if destination_table:
+            if if_exists not in ('fail', 'replace', 'append'):
+                raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+
+            if '.' not in destination_table:
+                raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId'")
+
+            dataset_id, table_id = destination_table.rsplit('.', 1)
+
+            job_data['configuration']['query'] = {
+                'query': query,
+                'destinationTable': {
+                    "projectId": self.project_id,
+                    "datasetId": dataset_id,
+                    "tableId": table_id,
+                },
+                'writeDisposition': 'WRITE_EMPTY',  # a 'duplicate' error is returned in the job result if the table exists
+                'allowLargeResults': False  # if True, allows the query to produce large result tables
+            }
+
+            if allow_large_results:
+                job_data['configuration']['query']['allowLargeResults'] = True
+
+            table = _Table(self.project_id, dataset_id)
+
+            if table.exists(table_id):
+                if if_exists == 'fail':
+                    raise TableCreationError("Could not create the table because it already exists. "
+                                             "Change the if_exists parameter to append or replace data.")
+                elif if_exists == 'replace':
+                    # If the table already exists, instruct BigQuery to overwrite the table data.
+                    job_data['configuration']['query']['writeDisposition'] = 'WRITE_TRUNCATE'
+                elif if_exists == 'append':
+                    # If the table already exists, instruct BigQuery to append to the table data.
+                    job_data['configuration']['query']['writeDisposition'] = 'WRITE_APPEND'
+
+            dataset = _Dataset(self.project_id)
+
+            # create the destination dataset if it does not already exist
+            if not dataset.exists(dataset_id):
+                dataset.create(dataset_id)
+
         try:
             query_reply = job_collection.insert(projectId=self.project_id, body=job_data).execute()
         except AccessTokenRefreshError:
@@ -237,6 +279,9 @@ def run_query(self, query, verbose=True):
         # Only read schema on first page
         schema = query_reply['schema']

+        if destination_table:
+            return schema, list()
+
         # Loop through each page of data
         while 'rows' in query_reply and current_row < total_rows:
             page = query_reply['rows']
@@ -385,7 +430,8 @@ def _parse_entry(field_value, field_type):

     return field_value


-def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True):
+def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True,
+             destination_table=None, if_exists='fail', allow_large_results=False):
     """Load data from Google BigQuery.

     THIS IS AN EXPERIMENTAL LIBRARY

@@ -412,6 +458,15 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals
         if multiple accounts are used.
     verbose : boolean (default True)
         Verbose output
+    destination_table : string
+        Name of table to be written, in the form 'dataset.tablename'
+    if_exists : {'fail', 'replace', 'append'}, default 'fail'
+        'fail': If table exists, raise a TableCreationError.
+        'replace': If table exists, drop it, recreate it, and insert data.
+        'append': If table exists, insert data. Create if does not exist.
+    allow_large_results : boolean (default False)
+        Enables the Google BigQuery allowLargeResults option, which is
+        necessary for queries that may return large results.

     Returns
     -------
@@ -424,7 +479,11 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals
         raise TypeError("Missing required parameter: project_id")

     connector = GbqConnector(project_id, reauth=reauth)
-    schema, pages = connector.run_query(query, verbose=verbose)
+    schema, pages = connector.run_query(query,
+                                        verbose=verbose,
+                                        destination_table=destination_table,
+                                        if_exists=if_exists,
+                                        allow_large_results=allow_large_results)
     dataframe_list = []
     while len(pages) > 0:
         page = pages.pop()
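The ``if_exists`` branches in ``run_query`` above reduce to picking one of BigQuery's
three ``writeDisposition`` values. A hypothetical table-driven equivalent of that logic,
for illustration only:

.. code-block:: python

   # Hypothetical refactoring of the if_exists branches: each mode maps
   # directly to one BigQuery writeDisposition value.
   WRITE_DISPOSITIONS = {
       'fail': 'WRITE_EMPTY',        # job returns a 'duplicate' error if the table has data
       'replace': 'WRITE_TRUNCATE',  # overwrite existing table data
       'append': 'WRITE_APPEND',     # add rows to the existing table
   }

   job_data['configuration']['query']['writeDisposition'] = WRITE_DISPOSITIONS[if_exists]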
diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py
index cc1e901d8f119..8b55594044696 100644
--- a/pandas/io/tests/test_gbq.py
+++ b/pandas/io/tests/test_gbq.py
@@ -204,28 +204,32 @@ class TestReadGBQIntegration(tm.TestCase):
     @classmethod
     def setUpClass(cls):
         # - GLOBAL CLASS FIXTURES -
-        # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests
-        # described below.
+        # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests
+        # described below.

         if not PROJECT_ID:
             raise nose.SkipTest("Cannot run integration tests without a project id")

         test_requirements()
+        clean_gbq_environment()
+
+        gbq._Dataset(PROJECT_ID).create(DATASET_ID + "7")

     def setUp(self):
         # - PER-TEST FIXTURES -
-        # put here any instruction you want to be run *BEFORE* *EVERY* test is executed.
+        # put here any instruction you want to be run *BEFORE* *EVERY* test is executed.
         pass

     @classmethod
     def tearDownClass(cls):
         # - GLOBAL CLASS FIXTURES -
         # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests.
-        pass
+
+        clean_gbq_environment()

     def tearDown(self):
         # - PER-TEST FIXTURES -
-        # put here any instructions you want to be run *AFTER* *EVERY* test is executed.
+        # put here any instructions you want to be run *AFTER* *EVERY* test is executed.
         pass

     def test_should_properly_handle_valid_strings(self):
@@ -357,6 +361,77 @@ def test_zero_rows(self):
         expected_result = DataFrame(columns=['title', 'language'])
         self.assert_frame_equal(df, expected_result)

+    def test_redirect_query_results_to_destination_table_default(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "1")
+        test_size = 100
+
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_fail(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "2")
+        test_size = 100
+
+        # Test redirecting the query results to a destination table without specifying the if_exists parameter
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+        # Confirm that the default action is to fail if the table exists and the if_exists parameter is not provided
+        with tm.assertRaises(gbq.TableCreationError):
+            gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table)
+
+        # Test the if_exists parameter with value 'fail'
+        with tm.assertRaises(gbq.TableCreationError):
+            gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table, if_exists='fail')
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_append(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "3")
+        test_size = 100
+
+        # Initialize table with sample data
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+
+        # Test the if_exists parameter with value 'append'
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table, if_exists='append')
+
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size * 2)
+
+        # Try redirecting data to an existing table with a different schema, confirm failure
+        with tm.assertRaises(gbq.GenericGBQException):
+            gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table, if_exists='append')
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_replace(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "4")
+        test_size = 100
+
+        # Initialize table with sample data
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+
+        # Test the if_exists parameter with the value 'replace'
+        gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table, if_exists='replace')
+
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+    def test_redirect_query_results_to_destination_table_dataset_does_not_exist(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "8", TABLE_ID + "5")
+        test_size = 100
+
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)


 class TestToGBQIntegration(tm.TestCase):
     # Changes to BigQuery table schema may take up to 2 minutes as of May 2015