diff --git a/.coveragerc b/.coveragerc
index ba50bf32..88b85d03 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -22,7 +22,7 @@ omit =
     google/cloud/__init__.py
 
 [report]
-fail_under = 88
+fail_under = 89
 show_missing = True
 exclude_lines =
     # Re-enable the standard pragma
diff --git a/noxfile.py b/noxfile.py
index df3378bf..398b4dc2 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -259,7 +259,7 @@ def cover(session):
     test runs (not system test runs), and then erases coverage data.
     """
     session.install("coverage", "pytest-cov")
-    session.run("coverage", "report", "--show-missing", "--fail-under=88")
+    session.run("coverage", "report", "--show-missing", "--fail-under=89")
     session.run("coverage", "erase")
 
 
diff --git a/owlbot.py b/owlbot.py
index 5ef93de7..9849f98f 100644
--- a/owlbot.py
+++ b/owlbot.py
@@ -33,7 +33,7 @@
 templated_files = common.py_library(
     unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
-    cov_level=88,
+    cov_level=89,
     unit_test_extras=extras,
     system_test_extras=extras,
     intersphinx_dependencies={
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index a1ae2896..5dcc3fd0 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -3,12 +3,21 @@
 # license that can be found in the LICENSE file.
 
 import logging
+import re
 import time
 import warnings
 from datetime import datetime
+import typing
+from typing import Any, Dict, Optional, Union
 
 import numpy as np
 
+# Only import at module-level at type checking time to avoid circular
+# dependencies in the pandas package, which has an optional dependency on
+# pandas-gbq.
+if typing.TYPE_CHECKING:  # pragma: NO COVER
+    import pandas
+
 # Required dependencies, but treat as optional so that _test_google_api_imports
 # can provide a better error message.
 try:
@@ -64,6 +73,10 @@ def _test_google_api_imports():
         raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex
 
 
+def _is_query(query_or_table: str) -> bool:
+    return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None
+
+
 class DatasetCreationError(ValueError):
     """
     Raised when the create dataset method fails
@@ -374,6 +387,30 @@ def process_http_error(ex):
 
         raise GenericGBQException("Reason: {0}".format(ex))
 
+    def download_table(
+        self,
+        table_id: str,
+        max_results: Optional[int] = None,
+        progress_bar_type: Optional[str] = None,
+        dtypes: Optional[Dict[str, Union[str, Any]]] = None,
+    ) -> "pandas.DataFrame":
+        self._start_timer()
+
+        try:
+            table_ref = bigquery.TableReference.from_string(
+                table_id, default_project=self.project_id
+            )
+            rows_iter = self.client.list_rows(table_ref, max_results=max_results)
+        except self.http_error as ex:
+            self.process_http_error(ex)
+
+        return self._download_results(
+            rows_iter,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            user_dtypes=dtypes,
+        )
+
     def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
@@ -390,15 +427,6 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         if config is not None:
             job_config.update(config)
 
-            if "query" in config and "query" in config["query"]:
-                if query is not None:
-                    raise ValueError(
-                        "Query statement can't be specified "
-                        "inside config while it is specified "
-                        "as parameter"
-                    )
-                query = config["query"].pop("query")
-
         self._start_timer()
 
         try:
@@ -464,15 +492,25 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
             )
 
         dtypes = kwargs.get("dtypes")
+
+        # Ensure destination is populated.
+        try:
+            query_reply.result()
+        except self.http_error as ex:
+            self.process_http_error(ex)
+
+        rows_iter = self.client.list_rows(
+            query_reply.destination, max_results=max_results
+        )
         return self._download_results(
-            query_reply,
+            rows_iter,
             max_results=max_results,
             progress_bar_type=progress_bar_type,
             user_dtypes=dtypes,
         )
 
     def _download_results(
-        self, query_job, max_results=None, progress_bar_type=None, user_dtypes=None,
+        self, rows_iter, max_results=None, progress_bar_type=None, user_dtypes=None,
     ):
         # No results are desired, so don't bother downloading anything.
         if max_results == 0:
@@ -504,11 +542,6 @@ def _download_results(
             to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client
 
         try:
-            query_job.result()
-            # Get the table schema, so that we can list rows.
-            destination = self.client.get_table(query_job.destination)
-            rows_iter = self.client.list_rows(destination, max_results=max_results)
-
             schema_fields = [field.to_api_repr() for field in rows_iter.schema]
             conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
             conversion_dtypes.update(user_dtypes)
@@ -644,7 +677,7 @@ def _cast_empty_df_dtypes(schema_fields, df):
 
 
 def read_gbq(
-    query,
+    query_or_table,
     project_id=None,
     index_col=None,
     col_order=None,
@@ -668,17 +701,18 @@ def read_gbq(
 
     This method uses the Google Cloud client library to make requests to
     Google BigQuery, documented `here
-    `__.
+    `__.
 
     See the :ref:`How to authenticate with Google BigQuery `
     guide for authentication instructions.
 
     Parameters
     ----------
-    query : str
-        SQL-Like Query to return data values.
+    query_or_table : str
+        SQL query to return data values. If the string is a table ID, fetch the
+        rows directly from the table without running a query.
     project_id : str, optional
-        Google BigQuery Account project ID. Optional when available from
+        Google Cloud Platform project ID. Optional when available from
         the environment.
     index_col : str, optional
         Name of result column to use for index in results DataFrame.
@@ -688,14 +722,14 @@ def read_gbq(
     reauth : boolean, default False
         Force Google BigQuery to re-authenticate the user. This is useful
         if multiple accounts are used.
-    auth_local_webserver : boolean, default False
-        Use the `local webserver flow`_ instead of the `console flow`_
-        when getting user credentials.
-
-        .. _local webserver flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
-        .. _console flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
+    auth_local_webserver : bool, default False
+        Use the `local webserver flow
+        `_
+        instead of the `console flow
+        `_
+        when getting user credentials. Your code must run on the same machine
+        as your web browser, and your web browser must be able to access your
+        application via ``localhost:808X``.
 
         .. versionadded:: 0.2.0
     dialect : str, default 'standard'
@@ -745,13 +779,6 @@ def read_gbq(
         `__
         permission on the project you are billing queries to.
 
-        **Note:** Due to a `known issue in the ``google-cloud-bigquery``
-        package
-        `__
-        (fixed in version 1.11.0), you must write your query results to a
-        destination table. To do this with ``read_gbq``, supply a
-        ``configuration`` dictionary.
-
         This feature requires the ``google-cloud-bigquery-storage`` and
         ``pyarrow`` packages.
 
@@ -823,6 +850,15 @@ def read_gbq(
     if dialect not in ("legacy", "standard"):
         raise ValueError("'{0}' is not valid for dialect".format(dialect))
 
+    if configuration and "query" in configuration and "query" in configuration["query"]:
+        if query_or_table is not None:
+            raise ValueError(
+                "Query statement can't be specified "
+                "inside config while it is specified "
+                "as parameter"
+            )
+        query_or_table = configuration["query"].pop("query")
+
     connector = GbqConnector(
         project_id,
         reauth=reauth,
@@ -834,13 +870,21 @@ def read_gbq(
         use_bqstorage_api=use_bqstorage_api,
     )
 
-    final_df = connector.run_query(
-        query,
-        configuration=configuration,
-        max_results=max_results,
-        progress_bar_type=progress_bar_type,
-        dtypes=dtypes,
-    )
+    if _is_query(query_or_table):
+        final_df = connector.run_query(
+            query_or_table,
+            configuration=configuration,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            dtypes=dtypes,
+        )
+    else:
+        final_df = connector.download_table(
+            query_or_table,
+            max_results=max_results,
+            progress_bar_type=progress_bar_type,
+            dtypes=dtypes,
+        )
 
     # Reindex the DataFrame on the provided column
     if index_col is not None:
@@ -889,7 +933,7 @@ def to_gbq(
 
     This method uses the Google Cloud client library to make requests to
     Google BigQuery, documented `here
-    `__.
+    `__.
 
     See the :ref:`How to authenticate with Google BigQuery `
     guide for authentication instructions.
@@ -902,7 +946,7 @@ def to_gbq(
         Name of table to be written, in the form ``dataset.tablename`` or
         ``project.dataset.tablename``.
     project_id : str, optional
-        Google BigQuery Account project ID. Optional when available from
+        Google Cloud Platform project ID. Optional when available from
        the environment.
     chunksize : int, optional
        Number of rows to be inserted in each chunk from the dataframe.
@@ -920,13 +964,13 @@ def to_gbq(
         ``'append'``
             If table exists, insert data. Create if does not exist.
     auth_local_webserver : bool, default False
-        Use the `local webserver flow`_ instead of the `console flow`_
-        when getting user credentials.
-
-        .. _local webserver flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
-        .. _console flow:
-            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
+        Use the `local webserver flow
+        `_
+        instead of the `console flow
+        `_
+        when getting user credentials. Your code must run on the same machine
+        as your web browser, and your web browser must be able to access your
+        application via ``localhost:808X``.
 
         .. versionadded:: 0.2.0
     table_schema : list of dicts, optional
diff --git a/pandas_gbq/timestamp.py b/pandas_gbq/timestamp.py
index e0b41475..c6bb6d93 100644
--- a/pandas_gbq/timestamp.py
+++ b/pandas_gbq/timestamp.py
@@ -7,6 +7,8 @@
 Private module.
 """
 
+import pandas.api.types
+
 
 def localize_df(df, schema_fields):
     """Localize any TIMESTAMP columns to tz-aware type.
@@ -38,7 +40,11 @@ def localize_df(df, schema_fields):
         if "mode" in field and field["mode"].upper() == "REPEATED":
             continue
 
-        if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
+        if (
+            field["type"].upper() == "TIMESTAMP"
+            and pandas.api.types.is_datetime64_ns_dtype(df.dtypes[column])
+            and df[column].dt.tz is None
+        ):
             df[column] = df[column].dt.tz_localize("UTC")
 
     return df
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 6ac55220..4ba8bf31 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -3,6 +3,7 @@
 # license that can be found in the LICENSE file.
 
 import os
+import functools
 import pathlib
 
 from google.cloud import bigquery
@@ -56,6 +57,24 @@ def project(project_id):
     return project_id
 
 
+@pytest.fixture
+def to_gbq(credentials, project_id):
+    import pandas_gbq
+
+    return functools.partial(
+        pandas_gbq.to_gbq, project_id=project_id, credentials=credentials
+    )
+
+
+@pytest.fixture
+def read_gbq(credentials, project_id):
+    import pandas_gbq
+
+    return functools.partial(
+        pandas_gbq.read_gbq, project_id=project_id, credentials=credentials
+    )
+
+
 @pytest.fixture()
 def random_dataset_id(bigquery_client: bigquery.Client, project_id: str):
     dataset_id = prefixer.create_prefix()
diff --git a/tests/system/test_to_gbq.py b/tests/system/test_to_gbq.py
index f7184024..a9274091 100644
--- a/tests/system/test_to_gbq.py
+++ b/tests/system/test_to_gbq.py
@@ -5,7 +5,6 @@
 import datetime
 import decimal
 import collections
-import functools
 import random
 
 import db_dtypes
@@ -23,12 +22,8 @@ def api_method(request):
 
 
 @pytest.fixture
-def method_under_test(credentials, project_id):
-    import pandas_gbq
-
-    return functools.partial(
-        pandas_gbq.to_gbq, project_id=project_id, credentials=credentials
-    )
+def method_under_test(to_gbq):
+    return to_gbq
 
 
 SeriesRoundTripTestCase = collections.namedtuple(
@@ -98,7 +93,7 @@ def method_under_test(credentials, project_id):
 def test_series_round_trip(
     method_under_test,
     random_dataset_id,
-    bigquery_client,
+    read_gbq,
     input_series,
     api_method,
     api_methods,
@@ -114,7 +109,7 @@ def test_series_round_trip(
     )
     method_under_test(df, table_id, api_method=api_method)
 
-    round_trip = bigquery_client.list_rows(table_id).to_dataframe()
+    round_trip = read_gbq(table_id)
     round_trip_series = round_trip["test_col"].sort_values().reset_index(drop=True)
     pandas.testing.assert_series_equal(
         round_trip_series, input_series, check_exact=True, check_names=False,
@@ -244,8 +239,8 @@ def test_series_round_trip(
 )
 def test_dataframe_round_trip_with_table_schema(
     method_under_test,
+    read_gbq,
     random_dataset_id,
-    bigquery_client,
     input_df,
     expected_df,
     table_schema,
@@ -260,8 +255,8 @@ def test_dataframe_round_trip_with_table_schema(
     method_under_test(
         input_df, table_id, table_schema=table_schema, api_method=api_method
     )
-    round_trip = bigquery_client.list_rows(table_id).to_dataframe(
-        dtypes=dict(zip(expected_df.columns, expected_df.dtypes))
-    )
+    round_trip = read_gbq(
+        table_id, dtypes=dict(zip(expected_df.columns, expected_df.dtypes)),
+    )
     round_trip.sort_values("row_num", inplace=True)
     pandas.testing.assert_frame_equal(expected_df, round_trip)
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index cfa1e819..3f0c5e53 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -26,18 +26,35 @@ def mock_bigquery_client(monkeypatch):
     # Constructor returns the mock itself, so this mock can be treated as the
     # constructor or the instance.
     mock_client.return_value = mock_client
-    mock_schema = [google.cloud.bigquery.SchemaField("_f0", "INTEGER")]
-    # Mock out SELECT 1 query results.
+
     mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob)
     mock_query.job_id = "some-random-id"
     mock_query.state = "DONE"
     mock_rows = mock.create_autospec(google.cloud.bigquery.table.RowIterator)
     mock_rows.total_rows = 1
-    mock_rows.schema = mock_schema
+    mock_rows.__iter__.return_value = [(1,)]
     mock_query.result.return_value = mock_rows
+    mock_client.list_rows.return_value = mock_rows
     mock_client.query.return_value = mock_query
     # Mock table creation.
     monkeypatch.setattr(google.cloud.bigquery, "Client", mock_client)
     mock_client.reset_mock()
+
+    # Mock out SELECT 1 query results.
+    def generate_schema():
+        query = mock_client.query.call_args[0][0] if mock_client.query.call_args else ""
+        if query == "SELECT 1 AS int_col":
+            return [google.cloud.bigquery.SchemaField("int_col", "INTEGER")]
+        else:
+            return [google.cloud.bigquery.SchemaField("_f0", "INTEGER")]
+
+    type(mock_rows).schema = mock.PropertyMock(side_effect=generate_schema)
+
+    # Mock out get_table.
+    def get_table(table_ref_or_id, **kwargs):
+        return google.cloud.bigquery.Table(table_ref_or_id)
+
+    mock_client.get_table.side_effect = get_table
+
     return mock_client
 
diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py
index 8784a98b..df9241bc 100644
--- a/tests/unit/test_gbq.py
+++ b/tests/unit/test_gbq.py
@@ -8,6 +8,7 @@
 import datetime
 from unittest import mock
 
+import google.api_core.exceptions
 import numpy
 import pandas
 from pandas import DataFrame
@@ -82,6 +83,25 @@ def test__bqschema_to_nullsafe_dtypes(type_, expected):
     assert result == {"x": expected}
 
 
+@pytest.mark.parametrize(
+    ["query_or_table", "expected"],
+    [
+        ("SELECT 1", True),
+        ("SELECT\n1", True),
+        ("SELECT\t1", True),
+        ("dataset.table", False),
+        (" dataset.table ", False),
+        ("\r\ndataset.table\r\n", False),
+        ("project-id.dataset.table", False),
+        (" project-id.dataset.table ", False),
+        ("\r\nproject-id.dataset.table\r\n", False),
+    ],
+)
+def test__is_query(query_or_table, expected):
+    result = gbq._is_query(query_or_table)
+    assert result == expected
+
+
 def test_GbqConnector_get_client_w_old_bq(monkeypatch, mock_bigquery_client):
     gbq._test_google_api_imports()
     connector = _make_connector()
@@ -292,9 +312,10 @@ def test_read_gbq_with_no_project_id_given_should_fail(monkeypatch):
         gbq.read_gbq("SELECT 1", dialect="standard")
 
 
-def test_read_gbq_with_inferred_project_id(monkeypatch):
+def test_read_gbq_with_inferred_project_id(mock_bigquery_client):
     df = gbq.read_gbq("SELECT 1", dialect="standard")
     assert df is not None
+    mock_bigquery_client.query.assert_called_once()
 
 
 def test_read_gbq_with_inferred_project_id_from_service_account_credentials(
@@ -473,7 +494,7 @@ def test_read_gbq_passes_dtypes(mock_bigquery_client, mock_service_account_crede
 def test_read_gbq_use_bqstorage_api(
     mock_bigquery_client, mock_service_account_credentials
 ):
-    if not FEATURES.bigquery_has_bqstorage:
+    if not FEATURES.bigquery_has_bqstorage:  # pragma: NO COVER
         pytest.skip("requires BigQuery Storage API")
 
     mock_service_account_credentials.project_id = "service_account_project_id"
@@ -505,3 +526,73 @@ def test_read_gbq_calls_tqdm(mock_bigquery_client, mock_service_account_credenti
 
     _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args
     assert to_dataframe_kwargs["progress_bar_type"] == "foobar"
+
+
+def test_read_gbq_with_full_table_id(
+    mock_bigquery_client, mock_service_account_credentials
+):
+    mock_service_account_credentials.project_id = "service_account_project_id"
+    df = gbq.read_gbq(
+        "my-project.my_dataset.read_gbq_table",
+        credentials=mock_service_account_credentials,
+        project_id="param-project",
+    )
+    assert df is not None
+
+    mock_bigquery_client.query.assert_not_called()
+    sent_table = mock_bigquery_client.list_rows.call_args[0][0]
+    assert sent_table.project == "my-project"
+    assert sent_table.dataset_id == "my_dataset"
+    assert sent_table.table_id == "read_gbq_table"
+
+
+def test_read_gbq_with_partial_table_id(
+    mock_bigquery_client, mock_service_account_credentials
+):
+    mock_service_account_credentials.project_id = "service_account_project_id"
+    df = gbq.read_gbq(
+        "my_dataset.read_gbq_table",
+        credentials=mock_service_account_credentials,
+        project_id="param-project",
+    )
+    assert df is not None
+
+    mock_bigquery_client.query.assert_not_called()
+    sent_table = mock_bigquery_client.list_rows.call_args[0][0]
+    assert sent_table.project == "param-project"
+    assert sent_table.dataset_id == "my_dataset"
+    assert sent_table.table_id == "read_gbq_table"
+
+
+def test_read_gbq_bypasses_query_with_table_id_and_max_results(
+    mock_bigquery_client, mock_service_account_credentials
+):
+    mock_service_account_credentials.project_id = "service_account_project_id"
+    df = gbq.read_gbq(
+        "my-project.my_dataset.read_gbq_table",
+        credentials=mock_service_account_credentials,
+        max_results=11,
+    )
+    assert df is not None
+
+    mock_bigquery_client.query.assert_not_called()
+    sent_table = mock_bigquery_client.list_rows.call_args[0][0]
+    assert sent_table.project == "my-project"
+    assert sent_table.dataset_id == "my_dataset"
+    assert sent_table.table_id == "read_gbq_table"
+    sent_max_results = mock_bigquery_client.list_rows.call_args[1]["max_results"]
+    assert sent_max_results == 11
+
+
+def test_read_gbq_with_list_rows_error_translates_exception(
+    mock_bigquery_client, mock_service_account_credentials
+):
+    mock_bigquery_client.list_rows.side_effect = (
+        google.api_core.exceptions.NotFound("table not found"),
+    )
+
+    with pytest.raises(gbq.GenericGBQException, match="table not found"):
+        gbq.read_gbq(
+            "my-project.my_dataset.read_gbq_table",
+            credentials=mock_service_account_credentials,
+        )
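
As an illustration of the behavior this change introduces, here is a minimal usage sketch of the new table-ID path in read_gbq. The dispatch added in pandas_gbq/gbq.py (_is_query) simply checks whether the stripped string contains whitespace: a bare table ID is fetched through GbqConnector.download_table (which calls the BigQuery list_rows API), while anything else is still executed as a query. The project, dataset, and table names below are hypothetical placeholders, and running the sketch requires Google Cloud credentials with access to them.

import pandas_gbq

# A bare table ID (no whitespace) skips the query path and reads the rows
# directly via the BigQuery list_rows API.
df_table = pandas_gbq.read_gbq(
    "my-project.my_dataset.my_table",  # hypothetical table ID
    project_id="my-project",  # hypothetical billing project
    max_results=100,
)

# Anything containing whitespace is still treated as SQL and run as a query.
df_query = pandas_gbq.read_gbq(
    "SELECT COUNT(*) AS row_count FROM `my-project.my_dataset.my_table`",
    project_id="my-project",
    dialect="standard",
)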