
Commit a4ff065

tonypartheniou (parthea) authored and committed
Add ability to set the allowLargeResults option in Google BigQuery
1 parent 5049b5e commit a4ff065

File tree

4 files changed (+184, -9 lines)

doc/source/io.rst

+40-1
@@ -4091,7 +4091,7 @@ destination DataFrame as well as a preferred column order as follows:
 
    data_frame = pd.read_gbq('SELECT * FROM test_dataset.test_table',
                             index_col='index_column_name',
-                            col_order=['col1', 'col2', 'col3'], projectid)
+                            col_order=['col1', 'col2', 'col3'], projectid=projectid)
 
 .. note::
 
@@ -4102,6 +4102,45 @@ destination DataFrame as well as a preferred column order as follows:
 
 You can toggle the verbose output via the ``verbose`` flag which defaults to ``True``.
 
+You can send the query results directly to a table in BigQuery by setting the ``destination_table`` argument.
+
+For example,
+
+.. code-block:: python
+
+   pd.read_gbq('SELECT * FROM test_dataset.test_table', project_id=projectid, destination_table='my_dataset.my_table')
+
+.. note::
+
+   When the ``destination_table`` argument is set, an empty DataFrame will be returned.
+
+.. note::
+
+   The destination table and destination dataset will automatically be created if they do not already exist.
+
+The ``if_exists`` argument can be used to dictate whether to ``'fail'``, ``'replace'``
+or ``'append'`` if the destination table already exists when using the ``destination_table`` argument.
+The default value is ``'fail'``.
+
+For example, assume that ``if_exists`` is set to ``'fail'``. The following snippet will raise
+a ``TableCreationError`` if the destination table already exists.
+
+.. code-block:: python
+
+   pd.read_gbq('SELECT * FROM test_dataset.test_table',
+               project_id=projectid,
+               destination_table='my_dataset.my_table',
+               if_exists='fail')
+
+.. note::
+
+   If you plan to run a query that may return large results, you can set the ``allow_large_results`` argument
+   which defaults to ``False``. Setting the ``allow_large_results`` argument will effectively set the BigQuery
+   ``'allowLargeResults'`` option to true in the BigQuery job configuration.
+
+   Queries that return large results will take longer to execute, even if the result set is small,
+   and are subject to `additional limitations <https://cloud.google.com/bigquery/querying-data?hl=en#largequeryresults>`__.
+
 Writing DataFrames
 ''''''''''''''''''
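The ``allow_large_results`` note added above is the only new option the doc change does not illustrate with a snippet. As a rough sketch (not part of the committed docs; the project id, dataset, and table names are placeholders), a call exercising it might look like the following. A ``destination_table`` is included because, in this patch, the option is only applied on the destination-table code path:

    import pandas as pd

    # Hypothetical usage; 'my-project', 'my_dataset' and 'my_table' are placeholders.
    df = pd.read_gbq('SELECT id FROM [publicdata:samples.wikipedia]',
                     project_id='my-project',
                     destination_table='my_dataset.my_table',
                     allow_large_results=True)

    # As the notes above describe, df comes back empty when destination_table is set;
    # the query results are written to my_dataset.my_table instead.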

doc/source/whatsnew/v0.17.0.txt

+2
@@ -333,6 +333,8 @@ Google BigQuery Enhancements
 - ``InvalidColumnOrder`` and ``InvalidPageToken`` in the gbq module will raise ``ValueError`` instead of ``IOError``.
 - The ``generate_bq_schema()`` function is now deprecated and will be removed in a future version (:issue:`11121`)
 - Update the gbq module to support Python 3 (:issue:`11094`).
+- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to redirect the query results to a destination table via the ``destination_table`` parameter. See the :ref:`docs <io.bigquery>` for more details (:issue:`11209`)
+- Modify :func:`pandas.io.gbq.read_gbq()` to allow users to set the ``'allowLargeResults'`` option in the BigQuery job configuration via the ``allow_large_results`` parameter (:issue:`11209`)
 
 .. _whatsnew_0170.enhancements.other:

pandas/io/gbq.py

+62-3
@@ -195,7 +195,7 @@ def process_insert_errors(insert_errors, verbose):
 
         raise StreamingInsertError
 
-    def run_query(self, query, verbose=True):
+    def run_query(self, query, verbose=True, destination_table=None, if_exists='fail', allow_large_results=False):
         from apiclient.errors import HttpError
         from oauth2client.client import AccessTokenRefreshError
 
@@ -211,6 +211,48 @@ def run_query(self, query, verbose=True):
             }
         }
 
+        if destination_table:
+            if if_exists not in ('fail', 'replace', 'append'):
+                raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+
+            if '.' not in destination_table:
+                raise NotFoundException("Invalid Table Name. Should be of the form 'datasetId.tableId' ")
+
+            dataset_id, table_id = destination_table.rsplit('.', 1)
+
+            job_data['configuration']['query'] = {
+                'query': query,
+                'destinationTable': {
+                    "projectId": self.project_id,
+                    "datasetId": dataset_id,
+                    "tableId": table_id,
+                },
+                'writeDisposition': 'WRITE_EMPTY',  # A 'duplicate' error is returned in the job result if the table exists
+                'allowLargeResults': 'false'  # If true, allows the query to produce large result tables.
+            }
+
+            if allow_large_results:
+                job_data['configuration']['query']['allowLargeResults'] = 'true'
+
+            table = _Table(self.project_id, dataset_id)
+
+            if table.exists(table_id):
+                if if_exists == 'fail':
+                    raise TableCreationError("Could not create the table because it already exists. "
+                                             "Change the if_exists parameter to append or replace data.")
+                elif if_exists == 'replace':
+                    # If the table already exists, instruct BigQuery to overwrite the table data.
+                    job_data['configuration']['query']['writeDisposition'] = 'WRITE_TRUNCATE'
+                elif if_exists == 'append':
+                    # If the table already exists, instruct BigQuery to append to the table data.
+                    job_data['configuration']['query']['writeDisposition'] = 'WRITE_APPEND'
+
+            dataset = _Dataset(self.project_id)
+
+            # create the destination dataset if it does not already exist
+            if not dataset.exists(dataset_id):
+                dataset.create(dataset_id)
+
         try:
             query_reply = job_collection.insert(projectId=self.project_id, body=job_data).execute()
         except AccessTokenRefreshError:
@@ -237,6 +279,9 @@ def run_query(self, query, verbose=True):
                 # Only read schema on first page
                 schema = query_reply['schema']
 
+        if destination_table:
+            return schema, list()
+
         # Loop through each page of data
         while 'rows' in query_reply and current_row < total_rows:
             page = query_reply['rows']
@@ -385,7 +430,8 @@ def _parse_entry(field_value, field_type):
         return field_value
 
 
-def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True):
+def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=False, verbose=True,
+             destination_table=None, if_exists='fail', allow_large_results=False):
     """Load data from Google BigQuery.
 
     THIS IS AN EXPERIMENTAL LIBRARY
@@ -412,6 +458,15 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals
         if multiple accounts are used.
     verbose : boolean (default True)
        Verbose output
+    destination_table : string
+        Name of table to be written, in the form 'dataset.tablename'
+    if_exists : {'fail', 'replace', 'append'}, default 'fail'
+        'fail': If table exists, do nothing.
+        'replace': If table exists, drop it, recreate it, and insert data.
+        'append': If table exists, insert data. Create if does not exist.
+    allow_large_results : boolean (default False)
+        Enables the Google BigQuery allowLargeResults option which is necessary for
+        queries that may return larger results.
 
     Returns
     -------
@@ -424,7 +479,11 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, reauth=Fals
        raise TypeError("Missing required parameter: project_id")
 
     connector = GbqConnector(project_id, reauth=reauth)
-    schema, pages = connector.run_query(query, verbose=verbose)
+    schema, pages = connector.run_query(query,
+                                        verbose=verbose,
+                                        destination_table=destination_table,
+                                        if_exists=if_exists,
+                                        allow_large_results=allow_large_results)
     dataframe_list = []
     while len(pages) > 0:
         page = pages.pop()
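To make the branches added to ``run_query`` above easier to follow, here is a sketch (not taken from the commit) of the query job body it would end up submitting for ``destination_table='my_dataset.my_table'`` with ``if_exists='replace'`` on an existing table and ``allow_large_results=True``; the project id and the query text are placeholders:

    # Hypothetical job body assembled by the patched run_query under the assumptions above.
    job_data = {
        'configuration': {
            'query': {
                'query': "SELECT id FROM [publicdata:samples.wikipedia] LIMIT 100",
                'destinationTable': {
                    'projectId': 'my-project',   # self.project_id
                    'datasetId': 'my_dataset',   # parsed from destination_table
                    'tableId': 'my_table',
                },
                # if_exists='replace' on an existing table maps to WRITE_TRUNCATE;
                # 'append' maps to WRITE_APPEND; otherwise the default WRITE_EMPTY is kept.
                'writeDisposition': 'WRITE_TRUNCATE',
                # allow_large_results=True flips the default string 'false' to 'true'.
                'allowLargeResults': 'true',
            }
        }
    }

The body is then handed to the jobs collection's ``insert`` call as before, and because ``run_query`` returns ``schema, list()`` on this path, ``read_gbq`` skips the paging loop and produces an empty DataFrame.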

pandas/io/tests/test_gbq.py

+80-5
@@ -204,28 +204,32 @@ class TestReadGBQIntegration(tm.TestCase):
     @classmethod
     def setUpClass(cls):
         # - GLOBAL CLASS FIXTURES -
-       # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests
-       # described below.
+        # put here any instruction you want to execute only *ONCE* *BEFORE* executing *ALL* tests
+        # described below.
 
         if not PROJECT_ID:
             raise nose.SkipTest("Cannot run integration tests without a project id")
 
         test_requirements()
+        clean_gbq_environment()
+
+        gbq._Dataset(PROJECT_ID).create(DATASET_ID + "7")
 
     def setUp(self):
         # - PER-TEST FIXTURES -
-       # put here any instruction you want to be run *BEFORE* *EVERY* test is executed.
+        # put here any instruction you want to be run *BEFORE* *EVERY* test is executed.
         pass
 
     @classmethod
     def tearDownClass(cls):
         # - GLOBAL CLASS FIXTURES -
         # put here any instruction you want to execute only *ONCE* *AFTER* executing all tests.
-       pass
+
+        clean_gbq_environment()
 
     def tearDown(self):
         # - PER-TEST FIXTURES -
-       # put here any instructions you want to be run *AFTER* *EVERY* test is executed.
+        # put here any instructions you want to be run *AFTER* *EVERY* test is executed.
        pass
 
     def test_should_properly_handle_valid_strings(self):
@@ -357,6 +361,77 @@ def test_zero_rows(self):
         expected_result = DataFrame(columns=['title', 'language'])
         self.assert_frame_equal(df, expected_result)
 
+    def test_redirect_query_results_to_destination_table_default(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "1")
+        test_size = 100
+
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_fail(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "2")
+        test_size = 100
+
+        # Test redirecting the query results to a destination table without specifying the if_exists parameter
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+        # Confirm that the default action is to fail if the table exists and the if_exists parameter is not provided
+        with tm.assertRaises(gbq.TableCreationError):
+            gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table)
+
+        # Test the if_exists parameter with value 'fail'
+        with tm.assertRaises(gbq.TableCreationError):
+            gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table, if_exists='fail')
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_append(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "3")
+        test_size = 100
+
+        # Initialize table with sample data
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+
+        # Test the if_exists parameter with value 'append'
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table, if_exists='append')
+
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size * 2)
+
+        # Try redirecting data to an existing table with a different schema, confirm failure
+        with tm.assertRaises(gbq.GenericGBQException):
+            gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                         destination_table=destination_table, if_exists='append')
+
+    def test_redirect_query_results_to_destination_table_if_table_exists_replace(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "7", TABLE_ID + "4")
+        test_size = 100
+
+        # Initialize table with sample data
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+
+        # Test the if_exists parameter with the value 'replace'
+        gbq.read_gbq("SELECT title FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table, if_exists='replace')
+
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
+
+    def test_redirect_query_results_to_destination_table_dataset_does_not_exist(self):
+        destination_table = "{0}.{1}".format(DATASET_ID + "8", TABLE_ID + "5")
+        test_size = 100
+        gbq.read_gbq("SELECT id FROM [publicdata:samples.wikipedia] LIMIT " + str(test_size), PROJECT_ID,
+                     destination_table=destination_table)
+        result = gbq.read_gbq("SELECT COUNT(*) as NUM_ROWS FROM {0}".format(destination_table), PROJECT_ID)
+        self.assertEqual(result['NUM_ROWS'][0], test_size)
 
 class TestToGBQIntegration(tm.TestCase):
     # Changes to BigQuery table schema may take up to 2 minutes as of May 2015
