
Commit 3114e24

BUG: use original schema when appending (#318)
* BUG: use original schema when appending

  Don't overwrite the table schema when appending to an existing table.

* python 3.5 doesn't support f-strings

* cln: refactor `to_gbq` to avoid unnecessary extra table GET HTTP calls

  pandas-gbq already gets the table metadata when checking whether a table
  exists. This refactoring avoids extra calls to get the table metadata
  when checking the schema.

  Also, fix a bug where `update_schema` appends columns that aren't in the
  dataframe to the schema sent in the API request.

* doc: add fix to changelog

* doc: revert accidental whitespace change

Co-authored-by: Tim Swast <[email protected]>
1 parent 97e9a9e commit 3114e24
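For context, here is a minimal sketch (not pandas-gbq code; the function, client, and schema names are illustrative) of the pattern the refactoring adopts: a single `get_table` call both tests for existence and returns the table metadata, where the old path called `exists()` and then fetched the schema in a separate request.

```python
# Sketch only: one GET via get_table() replaces an exists() check plus a
# separate metadata fetch; a missing table surfaces as NotFound.
from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery


def get_or_create_table(client, destination_table, schema_fields):
    """Fetch the table in a single request, creating it if absent."""
    try:
        return client.get_table(destination_table)  # one HTTP GET
    except google_exceptions.NotFound:
        table = bigquery.Table(destination_table, schema=schema_fields)
        return client.create_table(table)
```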

6 files changed (+180, -111 lines)

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -0,0 +1,4 @@
+- [ ] closes #xxxx
+- [ ] tests added / passed
+- [ ] passes `nox -s blacken lint`
+- [ ] `docs/source/changelog.rst` entry
```

docs/source/changelog.rst

Lines changed: 8 additions & 0 deletions
```diff
@@ -1,6 +1,14 @@
 Changelog
 =========
 
+.. _changelog-0.13.2:
+
+0.13.2 / TBD
+------------
+
+- Fix ``Provided Schema does not match Table`` error when the existing table
+  contains required fields. (:issue:`315`)
+
 .. _changelog-0.13.1:
 
 0.13.1 / 2020-02-13
```
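The fix is easiest to see from the user's side. A hedged reproduction of issue 315 (dataset, table, and project names are hypothetical): the destination table's columns are REQUIRED, and before this fix the append sent a NULLABLE schema, which BigQuery rejected.

```python
# Hypothetical repro of issue 315: "my_dataset.my_table" already exists and
# its columns are mode=REQUIRED. Before 0.13.2 this append failed with
# "Provided Schema does not match Table"; with the fix, the original table
# schema (including mode) is reused for the load job.
import pandas
import pandas_gbq

df = pandas.DataFrame({"id": [1, 2], "value": ["a", "b"]})
pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",    # hypothetical destination
    project_id="my-project",  # hypothetical project
    if_exists="append",
)
```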

pandas_gbq/gbq.py

Lines changed: 34 additions & 58 deletions
```diff
@@ -5,6 +5,15 @@
 
 import numpy as np
 
+# Required dependencies, but treat as optional so that _test_google_api_imports
+# can provide a better error message.
+try:
+    from google.api_core import exceptions as google_exceptions
+    from google.cloud import bigquery
+except ImportError:  # pragma: NO COVER
+    bigquery = None
+    google_exceptions = None
+
 try:
     # The BigQuery Storage API client is an optional dependency. It is only
     # required when use_bqstorage_api=True.
@@ -388,7 +397,6 @@ def sizeof_fmt(num, suffix="B"):
         return fmt % (num, "Y", suffix)
 
     def get_client(self):
-        from google.cloud import bigquery
         import pandas
 
         try:
@@ -429,7 +437,6 @@ def run_query(
     ):
         from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
-        from google.cloud import bigquery
 
         job_config = {
             "query": {
@@ -640,15 +647,6 @@ def schema(self, dataset_id, table_id):
         except self.http_error as ex:
             self.process_http_error(ex)
 
-    def _clean_schema_fields(self, fields):
-        """Return a sanitized version of the schema for comparisons."""
-        fields_sorted = sorted(fields, key=lambda field: field["name"])
-        # Ignore mode and description when comparing schemas.
-        return [
-            {"name": field["name"], "type": field["type"]}
-            for field in fields_sorted
-        ]
-
     def verify_schema(self, dataset_id, table_id, schema):
         """Indicate whether schemas match exactly
 
@@ -672,43 +670,12 @@ def verify_schema(self, dataset_id, table_id, schema):
             Whether the schemas match
         """
 
-        fields_remote = self._clean_schema_fields(
+        fields_remote = pandas_gbq.schema._clean_schema_fields(
             self.schema(dataset_id, table_id)
         )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
+        fields_local = pandas_gbq.schema._clean_schema_fields(schema["fields"])
         return fields_remote == fields_local
 
-    def schema_is_subset(self, dataset_id, table_id, schema):
-        """Indicate whether the schema to be uploaded is a subset
-
-        Compare the BigQuery table identified in the parameters with
-        the schema passed in and indicate whether a subset of the fields in
-        the former are present in the latter. Order is not considered.
-
-        Parameters
-        ----------
-        dataset_id : str
-            Name of the BigQuery dataset for the table
-        table_id : str
-            Name of the BigQuery table
-        schema : list(dict)
-            Schema for comparison. Each item should have
-            a 'name' and a 'type'
-
-        Returns
-        -------
-        bool
-            Whether the passed schema is a subset
-        """
-
-        fields_remote = self._clean_schema_fields(
-            self.schema(dataset_id, table_id)
-        )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
-        return all(field in fields_remote for field in fields_local)
-
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         table = _Table(
             self.project_id, dataset_id, credentials=self.credentials
@@ -1141,7 +1108,6 @@ def to_gbq(
     """
 
     _test_google_api_imports()
-    from pandas_gbq import schema
 
     if verbose is not None and SHOW_VERBOSE_DEPRECATION:
         warnings.warn(
@@ -1168,25 +1134,31 @@ def to_gbq(
         credentials=credentials,
         private_key=private_key,
     )
+    bqclient = connector.client
     dataset_id, table_id = destination_table.rsplit(".", 1)
 
-    table = _Table(
-        project_id,
-        dataset_id,
-        location=location,
-        credentials=connector.credentials,
-    )
-
     default_schema = _generate_bq_schema(dataframe)
     if not table_schema:
         table_schema = default_schema
     else:
-        table_schema = schema.update_schema(
+        table_schema = pandas_gbq.schema.update_schema(
             default_schema, dict(fields=table_schema)
         )
 
     # If table exists, check if_exists parameter
-    if table.exists(table_id):
+    try:
+        table = bqclient.get_table(destination_table)
+    except google_exceptions.NotFound:
+        table_connector = _Table(
+            project_id,
+            dataset_id,
+            location=location,
+            credentials=connector.credentials,
+        )
+        table_connector.create(table_id, table_schema)
+    else:
+        original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)
+
         if if_exists == "fail":
             raise TableCreationError(
                 "Could not create the table because it "
@@ -1199,16 +1171,20 @@
                 dataset_id, table_id, table_schema
             )
         elif if_exists == "append":
-            if not connector.schema_is_subset(
-                dataset_id, table_id, table_schema
+            if not pandas_gbq.schema.schema_is_subset(
+                original_schema, table_schema
             ):
                 raise InvalidSchema(
                     "Please verify that the structure and "
                     "data types in the DataFrame match the "
                     "schema of the destination table."
                 )
-    else:
-        table.create(table_id, table_schema)
+
+            # Update the local `table_schema` so mode matches.
+            # See: https://github.com/pydata/pandas-gbq/issues/315
+            table_schema = pandas_gbq.schema.update_schema(
+                table_schema, original_schema
+            )
 
     if dataframe.empty:
         # Create the table (if needed), but don't try to run a load job with an
```
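A small sketch of the mode-merging step above, using the helpers added in this commit (the `SchemaField` value stands in for a real table's schema):

```python
# Sketch: convert the live table's schema into pandas-gbq format, then merge
# it into the local schema so the load job's field modes match the table.
from google.cloud import bigquery
import pandas_gbq.schema

original_schema = pandas_gbq.schema.to_pandas_gbq(
    [bigquery.SchemaField("A", "FLOAT", mode="REQUIRED")]
)
table_schema = {"fields": [{"name": "A", "type": "FLOAT"}]}

# update_schema replaces the local "A" with the remote field, carrying
# mode=REQUIRED along, so BigQuery accepts the append.
table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema)
```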

pandas_gbq/schema.py

Lines changed: 53 additions & 3 deletions
```diff
@@ -3,6 +3,59 @@
 import copy
 
 
+def to_pandas_gbq(client_schema):
+    """Given a sequence of :class:`google.cloud.bigquery.schema.SchemaField`,
+    return a schema in pandas-gbq API format.
+    """
+    remote_fields = [
+        field_remote.to_api_repr() for field_remote in client_schema
+    ]
+    for field in remote_fields:
+        field["type"] = field["type"].upper()
+        field["mode"] = field["mode"].upper()
+
+    return {"fields": remote_fields}
+
+
+def _clean_schema_fields(fields):
+    """Return a sanitized version of the schema for comparisons.
+
+    The ``mode`` and ``description`` properties are ignored because they
+    are not generated by :func:`pandas_gbq.schema.generate_bq_schema`.
+    """
+    fields_sorted = sorted(fields, key=lambda field: field["name"])
+    return [
+        {"name": field["name"], "type": field["type"]}
+        for field in fields_sorted
+    ]
+
+
+def schema_is_subset(schema_remote, schema_local):
+    """Indicate whether the schema to be uploaded is a subset
+
+    Compare the remote schema passed in with the local schema and indicate
+    whether a subset of the fields in the former are present in the latter.
+    Order is not considered.
+
+    Parameters
+    ----------
+    schema_remote : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+    schema_local : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+
+    Returns
+    -------
+    bool
+        Whether the passed schema is a subset
+    """
+    fields_remote = _clean_schema_fields(schema_remote.get("fields", []))
+    fields_local = _clean_schema_fields(schema_local.get("fields", []))
+    return all(field in fields_remote for field in fields_local)
+
+
 def generate_bq_schema(dataframe, default_type="STRING"):
     """Given a passed dataframe, generate the associated Google BigQuery schema.
 
@@ -59,9 +112,6 @@ def update_schema(schema_old, schema_new):
         if name in field_indices:
             # replace old field with new field of same name
             output_fields[field_indices[name]] = field
-        else:
-            # add new field
-            output_fields.append(field)
 
     return {"fields": output_fields}
```
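To see the `update_schema` fix from the last hunk in action: fields that appear only in the second argument are no longer appended, so columns absent from the dataframe stay out of the schema sent in the API request. A minimal check against the code above:

```python
import pandas_gbq.schema

old = {"fields": [{"name": "A", "type": "FLOAT"}]}
new = {
    "fields": [
        {"name": "A", "type": "FLOAT", "mode": "REQUIRED"},
        {"name": "B", "type": "STRING"},  # only in `new`; no longer appended
    ]
}

merged = pandas_gbq.schema.update_schema(old, new)
# "A" is replaced by the richer field from `new`; "B" is dropped.
assert merged == {"fields": [{"name": "A", "type": "FLOAT", "mode": "REQUIRED"}]}
```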

tests/system/test_gbq.py

Lines changed: 30 additions & 38 deletions
```diff
@@ -12,6 +12,7 @@
 import pytz
 
 from pandas_gbq import gbq
+import pandas_gbq.schema
 
 
 TABLE_ID = "new_test"
@@ -1637,7 +1638,7 @@ def test_retrieve_schema(gbq_table, gbq_connector):
     }
 
     gbq_table.create(table_id, test_schema)
-    actual = gbq_connector._clean_schema_fields(
+    actual = pandas_gbq.schema._clean_schema_fields(
         gbq_connector.schema(gbq_table.dataset_id, table_id)
     )
     expected = [
@@ -1649,48 +1650,39 @@
     assert expected == actual, "Expected schema used to create table"
 
 
-def test_schema_is_subset_passes_if_subset(gbq_table, gbq_connector):
-    # Issue #24 schema_is_subset indicates whether the schema of the
-    # dataframe is a subset of the schema of the bigquery table
-    table_id = "test_schema_is_subset_passes_if_subset"
+def test_to_gbq_does_not_override_mode(gbq_table, gbq_connector):
+    # See: https://github.com/pydata/pandas-gbq/issues/315
+    table_id = "test_to_gbq_does_not_override_mode"
     table_schema = {
         "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
+            {
+                "mode": "REQUIRED",
+                "name": "A",
+                "type": "FLOAT",
+                "description": "A",
+            },
+            {
+                "mode": "NULLABLE",
+                "name": "B",
+                "type": "FLOAT",
+                "description": "B",
+            },
+            {
+                "mode": "NULLABLE",
+                "name": "C",
+                "type": "STRING",
+                "description": "C",
+            },
         ]
     }
 
     gbq_table.create(table_id, table_schema)
-    assert gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
+    gbq.to_gbq(
+        pandas.DataFrame({"A": [1.0], "B": [2.0], "C": ["a"]}),
+        "{0}.{1}".format(gbq_table.dataset_id, table_id),
+        project_id=gbq_connector.project_id,
+        if_exists="append",
     )
 
-
-def test_schema_is_subset_fails_if_not_subset(gbq_table, gbq_connector):
-    # For pull request #24
-    table_id = "test_schema_is_subset_fails_if_not_subset"
-    table_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "C", "type": "FLOAT"},
-        ]
-    }
-
-    gbq_table.create(table_id, table_schema)
-    assert not gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
-    )
+    actual = gbq_connector.schema(gbq_table.dataset_id, table_id)
+    assert table_schema["fields"] == actual
```
