
Commit 5e80908

ENH: Add use_bqstorage_api option to read_gbq (#270)
* ENH: Add use_bqstorage_api option to read_gbq

  The BigQuery Storage API provides a way to read query results quickly (and
  using multiple threads). It only works with large query results (~125 MB),
  but as of 1.11.1, the google-cloud-bigquery library can fall back to the
  BigQuery API to download results when a request to the BigQuery Storage API
  fails. As this API can increase costs (and may not be enabled on the user's
  project), this option is disabled by default.

* Add to changelog. Remove comment about destination tables.

* Add docs for using the BigQuery Storage API.
1 parent 141b2b4 commit 5e80908

File tree (4 files changed, +121 −2 lines):

- docs/source/changelog.rst
- docs/source/reading.rst
- pandas_gbq/gbq.py
- tests/system/test_gbq.py

docs/source/changelog.rst

Lines changed: 4 additions & 0 deletions
@@ -41,6 +41,10 @@ Enhancements
   available (contributed by @daureg)
 - ``read_gbq`` uses the timezone-aware ``DatetimeTZDtype(unit='ns',
   tz='UTC')`` dtype for BigQuery ``TIMESTAMP`` columns. (:issue:`269`)
+- Add ``use_bqstorage_api`` to :func:`read_gbq`. The BigQuery Storage API can
+  be used to download large query results (>125 MB) more quickly. If the BQ
+  Storage API can't be used, the BigQuery API is used instead. (:issue:`133`,
+  :issue:`270`)

 .. _changelog-0.9.0:

docs/source/reading.rst

Lines changed: 38 additions & 0 deletions
@@ -84,3 +84,41 @@ DATETIME datetime64[ns]
 TIME               datetime64[ns]
 DATE               datetime64[ns]
 ================== =========================
+
+.. _reading-bqstorage-api:
+
+Using the BigQuery Storage API
+------------------------------
+
+Use the BigQuery Storage API to download large (>125 MB) query results more
+quickly (but at an `increased cost
+<https://cloud.google.com/bigquery/pricing#storage-api>`__) by setting
+``use_bqstorage_api`` to ``True``.
+
+1. Enable the BigQuery Storage API on the project you are using to run
+   queries.
+
+   `Enable the API
+   <https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
+2. Ensure you have the `bigquery.readsessions.create permission
+   <https://cloud.google.com/bigquery/docs/access-control#bq-permissions>`__ to
+   create BigQuery Storage API read sessions. This permission is provided by
+   the `bigquery.user role
+   <https://cloud.google.com/bigquery/docs/access-control#roles>`__.
+3. Install the ``google-cloud-bigquery-storage``, ``fastavro``, and
+   ``python-snappy`` packages.
+
+   With pip:
+
+   .. code-block:: sh
+
+      pip install --upgrade google-cloud-bigquery-storage fastavro python-snappy
+
+   With conda:
+
+      conda install -c conda-forge google-cloud-bigquery-storage fastavro python-snappy
+4. Set ``use_bqstorage_api`` to ``True`` when calling the
+   :func:`~pandas_gbq.read_gbq` function. With ``google-cloud-bigquery``
+   version 1.11.1 or later, the function will fall back to the
+   BigQuery API if the BigQuery Storage API cannot be used, such as with
+   small query results.
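
Putting the documented steps together, the call below is a minimal sketch of opting into the faster download path; the project ID and query are placeholders for illustration, not part of this commit:

    import pandas_gbq

    # use_bqstorage_api defaults to False; enabling it requires the
    # google-cloud-bigquery-storage, fastavro, and python-snappy packages
    # installed in step 3 above.
    df = pandas_gbq.read_gbq(
        """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_current`
        GROUP BY name
        """,
        project_id="my-project",  # placeholder: the project billed for the query
        dialect="standard",
        use_bqstorage_api=True,
    )
    print(df.head())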

pandas_gbq/gbq.py

Lines changed: 52 additions & 1 deletion
@@ -5,6 +5,13 @@

 import numpy as np

+try:
+    # The BigQuery Storage API client is an optional dependency. It is only
+    # required when use_bqstorage_api=True.
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
+
 from pandas_gbq.exceptions import AccessDenied

 logger = logging.getLogger(__name__)
@@ -302,6 +309,7 @@ def __init__(
         dialect="standard",
         location=None,
         credentials=None,
+        use_bqstorage_api=False,
     ):
         global context
         from google.api_core.exceptions import GoogleAPIError
@@ -352,6 +360,9 @@ def __init__(
         context.project = self.project_id

         self.client = self.get_client()
+        self.bqstorage_client = _make_bqstorage_client(
+            use_bqstorage_api, self.credentials
+        )

         # BQ Queries costs $5 per TB. First 1 TB per month is free
         # see here for more: https://cloud.google.com/bigquery/pricing
@@ -489,7 +500,9 @@ def run_query(self, query, **kwargs):

         schema_fields = [field.to_api_repr() for field in rows_iter.schema]
         nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
-        df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
+        df = rows_iter.to_dataframe(
+            dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client
+        )

         if df.empty:
             df = _cast_empty_df_dtypes(schema_fields, df)
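
For context, the replaced call above is the google-cloud-bigquery ``RowIterator.to_dataframe`` API; the following is a minimal sketch of the same pattern used directly, assuming application default credentials and google-cloud-bigquery 1.11.1 or later (where the library falls back to the plain BigQuery API if the Storage API read fails):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    bq_client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    rows_iter = bq_client.query(
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`"
    ).result()

    # Passing bqstorage_client=None keeps the single-stream download used
    # before this commit; passing a client lets to_dataframe() read the
    # results over the faster BigQuery Storage API when possible.
    df = rows_iter.to_dataframe(bqstorage_client=bqstorage_client)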
@@ -727,6 +740,21 @@ def _localize_df(schema_fields, df):
     return df


+def _make_bqstorage_client(use_bqstorage_api, credentials):
+    if not use_bqstorage_api:
+        return None
+
+    if bigquery_storage_v1beta1 is None:
+        raise ImportError(
+            "Install the google-cloud-bigquery-storage and fastavro packages "
+            "to use the BigQuery Storage API."
+        )
+
+    return bigquery_storage_v1beta1.BigQueryStorageClient(
+        credentials=credentials
+    )
+
+
 def read_gbq(
     query,
     project_id=None,
@@ -738,6 +766,7 @@ def read_gbq(
     location=None,
     configuration=None,
     credentials=None,
+    use_bqstorage_api=False,
     verbose=None,
     private_key=None,
 ):
@@ -815,6 +844,27 @@
         :class:`google.oauth2.service_account.Credentials` directly.

         .. versionadded:: 0.8.0
+    use_bqstorage_api : bool, default False
+        Use the `BigQuery Storage API
+        <https://cloud.google.com/bigquery/docs/reference/storage/>`__ to
+        download query results quickly, but at an increased cost. To use this
+        API, first `enable it in the Cloud Console
+        <https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
+        You must also have the `bigquery.readsessions.create
+        <https://cloud.google.com/bigquery/docs/access-control#roles>`__
+        permission on the project you are billing queries to.
+
+        **Note:** Due to a `known issue in the google-cloud-bigquery package
+        <https://github.com/googleapis/google-cloud-python/pull/7633>`__
+        (fixed in version 1.11.0), you must write your query results to a
+        destination table. To do this with ``read_gbq``, supply a
+        ``configuration`` dictionary.
+
+        This feature requires the ``google-cloud-bigquery-storage`` and
+        ``fastavro`` packages.
+
+        .. versionadded:: 0.10.0
     verbose : None, deprecated
         Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
         to adjust verbosity instead
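
The docstring's note about supplying a ``configuration`` dictionary can look like the sketch below; the project, dataset, and table names are hypothetical, and the keys follow the BigQuery query-job configuration (REST) format that the ``configuration`` parameter already accepts:

    import pandas_gbq

    # Hypothetical destination table; per the note above, only needed with
    # google-cloud-bigquery versions older than 1.11.0.
    config = {
        "query": {
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "bqstorage_scratch",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    }

    df = pandas_gbq.read_gbq(
        "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`",
        project_id="my-project",
        dialect="standard",
        configuration=config,
        use_bqstorage_api=True,
    )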
@@ -871,6 +921,7 @@
         location=location,
         credentials=credentials,
         private_key=private_key,
+        use_bqstorage_api=use_bqstorage_api,
     )

     final_df = connector.run_query(query, configuration=configuration)

tests/system/test_gbq.py

Lines changed: 27 additions & 1 deletion
@@ -895,10 +895,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
         location="asia-northeast1",
         private_key=private_key_path,
     )
-    print(df)
     assert df["max_year"][0] >= 2000


+@pytest.mark.skip(reason="large query for BQ Storage API tests")
+def test_read_gbq_w_bqstorage_api(credentials):
+    df = gbq.read_gbq(
+        """
+        SELECT
+          dependency_name,
+          dependency_platform,
+          project_name,
+          project_id,
+          version_number,
+          version_id,
+          dependency_kind,
+          optional_dependency,
+          dependency_requirements,
+          dependency_project_id
+        FROM
+          `bigquery-public-data.libraries_io.dependencies`
+        WHERE
+          LOWER(dependency_platform) = 'npm'
+        LIMIT 2500000
+        """,
+        use_bqstorage_api=True,
+        credentials=credentials,
+    )
+    assert len(df) == 2500000
+
+
 class TestToGBQIntegration(object):
     @pytest.fixture(autouse=True, scope="function")
     def setup(self, project, credentials, random_dataset_id):
