Skip to content

fix: correctly transform query job timeout configuration and exceptions #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/requirements-3.7-0.24.2.conda
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ coverage
db-dtypes==0.3.1
fastavro
flake8
freezegun
numpy==1.16.6
google-cloud-bigquery==1.27.2
google-cloud-bigquery-storage==1.1.0
Expand Down
1 change: 1 addition & 0 deletions ci/requirements-3.9-1.3.4.conda
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ coverage
db-dtypes
fastavro
flake8
freezegun
google-cloud-bigquery
google-cloud-bigquery-storage
numpy
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def default(session):
"-c",
constraints_path,
)
session.install("freezegun", "-c", constraints_path)

if session.python == "3.9":
extras = ""
Expand Down
1 change: 1 addition & 0 deletions owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
cov_level=96,
unit_test_external_dependencies=["freezegun"],
unit_test_extras=extras,
unit_test_extras_by_python=extras_by_python,
system_test_extras=extras,
Expand Down
91 changes: 68 additions & 23 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import copy
import concurrent.futures
from datetime import datetime
import logging
import re
Expand Down Expand Up @@ -378,6 +380,9 @@ def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
raise QueryTimeout("Reason: {0}".format(ex))

raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
Expand Down Expand Up @@ -406,8 +411,41 @@ def download_table(
user_dtypes=dtypes,
)

def _wait_for_query_job(self, query_reply, timeout_ms):
    """Wait for query to complete, pausing occasionally to update progress.

    Args:
        query_reply (QueryJob):
            A query job which has started.

        timeout_ms (Optional[int]):
            How long to wait before cancelling the query.
    """
    # Poll at most 10 seconds at a time so we can show progress.
    # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
    # Include a tqdm progress bar here instead of a stream of log messages.
    poll_seconds = 10.0
    if timeout_ms:
        poll_seconds = min(poll_seconds, timeout_ms / 1000.0)

    while query_reply.state != "DONE":
        self.log_elapsed_seconds(" Elapsed", "s. Waiting...")

        # Enforce the caller's overall deadline before the next poll.
        elapsed_ms = self.get_elapsed_seconds() * 1000
        if timeout_ms and timeout_ms < elapsed_ms:
            self.client.cancel_job(
                query_reply.job_id, location=query_reply.location
            )
            raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

        try:
            query_reply.result(timeout=poll_seconds)
        except concurrent.futures.TimeoutError:
            # Use our own timeout logic
            pass
        except self.http_error as ex:
            self.process_http_error(ex)

def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
from google.cloud import bigquery
import pandas
Expand Down Expand Up @@ -449,28 +487,11 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)

while query_reply.state != "DONE":
self.log_elapsed_seconds(" Elapsed", "s. Waiting...")

timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
"timeoutMs"
)
timeout_ms = int(timeout_ms) if timeout_ms else None
if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

timeout_sec = 1.0
if timeout_ms:
# Wait at most 1 second so we can show progress bar
timeout_sec = min(1.0, timeout_ms / 1000.0)

try:
query_reply.result(timeout=timeout_sec)
except TimeoutError:
# Use our own timeout logic
pass
except self.http_error as ex:
self.process_http_error(ex)
timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
"timeoutMs"
)
timeout_ms = int(timeout_ms) if timeout_ms else None
self._wait_for_query_job(query_reply, timeout_ms)

