Add ability to set the allowLargeResults option in BigQuery #10474 #11209

Closed · wants to merge 1 commit

41 changes: 40 additions & 1 deletion doc/source/io.rst
@@ -4091,7 +4091,7 @@ destination DataFrame as well as a preferred column order as follows:

data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
index_col='index_column_name',
col_order=['col1', 'col2', 'col3'], projectid)
col_order=['col1', 'col2', 'col3'], projectid=projectid)

.. note::

@@ -4102,6 +4102,45 @@ destination DataFrame as well as a preferred column order as follows:

You can toggle the verbose output via the ``verbose`` flag which defaults to ``True``.

You can send the query results directly to a table in BigQuery by setting the ``destination_table`` argument.

For example,

.. code-block:: python

   pd.read_gbq('SELECT * FROM test_dataset.test_table',
               project_id=projectid,
               destination_table='my_dataset.my_table')

.. note::

When the ``destination_table`` argument is set, the query results are written to that table and an empty ``DataFrame`` is returned.

.. note::

The destination table and destination dataset will automatically be created if they do not already exist.

When the ``destination_table`` argument is used, the ``if_exists`` argument determines the behaviour
if the destination table already exists: ``'fail'``, ``'replace'`` or ``'append'``.
The default value is ``'fail'``.

For example, assume that ``if_exists`` is set to ``'fail'``. The following snippet will raise
a ``TableCreationError`` if the destination table already exists.

.. code-block:: python

   pd.read_gbq('SELECT * FROM test_dataset.test_table',
               project_id=projectid,
               destination_table='my_dataset.my_table',
               if_exists='fail')

.. note::

If you plan to run a query that may return larger results, you can set the ``allow_large_results`` argument,
which defaults to ``False``. Setting ``allow_large_results=True`` sets the BigQuery
``allowLargeResults`` option to true in the BigQuery job configuration.

Queries that return large results will take longer to execute, even if the result set is small,
and are subject to `additional limitations <https://cloud.google.com/bigquery/querying-data?hl=en#largequeryresults>`__.
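
For example, the following call (the dataset and table names are placeholders) redirects a potentially large
result set to a destination table with the ``allowLargeResults`` option enabled. Note that in this changeset
``allow_large_results`` only takes effect when ``destination_table`` is also supplied.

.. code-block:: python

   pd.read_gbq('SELECT id FROM [publicdata:samples.wikipedia]',
               project_id=projectid,
               destination_table='my_dataset.my_large_table',
               allow_large_results=True)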

Writing DataFrames
''''''''''''''''''

2 changes: 2 additions & 0 deletions doc/source/whatsnew/v0.17.0.txt
@@ -333,6 +333,8 @@ Google BigQuery Enhancements
- ``InvalidColumnOrder`` and ``InvalidPageToken`` in the gbq module will raise ``ValueError`` instead of ``IOError``.
- The ``generate_bq_schema()`` function is now deprecated and will be removed in a future version (:issue:`11121`)
- Update the gbq module to support Python 3 (:issue:`11094`).
- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to redirect the query results to a destination table via the ``destination_table`` parameter. See the :ref:`docs <io.bigquery>` for more details (:issue:`11209`)
- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to set the ``allowLargeResults`` option in the BigQuery job configuration via the ``allow_large_results`` parameter; see the example below (:issue:`11209`)
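
A minimal illustrative call combining both new parameters (the project, dataset, and table names are placeholders):

.. code-block:: python

   from pandas.io import gbq

   gbq.read_gbq('SELECT id FROM [publicdata:samples.wikipedia] LIMIT 10',
                project_id='my-project-id',
                destination_table='my_dataset.my_table',
                allow_large_results=True)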

.. _whatsnew_0170.enhancements.other:

65 changes: 62 additions & 3 deletions pandas/io/gbq.py
@@ -195,7 +195,7 @@ def process_insert_errors(insert_errors, verbose):

raise StreamingInsertError

def run_query(self, query, verbose=True):
def run_query(self, query, verbose=True, destination_table=None, if_exists='fail', allow_large_results=False):
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError

@@ -211,6 +211,48 @@ def run_query(self, query, verbose=True):
}
}

if destination_table:
if if_exists not in ('fail', 'replace', 'append'):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

if '.' not in destination_table:
raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId' ")

dataset_id, table_id = destination_table.rsplit('.', 1)

job_data['configuration']['query'] = {
'query': query,
'destinationTable': {
"projectId": self.project_id,
"datasetId": dataset_id,
"tableId": table_id,
},
'writeDisposition': 'WRITE_EMPTY', # A 'duplicate' error is returned in the job result if table exists
'allowLargeResults': 'false' # If true, allows the query to produce large result tables.
}

if allow_large_results:
job_data['configuration']['query']['allowLargeResults'] = 'true'

table = _Table(self.project_id, dataset_id)

if table.exists(table_id):
if if_exists == 'fail':
raise TableCreationError("Could not create the table because it already exists. "
"Change the if_exists parameter to append or replace data.")
elif if_exists == 'replace':
# If the table already exists, instruct BigQuery to overwrite the table data.
job_data['configuration']['query']['writeDisposition'] = 'WRITE_TRUNCATE'
elif if_exists == 'append':
# If the table already exists, instruct BigQuery to append to the table data.
job_data['configuration']['query']['writeDisposition'] = 'WRITE_APPEND'

dataset = _Dataset(self.project_id)

# create the destination dataset if it does not already exist
if not dataset.exists(dataset_id):
dataset.create(dataset_id)

try:
query_reply = job_collection.insert(projectId=self.project_id, body=job_data).execute()
except AccessTokenRefreshError:
@@ -237,6 +279,9 @@ def run_query(self, query, verbose=True):
# Only read schema on first page
schema = query_reply['schema']

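# When results are redirected to a destination table there are no rows to page through;
# return the schema and an empty page list so that read_gbq produces an empty DataFrame.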
if destination_table:
return schema, list()

# Loop through each page of data
while 'rows' in query_reply and current_row < total_rows:
page = query_reply['rows']
@@ -385,7 +430,8 @@ def _parse_entry(field_value, field_type):
return field_value


def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True):
def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True,
destination_table=None, if_exists='fail', allow_large_results=False):
"""Load data from Google BigQuery.

THIS IS AN EXPERIMENTAL LIBRARY
@@ -412,6 +458,15 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True):
if multiple accounts are used.
verbose : boolean (default True)
Verbose output
destination_table : string
Name of table to be written, in the form 'dataset.tablename'
if_exists : {'fail', 'replace', 'append'}, default 'fail'
'fail': If table exists, raise a TableCreationError.
'replace': If table exists, overwrite the existing data.
'append': If table exists, append to the existing data. Create the table if it does not exist.
allow_large_results : boolean (default False)
Enables the Google BigQuery allowLargeResults option which is necessary for
queries that may return larger results.

Returns
-------
@@ -424,7 +479,11 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True):
raise TypeError("Missing required parameter: project_id")

connector = GbqConnector(project_id, reauth=reauth)
schema, pages = connector.run_query(query, verbose=verbose)
schema, pages = connector.run_query(query,
verbose=verbose,
destination_table=destination_table,
if_exists=if_exists,
allow_large_results=allow_large_results)
dataframe_list = []
while len(pages) > 0:
page = pages.pop()
85 changes: 80 additions & 5 deletions pandas/io/tests/test_gbq.py
@@ -204,28 +204,32 @@ class TestReadGBQIntegration(tm.TestCase):
@classmethod
def setUpClass(cls):
# - GLOBAL CLASS FIXTURES -
# put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests
# described below.

if not PROJECT_ID:
raise nose.SkipTest("Cannot run integration tests without a project id")

test_requirements()
clean_gbq_environment()

gbq._Dataset(PROJECT_ID).create(DATASET_ID + "7")

def setUp(self):
# - PER-TEST FIXTURES -
# put here any instruction you want to be run *BEFORE* *EVERY* test is executed.
pass

@classmethod
def tearDownClass(cls):
# - GLOBAL CLASS FIXTURES -
# put here any instruction you want to execute only *ONCE* *AFTER* executing all tests.
pass

clean_gbq_environment()

def tearDown(self):
# - PER-TEST FIXTURES -
# put here any instructions you want to be run *AFTER* *EVERY* test is executed.
pass

def test_should_properly_handle_valid_strings(self):
@@ -357,6 +361,77 @@ def test_zero_rows(self):
expected_result = DataFrame(columns=['title', 'language'])
self.assert_frame_equal(df, expected_result)

def test_redirect_query_results_to_destination_table_default(self):
destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "1")
test_size = 100

gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
self.assertEqual(result['NUM_ROWS'][0], test_size)

def test_redirect_query_results_to_destination_table_if_table_exists_fail(self):
destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "2")
test_size = 100

# Test redirecting the query results to a destination table without specifying the if_exists parameter
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
self.assertEqual(result['NUM_ROWS'][0], test_size)

# Confirm that the default action is to fail if the table exists and the if_exists parameter is not provided
with tm.assertRaises(gbq.TableCreationError):
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)

# Test the if_exists parameter with value 'fail'
with tm.assertRaises(gbq.TableCreationError):
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table, if_exists='fail')

def test_redirect_query_results_to_destination_table_if_table_exists_append(self):
destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "3")
test_size = 100

# Initialize table with sample data
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)

# Test the if_exists parameter with value 'append'
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table, if_exists='append')

result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
self.assertEqual(result['NUM_ROWS'][0], test_size * 2)

# Try redirecting data to an existing table with a different schema, confirm failure
with tm.assertRaises(gbq.GenericGBQException):
gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table, if_exists='append')

def test_redirect_query_results_to_destination_table_if_table_exists_replace(self):
destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "4")
test_size = 100

# Initialize table with sample data
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)

# Test the if_exists parameter with the value 'replace'
gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table, if_exists='replace')

result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
self.assertEqual(result['NUM_ROWS'][0], test_size)

def test_redirect_query_results_to_destination_table_dataset_does_not_exist(self):
destination_table = "{0}.{1}".format(DATASET_ID + "8", TABLE_ID + "5")
test_size = 100
gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
destination_table=destination_table)
result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
self.assertEqual(result['NUM_ROWS'][0], test_size)

class TestToGBQIntegration(tm.TestCase):
# Changes to BigQuery table schema may take up to 2 minutes as of May 2015