diff --git a/pandas_gbq/__init__.py b/pandas_gbq/__init__.py
index 76c33d60..184f8c44 100644
--- a/pandas_gbq/__init__.py
+++ b/pandas_gbq/__init__.py
@@ -5,9 +5,10 @@
 import warnings
 
 from pandas_gbq import version as pandas_gbq_version
 
+from pandas_gbq.contexts import Context, context
 from . import _versions_helpers
-from .gbq import Context, context, read_gbq, to_gbq  # noqa
+from .gbq import read_gbq, to_gbq  # noqa
 
 sys_major, sys_minor, sys_micro = _versions_helpers.extract_runtime_version()
 if sys_major == 3 and sys_minor in (7, 8):
diff --git a/pandas_gbq/contexts.py b/pandas_gbq/contexts.py
new file mode 100644
index 00000000..76a5a1e2
--- /dev/null
+++ b/pandas_gbq/contexts.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2025 pandas-gbq Authors All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+
+class Context(object):
+    """Storage for objects to be used throughout a session.
+
+    A Context object is initialized when the ``pandas_gbq`` module is
+    imported, and can be found at :attr:`pandas_gbq.context`.
+    """
+
+    def __init__(self):
+        self._credentials = None
+        self._project = None
+        # dialect defaults to None so that read_gbq can stop warning if set.
+        self._dialect = None
+
+    @property
+    def credentials(self):
+        """
+        Credentials to use for Google APIs.
+
+        These credentials are automatically cached in memory by calls to
+        :func:`pandas_gbq.read_gbq` and :func:`pandas_gbq.to_gbq`. To
+        manually set the credentials, construct an
+        :class:`google.auth.credentials.Credentials` object and set it as
+        the context credentials as demonstrated in the example below. See
+        `auth docs`_ for more information on obtaining credentials.
+
+        .. _auth docs: http://google-auth.readthedocs.io
+            /en/latest/user-guide.html#obtaining-credentials
+
+        Returns
+        -------
+        google.auth.credentials.Credentials
+
+        Examples
+        --------
+
+        Manually setting the context credentials:
+
+        >>> import pandas_gbq
+        >>> from google.oauth2 import service_account
+        >>> credentials = service_account.Credentials.from_service_account_file(
+        ...     '/path/to/key.json',
+        ... )
+        >>> pandas_gbq.context.credentials = credentials
+        """
+        return self._credentials
+
+    @credentials.setter
+    def credentials(self, value):
+        self._credentials = value
+
+    @property
+    def project(self):
+        """Default project to use for calls to Google APIs.
+
+        Returns
+        -------
+        str
+
+        Examples
+        --------
+
+        Manually setting the context project:
+
+        >>> import pandas_gbq
+        >>> pandas_gbq.context.project = 'my-project'
+        """
+        return self._project
+
+    @project.setter
+    def project(self, value):
+        self._project = value
+
+    @property
+    def dialect(self):
+        """
+        Default dialect to use in :func:`pandas_gbq.read_gbq`.
+
+        Allowed values for the BigQuery SQL syntax dialect:
+
+        ``'legacy'``
+            Use BigQuery's legacy SQL dialect. For more information see
+            `BigQuery Legacy SQL Reference
+            <https://cloud.google.com/bigquery/docs/reference/legacy-sql>`__.
+        ``'standard'``
+            Use BigQuery's standard SQL, which is
+            compliant with the SQL 2011 standard. For more information
+            see `BigQuery Standard SQL Reference
+            <https://cloud.google.com/bigquery/docs/reference/standard-sql/>`__.
+
+        Returns
+        -------
+        str
+
+        Examples
+        --------
+
+        Setting the default syntax to standard:
+
+        >>> import pandas_gbq
+        >>> pandas_gbq.context.dialect = 'standard'
+        """
+        return self._dialect
+
+    @dialect.setter
+    def dialect(self, value):
+        self._dialect = value
+
+
+# Create an empty context, used to cache credentials.
+context = Context()
+"""A :class:`pandas_gbq.Context` object used to cache credentials.
+
+Credentials are automatically cached in-memory by :func:`pandas_gbq.read_gbq`
+and :func:`pandas_gbq.to_gbq`.
+"""
diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py
index af58212e..1acec712 100644
--- a/pandas_gbq/exceptions.py
+++ b/pandas_gbq/exceptions.py
@@ -3,6 +3,70 @@
 # license that can be found in the LICENSE file.
 
 
+class DatasetCreationError(ValueError):
+    """
+    Raised when the create dataset method fails
+    """
+
+
+class InvalidColumnOrder(ValueError):
+    """
+    Raised when the provided column order for output
+    results DataFrame does not match the schema
+    returned by BigQuery.
+    """
+
+
+class InvalidIndexColumn(ValueError):
+    """
+    Raised when the provided index column for output
+    results DataFrame does not match the schema
+    returned by BigQuery.
+    """
+
+
+class InvalidPageToken(ValueError):
+    """
+    Raised when Google BigQuery fails to return,
+    or returns a duplicate page token.
+    """
+
+
+class InvalidSchema(ValueError):
+    """
+    Raised when the provided DataFrame does
+    not match the schema of the destination
+    table in BigQuery.
+    """
+
+    def __init__(self, message: str):
+        self._message = message
+
+    @property
+    def message(self) -> str:
+        return self._message
+
+
+class NotFoundException(ValueError):
+    """
+    Raised when the project_id, table or dataset provided in the query could
+    not be found.
+    """
+
+
+class TableCreationError(ValueError):
+    """
+    Raised when the create table method fails
+    """
+
+    def __init__(self, message: str):
+        self._message = message
+
+    @property
+    def message(self) -> str:
+        return self._message
+
+
 class GenericGBQException(ValueError):
     """
     Raised when an unrecognized Google API Error occurs.
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 932c98fe..8db1d4ea 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -6,32 +6,31 @@
 from datetime import datetime
 import logging
 import re
-import time
-import typing
-from typing import Any, Dict, Optional, Sequence, Union
 import warnings
 
-import numpy as np
-
-# Only import at module-level at type checking time to avoid circular
-# dependencies in the pandas package, which has an optional dependency on
-# pandas-gbq.
-if typing.TYPE_CHECKING: # pragma: NO COVER - import pandas - -import pandas_gbq.constants -import pandas_gbq.exceptions -from pandas_gbq.exceptions import GenericGBQException, QueryTimeout +from pandas_gbq.contexts import Context # noqa - backward compatible export +from pandas_gbq.contexts import context +from pandas_gbq.exceptions import ( # noqa - backward compatible export + DatasetCreationError, + GenericGBQException, + InvalidColumnOrder, + InvalidIndexColumn, + NotFoundException, + TableCreationError, +) +from pandas_gbq.exceptions import InvalidPageToken # noqa - backward compatible export +from pandas_gbq.exceptions import InvalidSchema # noqa - backward compatible export +from pandas_gbq.exceptions import QueryTimeout # noqa - backward compatible export from pandas_gbq.features import FEATURES -import pandas_gbq.query +from pandas_gbq.gbq_connector import ( # noqa - backward compatible export + GbqConnector, + _bqschema_to_nullsafe_dtypes, + _finalize_dtypes, + create_user_agent, +) +from pandas_gbq.gbq_connector import _get_client # noqa - backward compatible export import pandas_gbq.schema import pandas_gbq.schema.pandas_to_bigquery -import pandas_gbq.timestamp - -try: - import tqdm # noqa -except ImportError: - tqdm = None logger = logging.getLogger(__name__) @@ -72,601 +71,6 @@ def _is_query(query_or_table: str) -> bool: return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None -class DatasetCreationError(ValueError): - """ - Raised when the create dataset method fails - """ - - -class InvalidColumnOrder(ValueError): - """ - Raised when the provided column order for output - results DataFrame does not match the schema - returned by BigQuery. - """ - - -class InvalidIndexColumn(ValueError): - """ - Raised when the provided index column for output - results DataFrame does not match the schema - returned by BigQuery. - """ - - -class InvalidPageToken(ValueError): - """ - Raised when Google BigQuery fails to return, - or returns a duplicate page token. - """ - - -class InvalidSchema(ValueError): - """ - Raised when the provided DataFrame does - not match the schema of the destination - table in BigQuery. - """ - - def __init__(self, message: str): - self._message = message - - @property - def message(self) -> str: - return self._message - - -class NotFoundException(ValueError): - """ - Raised when the project_id, table or dataset provided in the query could - not be found. - """ - - -class TableCreationError(ValueError): - """ - Raised when the create table method fails - """ - - def __init__(self, message: str): - self._message = message - - @property - def message(self) -> str: - return self._message - - -class Context(object): - """Storage for objects to be used throughout a session. - - A Context object is initialized when the ``pandas_gbq`` module is - imported, and can be found at :attr:`pandas_gbq.context`. - """ - - def __init__(self): - self._credentials = None - self._project = None - # dialect defaults to None so that read_gbq can stop warning if set. - self._dialect = None - - @property - def credentials(self): - """ - Credentials to use for Google APIs. - - These credentials are automatically cached in memory by calls to - :func:`pandas_gbq.read_gbq` and :func:`pandas_gbq.to_gbq`. To - manually set the credentials, construct an - :class:`google.auth.credentials.Credentials` object and set it as - the context credentials as demonstrated in the example below. See - `auth docs`_ for more information on obtaining credentials. - - .. 
_auth docs: http://google-auth.readthedocs.io - /en/latest/user-guide.html#obtaining-credentials - - Returns - ------- - google.auth.credentials.Credentials - - Examples - -------- - - Manually setting the context credentials: - - >>> import pandas_gbq - >>> from google.oauth2 import service_account - >>> credentials = service_account.Credentials.from_service_account_file( - ... '/path/to/key.json', - ... ) - >>> pandas_gbq.context.credentials = credentials - """ - return self._credentials - - @credentials.setter - def credentials(self, value): - self._credentials = value - - @property - def project(self): - """Default project to use for calls to Google APIs. - - Returns - ------- - str - - Examples - -------- - - Manually setting the context project: - - >>> import pandas_gbq - >>> pandas_gbq.context.project = 'my-project' - """ - return self._project - - @project.setter - def project(self, value): - self._project = value - - @property - def dialect(self): - """ - Default dialect to use in :func:`pandas_gbq.read_gbq`. - - Allowed values for the BigQuery SQL syntax dialect: - - ``'legacy'`` - Use BigQuery's legacy SQL dialect. For more information see - `BigQuery Legacy SQL Reference - `__. - ``'standard'`` - Use BigQuery's standard SQL, which is - compliant with the SQL 2011 standard. For more information - see `BigQuery Standard SQL Reference - `__. - - Returns - ------- - str - - Examples - -------- - - Setting the default syntax to standard: - - >>> import pandas_gbq - >>> pandas_gbq.context.dialect = 'standard' - """ - return self._dialect - - @dialect.setter - def dialect(self, value): - self._dialect = value - - -# Create an empty context, used to cache credentials. -context = Context() -"""A :class:`pandas_gbq.Context` object used to cache credentials. - -Credentials automatically are cached in-memory by :func:`pandas_gbq.read_gbq` -and :func:`pandas_gbq.to_gbq`. -""" - - -class GbqConnector(object): - def __init__( - self, - project_id, - reauth=False, - private_key=None, - auth_local_webserver=True, - dialect="standard", - location=None, - credentials=None, - use_bqstorage_api=False, - auth_redirect_uri=None, - client_id=None, - client_secret=None, - user_agent=None, - rfc9110_delimiter=False, - bigquery_client=None, - ): - from google.api_core.exceptions import ClientError, GoogleAPIError - - from pandas_gbq import auth - - self.http_error = (ClientError, GoogleAPIError) - self.project_id = project_id - self.location = location - self.reauth = reauth - self.private_key = private_key - self.auth_local_webserver = auth_local_webserver - self.dialect = dialect - self.credentials = credentials - self.auth_redirect_uri = auth_redirect_uri - self.client_id = client_id - self.client_secret = client_secret - self.user_agent = user_agent - self.rfc9110_delimiter = rfc9110_delimiter - self.use_bqstorage_api = use_bqstorage_api - - if bigquery_client is not None: - # If a bq client is already provided, use it to populate auth fields. - self.project_id = bigquery_client.project - self.credentials = bigquery_client._credentials - self.client = bigquery_client - return - - default_project = None - - # Service account credentials have a project associated with them. - # Prefer that project if none was supplied. - if self.project_id is None and hasattr(self.credentials, "project_id"): - self.project_id = credentials.project_id - - # Load credentials from cache. 
- if not self.credentials: - self.credentials = context.credentials - default_project = context.project - - # Credentials were explicitly asked for, so don't use the cache. - if private_key or reauth or not self.credentials: - self.credentials, default_project = auth.get_credentials( - private_key=private_key, - project_id=project_id, - reauth=reauth, - auth_local_webserver=auth_local_webserver, - auth_redirect_uri=auth_redirect_uri, - client_id=client_id, - client_secret=client_secret, - ) - - if self.project_id is None: - self.project_id = default_project - - if self.project_id is None: - raise ValueError("Could not determine project ID and one was not supplied.") - - # Cache the credentials if they haven't been set yet. - if context.credentials is None: - context.credentials = self.credentials - if context.project is None: - context.project = self.project_id - - self.client = _get_client( - self.user_agent, self.rfc9110_delimiter, self.project_id, self.credentials - ) - - def _start_timer(self): - self.start = time.time() - - def get_elapsed_seconds(self): - return round(time.time() - self.start, 2) - - def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6): - sec = self.get_elapsed_seconds() - if sec > overlong: - logger.info("{} {} {}".format(prefix, sec, postfix)) - - def get_client(self): - import google.api_core.client_info - - bigquery = FEATURES.bigquery_try_import() - - user_agent = create_user_agent( - user_agent=self.user_agent, rfc9110_delimiter=self.rfc9110_delimiter - ) - - client_info = google.api_core.client_info.ClientInfo( - user_agent=user_agent, - ) - return bigquery.Client( - project=self.project_id, - credentials=self.credentials, - client_info=client_info, - ) - - @staticmethod - def process_http_error(ex): - # See `BigQuery Troubleshooting Errors - # `__ - - message = ( - ex.message.casefold() - if hasattr(ex, "message") and ex.message is not None - else "" - ) - if "cancelled" in message: - raise QueryTimeout("Reason: {0}".format(ex)) - elif "schema does not match" in message: - error_message = ex.errors[0]["message"] - raise InvalidSchema(f"Reason: {error_message}") - elif "already exists: table" in message: - error_message = ex.errors[0]["message"] - raise TableCreationError(f"Reason: {error_message}") - else: - raise GenericGBQException("Reason: {0}".format(ex)) from ex - - def download_table( - self, - table_id: str, - max_results: Optional[int] = None, - progress_bar_type: Optional[str] = None, - dtypes: Optional[Dict[str, Union[str, Any]]] = None, - ) -> "pandas.DataFrame": - from google.cloud import bigquery - - self._start_timer() - - try: - table_ref = bigquery.TableReference.from_string( - table_id, default_project=self.project_id - ) - rows_iter = self.client.list_rows(table_ref, max_results=max_results) - except self.http_error as ex: - self.process_http_error(ex) - - return self._download_results( - rows_iter, - max_results=max_results, - progress_bar_type=progress_bar_type, - user_dtypes=dtypes, - ) - - def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): - from google.cloud import bigquery - - job_config_dict = { - "query": { - "useLegacySql": self.dialect - == "legacy" - # 'allowLargeResults', 'createDisposition', - # 'preserveNulls', destinationTable, useQueryCache - } - } - config = kwargs.get("configuration") - if config is not None: - job_config_dict.update(config) - - timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[ - "query" - ].get("timeoutMs") - - if timeout_ms: - timeout_ms 
= int(timeout_ms) - # Having too small a timeout_ms results in individual - # API calls timing out before they can finish. - # ~300 milliseconds is rule of thumb for bare minimum - # latency from the BigQuery API, however, 400 milliseconds - # produced too many issues with flakybot failures. - minimum_latency = 500 - if timeout_ms < minimum_latency: - raise QueryTimeout( - f"Query timeout must be at least 500 milliseconds: timeout_ms equals {timeout_ms}." - ) - else: - timeout_ms = None - - self._start_timer() - job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict) - - if FEATURES.bigquery_has_query_and_wait: - rows_iter = pandas_gbq.query.query_and_wait_via_client_library( - self, - self.client, - query, - location=self.location, - project_id=self.project_id, - job_config=job_config, - max_results=max_results, - timeout_ms=timeout_ms, - ) - else: - rows_iter = pandas_gbq.query.query_and_wait( - self, - self.client, - query, - location=self.location, - project_id=self.project_id, - job_config=job_config, - max_results=max_results, - timeout_ms=timeout_ms, - ) - - dtypes = kwargs.get("dtypes") - return self._download_results( - rows_iter, - max_results=max_results, - progress_bar_type=progress_bar_type, - user_dtypes=dtypes, - ) - - def _download_results( - self, - rows_iter, - max_results=None, - progress_bar_type=None, - user_dtypes=None, - ): - # No results are desired, so don't bother downloading anything. - if max_results == 0: - return None - - if user_dtypes is None: - user_dtypes = {} - - create_bqstorage_client = self.use_bqstorage_api - if max_results is not None: - create_bqstorage_client = False - - # If we're downloading a large table, BigQuery DataFrames might be a - # better fit. Not all code paths will populate rows_iter._table, but - # if it's not populated that means we are working with a small result - # set. - if (table_ref := getattr(rows_iter, "_table", None)) is not None: - table = self.client.get_table(table_ref) - if ( - isinstance((num_bytes := table.num_bytes), int) - and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES - ): - num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB - warnings.warn( - f"Recommendation: Your results are {num_gib:.1f} GiB. " - "Consider using BigQuery DataFrames (https://bit.ly/bigframes-intro)" - "to process large results with pandas compatible APIs with transparent SQL " - "pushdown to BigQuery engine. This provides an opportunity to save on costs " - "and improve performance. " - "Please reach out to bigframes-feedback@google.com with any " - "questions or concerns. 
To disable this message, run " - "warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)", - category=pandas_gbq.exceptions.LargeResultsWarning, - # user's code - # -> read_gbq - # -> run_query - # -> download_results - stacklevel=4, - ) - - try: - schema_fields = [field.to_api_repr() for field in rows_iter.schema] - conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) - conversion_dtypes.update(user_dtypes) - df = rows_iter.to_dataframe( - dtypes=conversion_dtypes, - progress_bar_type=progress_bar_type, - create_bqstorage_client=create_bqstorage_client, - ) - except self.http_error as ex: - self.process_http_error(ex) - - df = _finalize_dtypes(df, schema_fields) - - logger.debug("Got {} rows.\n".format(rows_iter.total_rows)) - return df - - def load_data( - self, - dataframe, - destination_table_ref, - write_disposition, - chunksize=None, - schema=None, - progress_bar=True, - api_method: str = "load_parquet", - billing_project: Optional[str] = None, - ): - from pandas_gbq import load - - total_rows = len(dataframe) - - try: - chunks = load.load_chunks( - self.client, - dataframe, - destination_table_ref, - chunksize=chunksize, - schema=schema, - location=self.location, - api_method=api_method, - write_disposition=write_disposition, - billing_project=billing_project, - ) - if progress_bar and tqdm: - chunks = tqdm.tqdm(chunks) - for remaining_rows in chunks: - logger.info( - "\r{} out of {} rows loaded.".format( - total_rows - remaining_rows, total_rows - ) - ) - except self.http_error as ex: - self.process_http_error(ex) - - -def _bqschema_to_nullsafe_dtypes(schema_fields): - """Specify explicit dtypes based on BigQuery schema. - - This function only specifies a dtype when the dtype allows nulls. - Otherwise, use pandas's default dtype choice. - - See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html - #missing-data-casting-rules-and-indexing - """ - import db_dtypes - - # If you update this mapping, also update the table at - # `docs/reading.rst`. - dtype_map = { - "FLOAT": np.dtype(float), - "INTEGER": "Int64", - "TIME": db_dtypes.TimeDtype(), - # Note: Other types such as 'datetime64[ns]' and db_types.DateDtype() - # are not included because the pandas range does not align with the - # BigQuery range. We need to attempt a conversion to those types and - # fall back to 'object' when there are out-of-range values. - } - - # Amend dtype_map with newer extension types if pandas version allows. - if FEATURES.pandas_has_boolean_dtype: - dtype_map["BOOLEAN"] = "boolean" - - dtypes = {} - for field in schema_fields: - name = str(field["name"]) - # Array BigQuery type is represented as an object column containing - # list objects. - if field["mode"].upper() == "REPEATED": - dtypes[name] = "object" - continue - - dtype = dtype_map.get(field["type"].upper()) - if dtype: - dtypes[name] = dtype - - return dtypes - - -def _finalize_dtypes( - df: "pandas.DataFrame", schema_fields: Sequence[Dict[str, Any]] -) -> "pandas.DataFrame": - """ - Attempt to change the dtypes of those columns that don't map exactly. - - For example db_dtypes.DateDtype() and datetime64[ns] cannot represent - 0001-01-01, but they can represent dates within a couple hundred years of - 1970. See: - https://github.com/googleapis/python-bigquery-pandas/issues/365 - """ - import db_dtypes - import pandas.api.types - - # If you update this mapping, also update the table at - # `docs/reading.rst`. 
- dtype_map = { - "DATE": db_dtypes.DateDtype(), - "DATETIME": "datetime64[ns]", - "TIMESTAMP": "datetime64[ns]", - } - - for field in schema_fields: - # This method doesn't modify ARRAY/REPEATED columns. - if field["mode"].upper() == "REPEATED": - continue - - name = str(field["name"]) - dtype = dtype_map.get(field["type"].upper()) - - # Avoid deprecated conversion to timezone-naive dtype by only casting - # object dtypes. - if dtype and pandas.api.types.is_object_dtype(df[name]): - df[name] = df[name].astype(dtype, errors="ignore") - - # Ensure any TIMESTAMP columns are tz-aware. - df = pandas_gbq.timestamp.localize_df(df, schema_fields) - - return df - - def _transform_read_gbq_configuration(configuration): """ For backwards-compatibility, convert any previously client-side only @@ -1453,78 +857,3 @@ def create(self, dataset_id): self.client.create_dataset(dataset) except self.http_error as ex: self.process_http_error(ex) - - -def create_user_agent( - user_agent: Optional[str] = None, rfc9110_delimiter: bool = False -) -> str: - """Creates a user agent string. - - The legacy format of our the user agent string was: `product-x.y.z` (where x, - y, and z are the major, minor, and micro version numbers). - - Users are able to prepend this string with their own user agent identifier - to render something similar to ` pandas-x.y.z`. - - The legacy format used a hyphen to separate the product from the product - version which differs slightly from the format recommended by RFC9110, which is: - `product/x.y.z`. To produce a user agent more in line with the RFC, set - rfc9110_delimiter to True. This setting does not depend on whether a - user_agent is also supplied. - - Reference: - https://www.rfc-editor.org/info/rfc9110 - - Args: - user_agent (Optional[str]): User agent string. - - rfc9110_delimiter (Optional[bool]): Sets delimiter to a hyphen or a slash. - Default is False, meaning a hyphen will be used. - - Returns (str): - Customized user agent string. - - Deprecation Warning: - In a future major release, the default delimiter will be changed to - a `/` in accordance with RFC9110. - """ - import pandas as pd - - if rfc9110_delimiter: - delimiter = "/" - else: - warnings.warn( - "In a future major release, the default delimiter will be " - "changed to a `/` in accordance with RFC9110.", - PendingDeprecationWarning, - stacklevel=2, - ) - delimiter = "-" - - identity = f"pandas{delimiter}{pd.__version__}" - - if user_agent is None: - user_agent = identity - else: - user_agent = f"{user_agent} {identity}" - - return user_agent - - -def _get_client(user_agent, rfc9110_delimiter, project_id, credentials): - import google.api_core.client_info - - bigquery = FEATURES.bigquery_try_import() - - user_agent = create_user_agent( - user_agent=user_agent, rfc9110_delimiter=rfc9110_delimiter - ) - - client_info = google.api_core.client_info.ClientInfo( - user_agent=user_agent, - ) - return bigquery.Client( - project=project_id, - credentials=credentials, - client_info=client_info, - ) diff --git a/pandas_gbq/gbq_connector.py b/pandas_gbq/gbq_connector.py new file mode 100644 index 00000000..97a22db4 --- /dev/null +++ b/pandas_gbq/gbq_connector.py @@ -0,0 +1,527 @@ +# Copyright (c) 2025 pandas-gbq Authors All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. 
+ + +import logging +import time +import typing +from typing import Any, Dict, Optional, Sequence, Union +import warnings + +import numpy as np + +# Only import at module-level at type checking time to avoid circular +# dependencies in the pandas package, which has an optional dependency on +# pandas-gbq. +if typing.TYPE_CHECKING: # pragma: NO COVER + import pandas + +import pandas_gbq.constants +from pandas_gbq.contexts import context +import pandas_gbq.exceptions +from pandas_gbq.exceptions import ( + GenericGBQException, + InvalidSchema, + QueryTimeout, + TableCreationError, +) +from pandas_gbq.features import FEATURES +import pandas_gbq.query +import pandas_gbq.timestamp + +try: + import tqdm # noqa +except ImportError: + tqdm = None + +logger = logging.getLogger(__name__) + + +class GbqConnector: + def __init__( + self, + project_id, + reauth=False, + private_key=None, + auth_local_webserver=True, + dialect="standard", + location=None, + credentials=None, + use_bqstorage_api=False, + auth_redirect_uri=None, + client_id=None, + client_secret=None, + user_agent=None, + rfc9110_delimiter=False, + bigquery_client=None, + ): + from google.api_core.exceptions import ClientError, GoogleAPIError + + from pandas_gbq import auth + + self.http_error = (ClientError, GoogleAPIError) + self.project_id = project_id + self.location = location + self.reauth = reauth + self.private_key = private_key + self.auth_local_webserver = auth_local_webserver + self.dialect = dialect + self.credentials = credentials + self.auth_redirect_uri = auth_redirect_uri + self.client_id = client_id + self.client_secret = client_secret + self.user_agent = user_agent + self.rfc9110_delimiter = rfc9110_delimiter + self.use_bqstorage_api = use_bqstorage_api + + if bigquery_client is not None: + # If a bq client is already provided, use it to populate auth fields. + self.project_id = bigquery_client.project + self.credentials = bigquery_client._credentials + self.client = bigquery_client + return + + default_project = None + + # Service account credentials have a project associated with them. + # Prefer that project if none was supplied. + if self.project_id is None and hasattr(self.credentials, "project_id"): + self.project_id = credentials.project_id + + # Load credentials from cache. + if not self.credentials: + self.credentials = context.credentials + default_project = context.project + + # Credentials were explicitly asked for, so don't use the cache. + if private_key or reauth or not self.credentials: + self.credentials, default_project = auth.get_credentials( + private_key=private_key, + project_id=project_id, + reauth=reauth, + auth_local_webserver=auth_local_webserver, + auth_redirect_uri=auth_redirect_uri, + client_id=client_id, + client_secret=client_secret, + ) + + if self.project_id is None: + self.project_id = default_project + + if self.project_id is None: + raise ValueError("Could not determine project ID and one was not supplied.") + + # Cache the credentials if they haven't been set yet. 
+ if context.credentials is None: + context.credentials = self.credentials + if context.project is None: + context.project = self.project_id + + self.client = _get_client( + self.user_agent, self.rfc9110_delimiter, self.project_id, self.credentials + ) + + def _start_timer(self): + self.start = time.time() + + def get_elapsed_seconds(self): + return round(time.time() - self.start, 2) + + def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6): + sec = self.get_elapsed_seconds() + if sec > overlong: + logger.info("{} {} {}".format(prefix, sec, postfix)) + + def get_client(self): + import google.api_core.client_info + + bigquery = FEATURES.bigquery_try_import() + + user_agent = create_user_agent( + user_agent=self.user_agent, rfc9110_delimiter=self.rfc9110_delimiter + ) + + client_info = google.api_core.client_info.ClientInfo( + user_agent=user_agent, + ) + return bigquery.Client( + project=self.project_id, + credentials=self.credentials, + client_info=client_info, + ) + + @staticmethod + def process_http_error(ex): + # See `BigQuery Troubleshooting Errors + # `__ + + message = ( + ex.message.casefold() + if hasattr(ex, "message") and ex.message is not None + else "" + ) + if "cancelled" in message: + raise QueryTimeout("Reason: {0}".format(ex)) + elif "schema does not match" in message: + error_message = ex.errors[0]["message"] + raise InvalidSchema(f"Reason: {error_message}") + elif "already exists: table" in message: + error_message = ex.errors[0]["message"] + raise TableCreationError(f"Reason: {error_message}") + else: + raise GenericGBQException("Reason: {0}".format(ex)) from ex + + def download_table( + self, + table_id: str, + max_results: Optional[int] = None, + progress_bar_type: Optional[str] = None, + dtypes: Optional[Dict[str, Union[str, Any]]] = None, + ) -> "pandas.DataFrame": + from google.cloud import bigquery + + self._start_timer() + + try: + table_ref = bigquery.TableReference.from_string( + table_id, default_project=self.project_id + ) + rows_iter = self.client.list_rows(table_ref, max_results=max_results) + except self.http_error as ex: + self.process_http_error(ex) + + return self._download_results( + rows_iter, + max_results=max_results, + progress_bar_type=progress_bar_type, + user_dtypes=dtypes, + ) + + def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): + from google.cloud import bigquery + + job_config_dict = { + "query": { + "useLegacySql": self.dialect + == "legacy" + # 'allowLargeResults', 'createDisposition', + # 'preserveNulls', destinationTable, useQueryCache + } + } + config = kwargs.get("configuration") + if config is not None: + job_config_dict.update(config) + + timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[ + "query" + ].get("timeoutMs") + + if timeout_ms: + timeout_ms = int(timeout_ms) + # Having too small a timeout_ms results in individual + # API calls timing out before they can finish. + # ~300 milliseconds is rule of thumb for bare minimum + # latency from the BigQuery API, however, 400 milliseconds + # produced too many issues with flakybot failures. + minimum_latency = 500 + if timeout_ms < minimum_latency: + raise QueryTimeout( + f"Query timeout must be at least 500 milliseconds: timeout_ms equals {timeout_ms}." 
+ ) + else: + timeout_ms = None + + self._start_timer() + job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict) + + if FEATURES.bigquery_has_query_and_wait: + rows_iter = pandas_gbq.query.query_and_wait_via_client_library( + self, + self.client, + query, + location=self.location, + project_id=self.project_id, + job_config=job_config, + max_results=max_results, + timeout_ms=timeout_ms, + ) + else: + rows_iter = pandas_gbq.query.query_and_wait( + self, + self.client, + query, + location=self.location, + project_id=self.project_id, + job_config=job_config, + max_results=max_results, + timeout_ms=timeout_ms, + ) + + dtypes = kwargs.get("dtypes") + return self._download_results( + rows_iter, + max_results=max_results, + progress_bar_type=progress_bar_type, + user_dtypes=dtypes, + ) + + def _download_results( + self, + rows_iter, + max_results=None, + progress_bar_type=None, + user_dtypes=None, + ): + # No results are desired, so don't bother downloading anything. + if max_results == 0: + return None + + if user_dtypes is None: + user_dtypes = {} + + create_bqstorage_client = self.use_bqstorage_api + if max_results is not None: + create_bqstorage_client = False + + # If we're downloading a large table, BigQuery DataFrames might be a + # better fit. Not all code paths will populate rows_iter._table, but + # if it's not populated that means we are working with a small result + # set. + if (table_ref := getattr(rows_iter, "_table", None)) is not None: + table = self.client.get_table(table_ref) + if ( + isinstance((num_bytes := table.num_bytes), int) + and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES + ): + num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB + warnings.warn( + f"Recommendation: Your results are {num_gib:.1f} GiB. " + "Consider using BigQuery DataFrames (https://bit.ly/bigframes-intro)" + "to process large results with pandas compatible APIs with transparent SQL " + "pushdown to BigQuery engine. This provides an opportunity to save on costs " + "and improve performance. " + "Please reach out to bigframes-feedback@google.com with any " + "questions or concerns. 
To disable this message, run " + "warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)", + category=pandas_gbq.exceptions.LargeResultsWarning, + # user's code + # -> read_gbq + # -> run_query + # -> download_results + stacklevel=4, + ) + + try: + schema_fields = [field.to_api_repr() for field in rows_iter.schema] + conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields) + conversion_dtypes.update(user_dtypes) + df = rows_iter.to_dataframe( + dtypes=conversion_dtypes, + progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, + ) + except self.http_error as ex: + self.process_http_error(ex) + + df = _finalize_dtypes(df, schema_fields) + + logger.debug("Got {} rows.\n".format(rows_iter.total_rows)) + return df + + def load_data( + self, + dataframe, + destination_table_ref, + write_disposition, + chunksize=None, + schema=None, + progress_bar=True, + api_method: str = "load_parquet", + billing_project: Optional[str] = None, + ): + from pandas_gbq import load + + total_rows = len(dataframe) + + try: + chunks = load.load_chunks( + self.client, + dataframe, + destination_table_ref, + chunksize=chunksize, + schema=schema, + location=self.location, + api_method=api_method, + write_disposition=write_disposition, + billing_project=billing_project, + ) + if progress_bar and tqdm: + chunks = tqdm.tqdm(chunks) + for remaining_rows in chunks: + logger.info( + "\r{} out of {} rows loaded.".format( + total_rows - remaining_rows, total_rows + ) + ) + except self.http_error as ex: + self.process_http_error(ex) + + +def _bqschema_to_nullsafe_dtypes(schema_fields): + """Specify explicit dtypes based on BigQuery schema. + + This function only specifies a dtype when the dtype allows nulls. + Otherwise, use pandas's default dtype choice. + + See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html + #missing-data-casting-rules-and-indexing + """ + import db_dtypes + + # If you update this mapping, also update the table at + # `docs/reading.rst`. + dtype_map = { + "FLOAT": np.dtype(float), + "INTEGER": "Int64", + "TIME": db_dtypes.TimeDtype(), + # Note: Other types such as 'datetime64[ns]' and db_types.DateDtype() + # are not included because the pandas range does not align with the + # BigQuery range. We need to attempt a conversion to those types and + # fall back to 'object' when there are out-of-range values. + } + + # Amend dtype_map with newer extension types if pandas version allows. + if FEATURES.pandas_has_boolean_dtype: + dtype_map["BOOLEAN"] = "boolean" + + dtypes = {} + for field in schema_fields: + name = str(field["name"]) + # Array BigQuery type is represented as an object column containing + # list objects. + if field["mode"].upper() == "REPEATED": + dtypes[name] = "object" + continue + + dtype = dtype_map.get(field["type"].upper()) + if dtype: + dtypes[name] = dtype + + return dtypes + + +def _finalize_dtypes( + df: "pandas.DataFrame", schema_fields: Sequence[Dict[str, Any]] +) -> "pandas.DataFrame": + """ + Attempt to change the dtypes of those columns that don't map exactly. + + For example db_dtypes.DateDtype() and datetime64[ns] cannot represent + 0001-01-01, but they can represent dates within a couple hundred years of + 1970. See: + https://github.com/googleapis/python-bigquery-pandas/issues/365 + """ + import db_dtypes + import pandas.api.types + + # If you update this mapping, also update the table at + # `docs/reading.rst`. 
+    dtype_map = {
+        "DATE": db_dtypes.DateDtype(),
+        "DATETIME": "datetime64[ns]",
+        "TIMESTAMP": "datetime64[ns]",
+    }
+
+    for field in schema_fields:
+        # This method doesn't modify ARRAY/REPEATED columns.
+        if field["mode"].upper() == "REPEATED":
+            continue
+
+        name = str(field["name"])
+        dtype = dtype_map.get(field["type"].upper())
+
+        # Avoid deprecated conversion to timezone-naive dtype by only casting
+        # object dtypes.
+        if dtype and pandas.api.types.is_object_dtype(df[name]):
+            df[name] = df[name].astype(dtype, errors="ignore")
+
+    # Ensure any TIMESTAMP columns are tz-aware.
+    df = pandas_gbq.timestamp.localize_df(df, schema_fields)
+
+    return df
+
+
+def _get_client(user_agent, rfc9110_delimiter, project_id, credentials):
+    import google.api_core.client_info
+
+    bigquery = FEATURES.bigquery_try_import()
+
+    user_agent = create_user_agent(
+        user_agent=user_agent, rfc9110_delimiter=rfc9110_delimiter
+    )
+
+    client_info = google.api_core.client_info.ClientInfo(
+        user_agent=user_agent,
+    )
+    return bigquery.Client(
+        project=project_id,
+        credentials=credentials,
+        client_info=client_info,
+    )
+
+
+def create_user_agent(
+    user_agent: Optional[str] = None, rfc9110_delimiter: bool = False
+) -> str:
+    """Creates a user agent string.
+
+    The legacy format of our user agent string was: `product-x.y.z` (where x,
+    y, and z are the major, minor, and micro version numbers).
+
+    Users are able to prepend this string with their own user agent identifier
+    to render something similar to ` pandas-x.y.z`.
+
+    The legacy format used a hyphen to separate the product from the product
+    version which differs slightly from the format recommended by RFC9110, which is:
+    `product/x.y.z`. To produce a user agent more in line with the RFC, set
+    rfc9110_delimiter to True. This setting does not depend on whether a
+    user_agent is also supplied.
+
+    Reference:
+        https://www.rfc-editor.org/info/rfc9110
+
+    Args:
+        user_agent (Optional[str]): User agent string.
+
+        rfc9110_delimiter (Optional[bool]): Sets delimiter to a hyphen or a slash.
+            Default is False, meaning a hyphen will be used.
+
+    Returns (str):
+        Customized user agent string.
+
+    Deprecation Warning:
+        In a future major release, the default delimiter will be changed to
+        a `/` in accordance with RFC9110.
+    """
+    import pandas as pd
+
+    if rfc9110_delimiter:
+        delimiter = "/"
+    else:
+        warnings.warn(
+            "In a future major release, the default delimiter will be "
+            "changed to a `/` in accordance with RFC9110.",
+            PendingDeprecationWarning,
+            stacklevel=2,
+        )
+        delimiter = "-"
+
+    identity = f"pandas{delimiter}{pd.__version__}"
+
+    if user_agent is None:
+        user_agent = identity
+    else:
+        user_agent = f"{user_agent} {identity}"
+
+    return user_agent
diff --git a/tests/system/test_read_gbq.py b/tests/system/test_read_gbq.py
index 72cb6b66..946da668 100644
--- a/tests/system/test_read_gbq.py
+++ b/tests/system/test_read_gbq.py
@@ -645,7 +645,9 @@ def test_empty_dataframe(read_gbq, use_bqstorage_api):
         }
     )
     result = read_gbq(query, use_bqstorage_api=use_bqstorage_api)
-    pandas.testing.assert_frame_equal(result, expected, check_index_type=False)
+    pandas.testing.assert_frame_equal(
+        result, expected, check_index_type=False, check_dtype=False
+    )
 
 
 def test_dml_query(read_gbq, writable_table: str):
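
The patch relocates Context, the exception classes, and GbqConnector (plus its helpers) into dedicated modules while keeping "# noqa - backward compatible export" imports in pandas_gbq/gbq.py, so existing import paths continue to resolve. The following is a minimal sketch of what those compatibility exports imply for downstream code; it assumes a pandas-gbq build that includes this change, uses a hypothetical placeholder project id, and makes no calls to BigQuery:

# Sketch: the relocated names are the same objects whether imported from the
# old or the new module paths (assumes pandas-gbq with this refactor installed).
import pandas_gbq
import pandas_gbq.contexts
import pandas_gbq.exceptions
import pandas_gbq.gbq
import pandas_gbq.gbq_connector

# Context now lives in pandas_gbq.contexts; pandas_gbq.gbq re-exports it.
assert pandas_gbq.gbq.Context is pandas_gbq.contexts.Context
assert pandas_gbq.context is pandas_gbq.contexts.context

# The exception classes now live in pandas_gbq.exceptions.
assert pandas_gbq.gbq.TableCreationError is pandas_gbq.exceptions.TableCreationError
assert pandas_gbq.gbq.InvalidSchema is pandas_gbq.exceptions.InvalidSchema

# GbqConnector and its helpers now live in pandas_gbq.gbq_connector.
assert pandas_gbq.gbq.GbqConnector is pandas_gbq.gbq_connector.GbqConnector
assert pandas_gbq.gbq.create_user_agent is pandas_gbq.gbq_connector.create_user_agent

# The shared context is configured exactly as before the module split.
pandas_gbq.context.project = "my-project"  # hypothetical project id
pandas_gbq.context.dialect = "standard"

Because only the module layout changes, callers that import from pandas_gbq.gbq or use pandas_gbq.context directly should not need any code changes after this refactor.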