if query_reply.cache_hit:
logger.debug("Query done.\nCache hit.\n")
Expand Down Expand Up @@ -673,6 +694,28 @@ def _finalize_dtypes(
return df


def _transform_read_gbq_configuration(configuration):
"""
For backwards-compatibility, convert any previously client-side only
parameters such as timeoutMs to the property name expected by the REST API.

Makes a copy of configuration if changes are needed.
"""

if configuration is None:
return None

timeout_ms = configuration.get("query", {}).get("timeoutMs")
if timeout_ms is not None:
# Transform timeoutMs to an actual server-side configuration.
# https://github.com/googleapis/python-bigquery-pandas/issues/479
configuration = copy.deepcopy(configuration)
del configuration["query"]["timeoutMs"]
configuration["jobTimeoutMs"] = timeout_ms

return configuration


def read_gbq(
query_or_table,
project_id=None,
Expand Down Expand Up @@ -847,6 +890,8 @@ def read_gbq(
if dialect not in ("legacy", "standard"):
raise ValueError("'{0}' is not valid for dialect".format(dialect))

configuration = _transform_read_gbq_configuration(configuration)

if configuration and "query" in configuration and "query" in configuration["query"]:
if query_or_table is not None:
raise ValueError(
Expand Down
17 changes: 4 additions & 13 deletions tests/system/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,22 +473,13 @@ def test_configuration_raises_value_error_with_multiple_config(self, project_id)

def test_timeout_configuration(self, project_id):
sql_statement = """
SELECT
SUM(bottles_sold) total_bottles,
UPPER(category_name) category_name,
magnitude,
liquor.zip_code zip_code
FROM `bigquery-public-data.iowa_liquor_sales.sales` liquor
JOIN `bigquery-public-data.geo_us_boundaries.zip_codes` zip_codes
ON liquor.zip_code = zip_codes.zip_code
JOIN `bigquery-public-data.noaa_historic_severe_storms.tornado_paths` tornados
ON liquor.date = tornados.storm_date
WHERE ST_INTERSECTS(tornado_path_geom, zip_code_geom)
GROUP BY category_name, magnitude, zip_code
ORDER BY magnitude ASC, total_bottles DESC
select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 10000))
"""
configs = [
# pandas-gbq timeout configuration. Transformed to REST API compatible version.
{"query": {"useQueryCache": False, "timeoutMs": 1}},
# REST API job timeout. See:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms
{"query": {"useQueryCache": False}, "jobTimeoutMs": 1},
]
for config in configs:
Expand Down
65 changes: 65 additions & 0 deletions tests/unit/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

# -*- coding: utf-8 -*-

import concurrent.futures
import copy
import datetime
from unittest import mock

import freezegun
import google.api_core.exceptions
import numpy
import pandas
Expand Down Expand Up @@ -114,6 +116,61 @@ def test__is_query(query_or_table, expected):
assert result == expected


@pytest.mark.parametrize(
    ["original", "expected"],
    [
        (None, None),
        ({}, {}),
        ({"query": {"useQueryCache": False}}, {"query": {"useQueryCache": False}}),
        ({"jobTimeoutMs": "1234"}, {"jobTimeoutMs": "1234"}),
        ({"query": {"timeoutMs": "1234"}}, {"query": {}, "jobTimeoutMs": "1234"}),
    ],
)
def test__transform_read_gbq_configuration_makes_copy(original, expected):
    # When no transformation is needed, the input should pass through as-is.
    expect_unchanged = original == expected
    result = gbq._transform_read_gbq_configuration(original)
    assert result == expected
    # Catch if we accidentally modified the original.
    assert (original == result) == expect_unchanged


def test__wait_for_query_job_exits_when_done(mock_bigquery_client):
    connector = _make_connector()
    connector.client = mock_bigquery_client
    connector.start = datetime.datetime(2020, 1, 1).timestamp()

    job = mock.create_autospec(google.cloud.bigquery.QueryJob)
    # First poll observes RUNNING, second observes DONE and ends the wait loop.
    type(job).state = mock.PropertyMock(side_effect=("RUNNING", "DONE"))
    job.result.side_effect = concurrent.futures.TimeoutError("fake timeout")

    # With the clock frozen, the 60 ms budget never elapses.
    with freezegun.freeze_time("2020-01-01 00:00:00", tick=False):
        connector._wait_for_query_job(job, 60)

    mock_bigquery_client.cancel_job.assert_not_called()


def test__wait_for_query_job_cancels_after_timeout(mock_bigquery_client):
    connector = _make_connector()
    connector.client = mock_bigquery_client
    connector.start = datetime.datetime(2020, 1, 1).timestamp()

    job = mock.create_autospec(google.cloud.bigquery.QueryJob)
    job.job_id = "a-random-id"
    job.location = "job-location"
    job.state = "RUNNING"
    job.result.side_effect = concurrent.futures.TimeoutError("fake timeout")

    # Each clock read jumps ahead 15 s, so the 60 ms budget is exhausted on
    # the first elapsed-time check and the job should be cancelled.
    frozen_clock = freezegun.freeze_time("2020-01-01 00:00:00", auto_tick_seconds=15)
    with frozen_clock, pytest.raises(gbq.QueryTimeout):
        connector._wait_for_query_job(job, 60)

    mock_bigquery_client.cancel_job.assert_called_with(
        "a-random-id", location="job-location"
    )


def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
gbq._test_google_api_imports()
pytest.importorskip("google.api_core.client_info")
Expand All @@ -125,6 +182,14 @@ def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
assert kwargs["client_info"].user_agent == "pandas-{}".format(pandas.__version__)


def test_GbqConnector_process_http_error_transforms_timeout():
    # A cancelled-job message from the API should surface as QueryTimeout.
    api_error = google.api_core.exceptions.GoogleAPICallError(
        "Job execution was cancelled: Job timed out after 0s"
    )
    with pytest.raises(gbq.QueryTimeout):
        gbq.GbqConnector.process_http_error(api_error)


def test_to_gbq_should_fail_if_invalid_table_name_passed():
    frame = DataFrame([[1]])
    with pytest.raises(gbq.NotFoundException):
        gbq.to_gbq(frame, "invalid_table_name", project_id="1234")
Expand Down