-
Notifications
You must be signed in to change notification settings - Fork 125
feat: accepts a table ID, which downloads the table without a query #443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
9a9d3fd
6adf233
73a791a
9b1eb0d
ec9ddaf
9cc7c74
dd51ad8
e1ad679
d29bc2a
cb8f24f
56b73b2
8a61e97
3f7900b
3c53f1f
5ce125f
ea660f4
6704991
9a1ca16
a9075df
b3061b6
ed3f9d9
b9b017c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
# license that can be found in the LICENSE file. | ||
|
||
import logging | ||
import re | ||
import time | ||
import warnings | ||
from datetime import datetime | ||
|
@@ -64,6 +65,10 @@ def _test_google_api_imports(): | |
raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex | ||
|
||
|
||
def _is_query(query_or_table: str) -> bool: | ||
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None | ||
|
||
|
||
class DatasetCreationError(ValueError): | ||
""" | ||
Raised when the create dataset method fails | ||
|
@@ -374,6 +379,26 @@ def process_http_error(ex): | |
|
||
raise GenericGBQException("Reason: {0}".format(ex)) | ||
|
||
def download_table( | ||
self, table_id, max_results=None, progress_bar_type=None, dtypes=None | ||
): | ||
Review comment: (nit) Maybe completely annotate new methods? Reply (truncated in this capture): For a start, |
||
self._start_timer() | ||
|
||
try: | ||
table_ref = bigquery.TableReference.from_string( | ||
table_id, default_project=self.project_id | ||
) | ||
rows_iter = self.client.list_rows(table_ref, max_results=max_results) | ||
except self.http_error as ex: | ||
self.process_http_error(ex) | ||
|
||
return self._download_results( | ||
rows_iter, | ||
max_results=max_results, | ||
progress_bar_type=progress_bar_type, | ||
user_dtypes=dtypes, | ||
) | ||
|
||
def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): | ||
from concurrent.futures import TimeoutError | ||
from google.auth.exceptions import RefreshError | ||
|
@@ -390,15 +415,6 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): | |
if config is not None: | ||
job_config.update(config) | ||
|
||
if "query" in config and "query" in config["query"]: | ||
if query is not None: | ||
raise ValueError( | ||
"Query statement can't be specified " | ||
"inside config while it is specified " | ||
"as parameter" | ||
) | ||
query = config["query"].pop("query") | ||
|
||
self._start_timer() | ||
|
||
try: | ||
|
@@ -464,15 +480,25 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): | |
) | ||
|
||
dtypes = kwargs.get("dtypes") | ||
|
||
# Ensure destination is populated. | ||
try: | ||
query_reply.result() | ||
except self.http_error as ex: | ||
self.process_http_error(ex) | ||
|
||
rows_iter = self.client.list_rows( | ||
query_reply.destination, max_results=max_results | ||
) | ||
return self._download_results( | ||
query_reply, | ||
rows_iter, | ||
max_results=max_results, | ||
progress_bar_type=progress_bar_type, | ||
user_dtypes=dtypes, | ||
) | ||
|
||
def _download_results( | ||
self, query_job, max_results=None, progress_bar_type=None, user_dtypes=None, | ||
self, rows_iter, max_results=None, progress_bar_type=None, user_dtypes=None, | ||
): | ||
# No results are desired, so don't bother downloading anything. | ||
if max_results == 0: | ||
|
@@ -504,11 +530,6 @@ def _download_results( | |
to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client | ||
|
||
try: | ||
query_job.result() | ||
# Get the table schema, so that we can list rows. | ||
destination = self.client.get_table(query_job.destination) | ||
rows_iter = self.client.list_rows(destination, max_results=max_results) | ||
|
||
schema_fields = [field.to_api_repr() for field in rows_iter.schema] | ||
conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) | ||
conversion_dtypes.update(user_dtypes) | ||
|
@@ -644,7 +665,7 @@ def _cast_empty_df_dtypes(schema_fields, df): | |
|
||
|
||
def read_gbq( | ||
query, | ||
query_or_table, | ||
project_id=None, | ||
index_col=None, | ||
col_order=None, | ||
|
@@ -668,17 +689,18 @@ def read_gbq( | |
|
||
This method uses the Google Cloud client library to make requests to | ||
Google BigQuery, documented `here | ||
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__. | ||
<https://googleapis.dev/python/bigquery/latest/index.html>`__. | ||
|
||
See the :ref:`How to authenticate with Google BigQuery <authentication>` | ||
guide for authentication instructions. | ||
|
||
Parameters | ||
---------- | ||
query : str | ||
SQL-Like Query to return data values. | ||
query_or_table : str | ||
SQL query to return data values. If the string is a table ID, fetch the | ||
rows directly from the table without running a query. | ||
project_id : str, optional | ||
Google BigQuery Account project ID. Optional when available from | ||
Google Cloud Platform project ID. Optional when available from | ||
the environment. | ||
index_col : str, optional | ||
Name of result column to use for index in results DataFrame. | ||
|
@@ -688,14 +710,14 @@ def read_gbq( | |
reauth : boolean, default False | ||
Force Google BigQuery to re-authenticate the user. This is useful | ||
if multiple accounts are used. | ||
auth_local_webserver : boolean, default False | ||
Use the `local webserver flow`_ instead of the `console flow`_ | ||
when getting user credentials. | ||
|
||
.. _local webserver flow: | ||
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server | ||
.. _console flow: | ||
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console | ||
auth_local_webserver : bool, default False | ||
Use the `local webserver flow | ||
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_ | ||
instead of the `console flow | ||
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_ | ||
when getting user credentials. Your code must run on the same machine | ||
as your web browser, and your web browser must be able to access your | ||
application via ``localhost:808X``. | ||
|
||
.. versionadded:: 0.2.0 | ||
dialect : str, default 'standard' | ||
|
@@ -745,13 +767,6 @@ def read_gbq( | |
<https://cloud.google.com/bigquery/docs/access-control#roles>`__ | ||
permission on the project you are billing queries to. | ||
|
||
**Note:** Due to a `known issue in the ``google-cloud-bigquery`` | ||
package | ||
<https://github.com/googleapis/google-cloud-python/pull/7633>`__ | ||
(fixed in version 1.11.0), you must write your query results to a | ||
destination table. To do this with ``read_gbq``, supply a | ||
``configuration`` dictionary. | ||
|
||
This feature requires the ``google-cloud-bigquery-storage`` and | ||
``pyarrow`` packages. | ||
|
||
|
@@ -823,6 +838,15 @@ def read_gbq( | |
if dialect not in ("legacy", "standard"): | ||
raise ValueError("'{0}' is not valid for dialect".format(dialect)) | ||
|
||
if configuration and "query" in configuration and "query" in configuration["query"]: | ||
if query_or_table is not None: | ||
raise ValueError( | ||
"Query statement can't be specified " | ||
"inside config while it is specified " | ||
"as parameter" | ||
) | ||
query_or_table = configuration["query"].pop("query") | ||
|
||
connector = GbqConnector( | ||
project_id, | ||
reauth=reauth, | ||
|
@@ -834,13 +858,21 @@ def read_gbq( | |
use_bqstorage_api=use_bqstorage_api, | ||
) | ||
|
||
final_df = connector.run_query( | ||
query, | ||
configuration=configuration, | ||
max_results=max_results, | ||
progress_bar_type=progress_bar_type, | ||
dtypes=dtypes, | ||
) | ||
if _is_query(query_or_table): | ||
final_df = connector.run_query( | ||
query_or_table, | ||
configuration=configuration, | ||
max_results=max_results, | ||
progress_bar_type=progress_bar_type, | ||
dtypes=dtypes, | ||
) | ||
else: | ||
final_df = connector.download_table( | ||
query_or_table, | ||
max_results=max_results, | ||
progress_bar_type=progress_bar_type, | ||
dtypes=dtypes, | ||
) | ||
|
||
# Reindex the DataFrame on the provided column | ||
if index_col is not None: | ||
|
@@ -889,7 +921,7 @@ def to_gbq( | |
|
||
This method uses the Google Cloud client library to make requests to | ||
Google BigQuery, documented `here | ||
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__. | ||
<https://googleapis.dev/python/bigquery/latest/index.html>`__. | ||
|
||
See the :ref:`How to authenticate with Google BigQuery <authentication>` | ||
guide for authentication instructions. | ||
|
@@ -902,7 +934,7 @@ def to_gbq( | |
Name of table to be written, in the form ``dataset.tablename`` or | ||
``project.dataset.tablename``. | ||
project_id : str, optional | ||
Google BigQuery Account project ID. Optional when available from | ||
Google Cloud Platform project ID. Optional when available from | ||
the environment. | ||
chunksize : int, optional | ||
Number of rows to be inserted in each chunk from the dataframe. | ||
|
@@ -920,13 +952,13 @@ def to_gbq( | |
``'append'`` | ||
If table exists, insert data. Create the table if it does not exist. | ||
auth_local_webserver : bool, default False | ||
Use the `local webserver flow`_ instead of the `console flow`_ | ||
when getting user credentials. | ||
|
||
.. _local webserver flow: | ||
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server | ||
.. _console flow: | ||
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console | ||
Use the `local webserver flow | ||
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_ | ||
instead of the `console flow | ||
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_ | ||
when getting user credentials. Your code must run on the same machine | ||
as your web browser, and your web browser must be able to access your | ||
application via ``localhost:808X``. | ||
|
||
.. versionadded:: 0.2.0 | ||
table_schema : list of dicts, optional | ||
|
Uh oh!
There was an error while loading. Please reload this page.