Commit e7007a8
ENH: Add location parameter to read_gbq and to_gbq (#185)
* ENH: Add location parameter to read_gbq and to_gbq. This allows queries to
  be run against datasets in the Tokyo region and enables loading dataframes
  into Tokyo datasets. The location parameter was added in
  google-cloud-bigquery 0.32.0, so this PR also updates the minimum
  google-cloud-bigquery version.
* DOC: Add location parameter to changelog. Fix test to use the private_key
  parameter so that it passes on Travis.
* TST: Lock conda google-cloud-bigquery version.
1 parent e753cc4 commit e7007a8
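
The new parameter simply names the BigQuery region that holds the dataset, and
is passed through to the query and load jobs. A minimal usage sketch based on
the signatures in this commit (the project and dataset identifiers below are
placeholders, not part of the change):

    import pandas as pd
    import pandas_gbq

    # 'my-project' and the tokyo_dataset.* names are hypothetical examples.
    df = pandas_gbq.read_gbq(
        'SELECT year, token FROM tokyo_dataset.tokyo_table',
        project_id='my-project',
        dialect='standard',
        location='asia-northeast1')  # must match the dataset's region

    pandas_gbq.to_gbq(
        pd.DataFrame({'year': [2018]}),
        'tokyo_dataset.to_gbq_test',
        project_id='my-project',
        if_exists='append',
        location='asia-northeast1')

If the location does not match the region of the datasets involved, BigQuery
will typically reject the job with a dataset-not-found error, which is why the
system tests below create their fixtures explicitly in asia-northeast1.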

File tree: 10 files changed, +128 −166 lines

ci/requirements-3.5-0.18.1.pip

+1-1
@@ -1,3 +1,3 @@
 google-auth==1.4.1
 google-auth-oauthlib==0.0.1
-google-cloud-bigquery==0.29.0
+google-cloud-bigquery==0.32.0

docs/source/changelog.rst

+2
@@ -7,6 +7,8 @@ Changelog
 - Project ID parameter is optional in ``read_gbq`` and ``to_gbq`` when it can
   be inferred from the environment. Note: you must still pass in a project ID when
   using user-based authentication. (:issue:`103`)
+- Add location parameter to ``read_gbq`` and ``to_gbq`` so that pandas-gbq
+  can work with datasets in the Tokyo region. (:issue:`177`)
 - Progress bar added for ``to_gbq``, through an optional library `tqdm` as
   dependency. (:issue:`162`)
 

pandas_gbq/gbq.py

+34-18
@@ -34,7 +34,7 @@ def _check_google_client_version():
        raise ImportError('Could not import pkg_resources (setuptools).')

    # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
-    bigquery_minimum_version = pkg_resources.parse_version('0.29.0')
+    bigquery_minimum_version = pkg_resources.parse_version('0.32.0')
    BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
        'google-cloud-bigquery').parsed_version

@@ -152,12 +152,13 @@ class GbqConnector(object):

    def __init__(self, project_id, reauth=False,
                 private_key=None, auth_local_webserver=False,
-                 dialect='legacy'):
+                 dialect='legacy', location=None):
        from google.api_core.exceptions import GoogleAPIError
        from google.api_core.exceptions import ClientError
        from pandas_gbq import auth
        self.http_error = (ClientError, GoogleAPIError)
        self.project_id = project_id
+        self.location = location
        self.reauth = reauth
        self.private_key = private_key
        self.auth_local_webserver = auth_local_webserver

@@ -215,9 +216,9 @@ def process_http_error(ex):
        raise GenericGBQException("Reason: {0}".format(ex))

    def run_query(self, query, **kwargs):
-        from google.auth.exceptions import RefreshError
        from concurrent.futures import TimeoutError
-        import pandas_gbq.query
+        from google.auth.exceptions import RefreshError
+        from google.cloud import bigquery

        job_config = {
            'query': {

@@ -243,8 +244,8 @@ def run_query(self, query, **kwargs):
            logger.info('Requesting query... ')
            query_reply = self.client.query(
                query,
-                job_config=pandas_gbq.query.query_config(
-                    job_config, BIGQUERY_INSTALLED_VERSION))
+                job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
+                location=self.location)
            logger.info('ok.\nQuery running...')
        except (RefreshError, ValueError):
            if self.private_key:

@@ -319,7 +320,7 @@ def load_data(
        try:
            chunks = load.load_chunks(self.client, dataframe, dataset_id,
                                      table_id, chunksize=chunksize,
-                                      schema=schema)
+                                      schema=schema, location=self.location)
            if progress_bar and tqdm:
                chunks = tqdm.tqdm(chunks)
            for remaining_rows in chunks:

@@ -470,7 +471,8 @@ def _parse_data(schema, rows):

 def read_gbq(query, project_id=None, index_col=None, col_order=None,
             reauth=False, verbose=None, private_key=None,
-             auth_local_webserver=False, dialect='legacy', **kwargs):
+             auth_local_webserver=False, dialect='legacy', location=None,
+             configuration=None):
    r"""Load data from Google BigQuery using google-cloud-python

    The main method a user calls to execute a Query in Google BigQuery

@@ -520,17 +522,23 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
        compliant with the SQL 2011 standard. For more information
        see `BigQuery SQL Reference
        <https://cloud.google.com/bigquery/sql-reference/>`__
-    verbose : None, deprecated
-
-    **kwargs : Arbitrary keyword arguments
-        configuration (dict): query config parameters for job processing.
+    location : str (optional)
+        Location where the query job should run. See the `BigQuery locations
+        <https://cloud.google.com/bigquery/docs/dataset-locations>
+        documentation`__ for a list of available locations. The location must
+        match that of any datasets used in the query.
+        .. versionadded:: 0.5.0
+    configuration : dict (optional)
+        Query config parameters for job processing.
        For example:

            configuration = {'query': {'useQueryCache': False}}

        For more information see `BigQuery SQL Reference
        <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__

+    verbose : None, deprecated
+
    Returns
    -------
    df: DataFrame

@@ -550,9 +558,9 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
        raise ValueError("'{0}' is not valid for dialect".format(dialect))

    connector = GbqConnector(
-        project_id, reauth=reauth, private_key=private_key,
-        dialect=dialect, auth_local_webserver=auth_local_webserver)
-    schema, rows = connector.run_query(query, **kwargs)
+        project_id, reauth=reauth, private_key=private_key, dialect=dialect,
+        auth_local_webserver=auth_local_webserver, location=location)
+    schema, rows = connector.run_query(query, configuration=configuration)
    final_df = _parse_data(schema, rows)

    # Reindex the DataFrame on the provided column

@@ -595,7 +603,8 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,

 def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
           verbose=None, reauth=False, if_exists='fail', private_key=None,
-           auth_local_webserver=False, table_schema=None, progress_bar=True):
+           auth_local_webserver=False, table_schema=None, location=None,
+           progress_bar=True):
    """Write a DataFrame to a Google BigQuery table.

    The main method a user calls to export pandas DataFrame contents to

@@ -648,9 +657,16 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,
        of DataFrame columns. See BigQuery API documentation on available
        names of a field.
        .. versionadded:: 0.3.1
-    verbose : None, deprecated
+    location : str (optional)
+        Location where the load job should run. See the `BigQuery locations
+        <https://cloud.google.com/bigquery/docs/dataset-locations>
+        documentation`__ for a list of available locations. The location must
+        match that of the target dataset.
+        .. versionadded:: 0.5.0
    progress_bar : boolean, True by default. It uses the library `tqdm` to show
        the progress bar for the upload, chunk by chunk.
+        .. versionadded:: 0.5.0
+    verbose : None, deprecated
    """

    _test_google_api_imports()

@@ -670,7 +686,7 @@ def to_gbq(dataframe, destination_table, project_id=None, chunksize=None,

    connector = GbqConnector(
        project_id, reauth=reauth, private_key=private_key,
-        auth_local_webserver=auth_local_webserver)
+        auth_local_webserver=auth_local_webserver, location=location)
    dataset_id, table_id = destination_table.rsplit('.', 1)

    table = _Table(project_id, dataset_id, reauth=reauth,

pandas_gbq/load.py

+4-2
@@ -44,7 +44,8 @@ def encode_chunks(dataframe, chunksize=None):


 def load_chunks(
-        client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
+        client, dataframe, dataset_id, table_id, chunksize=None, schema=None,
+        location=None):
    destination_table = client.dataset(dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_APPEND'

@@ -71,4 +72,5 @@ def load_chunks(
        client.load_table_from_file(
            chunk_buffer,
            destination_table,
-            job_config=job_config).result()
+            job_config=job_config,
+            location=location).result()
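
Inside load_chunks the location is forwarded to the underlying client call.
Roughly, assuming a CSV chunk buffer like the one encode_chunks produces (the
file name and table identifiers below are illustrative, not from the commit):

    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = client.dataset('tokyo_dataset').table('to_gbq_test')

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_APPEND'
    job_config.source_format = 'CSV'

    # The location= argument on load_table_from_file is available from
    # google-cloud-bigquery 0.32.0, hence the version bump in this commit.
    with open('chunk.csv', 'rb') as chunk_buffer:
        client.load_table_from_file(
            chunk_buffer,
            table_ref,
            job_config=job_config,
            location='asia-northeast1').result()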

pandas_gbq/query.py

-25
This file was deleted.
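
The deleted module held the query_config helper used by run_query above, which
adapted the configuration dict to the installed google-cloud-bigquery version.
With the minimum version raised to 0.32.0, run_query can rely on
QueryJobConfig.from_api_repr directly, so the shim is no longer needed. A rough
sketch of the equivalent call (the config keys shown are only an example):

    from google.cloud import bigquery

    # API-style job configuration, the same shape read_gbq accepts via
    # its configuration= argument.
    config_dict = {'query': {'useQueryCache': False}}

    client = bigquery.Client()
    job = client.query(
        'SELECT 1',
        job_config=bigquery.QueryJobConfig.from_api_repr(config_dict),
        location='asia-northeast1')  # run the job in the Tokyo region
    rows = job.result()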

setup.py

+1-1
@@ -21,7 +21,7 @@ def readme():
    'pandas',
    'google-auth',
    'google-auth-oauthlib',
-    'google-cloud-bigquery>=0.29.0',
+    'google-cloud-bigquery>=0.32.0',
 ]

 extras = {

tests/system/conftest.py

+3-3
@@ -6,13 +6,13 @@
 import pytest


-@pytest.fixture
+@pytest.fixture(scope='session')
 def project_id():
    return (os.environ.get('GBQ_PROJECT_ID')
            or os.environ.get('GOOGLE_CLOUD_PROJECT'))  # noqa


-@pytest.fixture
+@pytest.fixture(scope='session')
 def private_key_path():
    path = None
    if 'TRAVIS_BUILD_DIR' in os.environ:

@@ -36,7 +36,7 @@ def private_key_path():
    return path


-@pytest.fixture
+@pytest.fixture(scope='session')
 def private_key_contents(private_key_path):
    if private_key_path is None:
        return None

tests/system/test_gbq.py

+68-44
@@ -3,7 +3,6 @@
 import sys
 from datetime import datetime
 from random import randint
-from time import sleep

 import numpy as np
 import pandas.util.testing as tm

@@ -50,46 +49,46 @@ def gbq_connector(project, credentials):
    return gbq.GbqConnector(project, private_key=credentials)


-def clean_gbq_environment(dataset_prefix, private_key=None, project_id=None):
-    dataset = gbq._Dataset(project_id, private_key=private_key)
-    all_datasets = dataset.datasets()
-
-    retry = 3
-    while retry > 0:
-        try:
-            retry = retry - 1
-            for i in range(1, 10):
-                dataset_id = dataset_prefix + str(i)
-                if dataset_id in all_datasets:
-                    table = gbq._Table(project_id, dataset_id,
-                                       private_key=private_key)
-
-                    # Table listing is eventually consistent, so loop until
-                    # all tables no longer appear (max 30 seconds).
-                    table_retry = 30
-                    all_tables = dataset.tables(dataset_id)
-                    while all_tables and table_retry > 0:
-                        for table_id in all_tables:
-                            try:
-                                table.delete(table_id)
-                            except gbq.NotFoundException:
-                                pass
-                        sleep(1)
-                        table_retry = table_retry - 1
-                        all_tables = dataset.tables(dataset_id)
-
-                    dataset.delete(dataset_id)
-            retry = 0
-        except gbq.GenericGBQException as ex:
-            # Build in retry logic to work around the following errors :
-            # An internal error occurred and the request could not be...
-            # Dataset ... is still in use
-            error_message = str(ex).lower()
-            if ('an internal error occurred' in error_message or
-                    'still in use' in error_message) and retry > 0:
-                sleep(30)
-            else:
-                raise ex
+@pytest.fixture(scope='module')
+def bigquery_client(project_id, private_key_path):
+    from google.cloud import bigquery
+    return bigquery.Client.from_service_account_json(
+        private_key_path, project=project_id)
+
+
+@pytest.fixture(scope='module')
+def tokyo_dataset(bigquery_client):
+    from google.cloud import bigquery
+    dataset_id = 'tokyo_{}'.format(_get_dataset_prefix_random())
+    dataset_ref = bigquery_client.dataset(dataset_id)
+    dataset = bigquery.Dataset(dataset_ref)
+    dataset.location = 'asia-northeast1'
+    bigquery_client.create_dataset(dataset)
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_ref, delete_contents=True)
+
+
+@pytest.fixture(scope='module')
+def tokyo_table(bigquery_client, tokyo_dataset):
+    table_id = 'tokyo_table'
+    # Create a random table using DDL.
+    # https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
+    bigquery_client.query(
+        """CREATE TABLE {}.{}
+        AS SELECT
+          2000 + CAST(18 * RAND() as INT64) as year,
+          IF(RAND() > 0.5,"foo","bar") as token
+        FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
+        """.format(tokyo_dataset, table_id),
+        location='asia-northeast1').result()
+    return table_id
+
+
+def clean_gbq_environment(dataset_prefix, bigquery_client):
+    for dataset in bigquery_client.list_datasets():
+        if not dataset.dataset_id.startswith(dataset_prefix):
+            continue
+        bigquery_client.delete_dataset(dataset.reference, delete_contents=True)


 def make_mixed_dataframe_v2(test_size):

@@ -640,6 +639,16 @@ def test_array_of_floats(self, private_key_path, project_id):
        tm.assert_frame_equal(df, DataFrame([[[1.1, 2.2, 3.3], 4]],
                                            columns=["a", "b"]))

+    def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
+        df = gbq.read_gbq(
+            'SELECT MAX(year) AS max_year FROM {}.{}'.format(
+                tokyo_dataset, tokyo_table),
+            dialect='standard',
+            location='asia-northeast1',
+            private_key=private_key_path)
+        print(df)
+        assert df['max_year'][0] >= 2000
+

 class TestToGBQIntegration(object):
    # Changes to BigQuery table schema may take up to 2 minutes as of May 2015

@@ -649,13 +658,13 @@ class TestToGBQIntegration(object):
    # <https://code.google.com/p/google-bigquery/issues/detail?id=191>`__

    @pytest.fixture(autouse=True, scope='function')
-    def setup(self, project, credentials):
+    def setup(self, project, credentials, bigquery_client):
        # - PER-TEST FIXTURES -
        # put here any instruction you want to be run *BEFORE* *EVERY* test is
        # executed.

        self.dataset_prefix = _get_dataset_prefix_random()
-        clean_gbq_environment(self.dataset_prefix, credentials, project)
+        clean_gbq_environment(self.dataset_prefix, bigquery_client)
        self.dataset = gbq._Dataset(project,
                                    private_key=credentials)
        self.table = gbq._Table(project, self.dataset_prefix + "1",

@@ -667,7 +676,7 @@ def setup(self, project, credentials):
        self.dataset.create(self.dataset_prefix + "1")
        self.credentials = credentials
        yield
-        clean_gbq_environment(self.dataset_prefix, self.credentials, project)
+        clean_gbq_environment(self.dataset_prefix, bigquery_client)

    def test_upload_data(self, project_id):
        test_id = "1"

@@ -1192,6 +1201,21 @@ def test_upload_data_with_different_df_and_user_schema(self, project_id):
        assert self.table.verify_schema(dataset, table,
                                        dict(fields=test_schema))

+    def test_upload_data_tokyo(
+            self, project_id, tokyo_dataset, bigquery_client):
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        tokyo_destination = '{}.to_gbq_test'.format(tokyo_dataset)
+
+        # Initialize table with sample data
+        gbq.to_gbq(
+            df, tokyo_destination, project_id, private_key=self.credentials,
+            location='asia-northeast1')
+
+        table = bigquery_client.get_table(
+            bigquery_client.dataset(tokyo_dataset).table('to_gbq_test'))
+        assert table.num_rows > 0
+
    def test_list_dataset(self):
        dataset_id = self.dataset_prefix + "1"
        assert dataset_id in self.dataset.datasets()
