
Commit c210de1

mr-mcoxparthea authored and committed
When appending to a table, load if the dataframe contains a subset of the existing schema (googleapis#24)
* Improvements discussed in PR conversation
* Accidentally left a duplicate test in
* Correcting change to schema made by auto-rebase
* Fixing missing assertTrue and reversion to not checking subset on append (both from rebase)
* Replacing AssertEqual
* Shortening line to pass flake
* Making updates per jreback's requested changes
* Fixing trailing whitespace
* Adding detail to changelog
* Use wait_for_job rather than sleep
* Revert "Use wait_for_job rather than sleep" (this reverts commit 8726a01)
* Minor tweaks before merging
* Update the to_gbq doc-string as suggested by @jreback
* Make travis happy
1 parent 47bcc66 · commit c210de1

File tree

4 files changed: +171 -10 lines


docs/source/changelog.rst

+2
@@ -6,6 +6,8 @@ Changelog
 
 - Resolve issue where the optional ``--noauth_local_webserver`` command line argument would not be propagated during the authentication process. (:issue:`35`)
 - Drop support for Python 3.4 (:issue:`40`)
+- The dataframe passed to ``.to_gbq(...., if_exists='append')`` only needs to contain a subset of the fields in the BigQuery schema. (:issue:`24`)
+
 
 0.1.6 / 2017-05-03
 ------------------

docs/source/writing.rst

+1 -1
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.
 
 If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
 be written to the table using the defined table schema and column types. The
-dataframe must match the destination table in structure and data types.
+dataframe must contain only fields (matching name and type) that are currently in the destination table.
 If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
 different schema, a delay of 2 minutes will be forced to ensure that the new schema
 has propagated in the Google environment. See
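For example, under the new behavior an append like the following sketch should succeed even when the dataframe omits columns that exist in the destination table (the dataset, table, and project ids below are hypothetical, for illustration only):

    import pandas as pd
    from pandas_gbq import gbq

    # Suppose 'my_dataset.my_table' already has columns A (FLOAT),
    # B (FLOAT) and C (STRING). Appending a two-column subset now works.
    df_subset = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

    gbq.to_gbq(df_subset, 'my_dataset.my_table', 'my-project-id',
               if_exists='append')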

pandas_gbq/gbq.py

+83 -9
@@ -557,7 +557,25 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):
 
         self._print("\n")
 
-    def verify_schema(self, dataset_id, table_id, schema):
+    def schema(self, dataset_id, table_id):
+        """Retrieve the schema of the table
+
+        Obtain from BigQuery the field names and field types
+        for the table defined by the parameters
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+
+        Returns
+        -------
+        list of dicts
+            Fields representing the schema
+        """
+
         try:
             from googleapiclient.errors import HttpError
         except:
@@ -573,15 +591,67 @@ def verify_schema(self, dataset_id, table_id, schema):
                               'type': field_remote['type']}
                              for field_remote in remote_schema['fields']]
 
-            fields_remote = set([json.dumps(field_remote)
-                                 for field_remote in remote_fields])
-            fields_local = set(json.dumps(field_local)
-                               for field_local in schema['fields'])
-
-            return fields_remote == fields_local
+            return remote_fields
         except HttpError as ex:
             self.process_http_error(ex)
 
+    def verify_schema(self, dataset_id, table_id, schema):
+        """Indicate whether schemas match exactly
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether they contain the same
+        fields. Order is not considered.
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+        schema : list(dict)
+            Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        Returns
+        -------
+        bool
+            Whether the schemas match
+        """
+
+        fields_remote = sorted(self.schema(dataset_id, table_id),
+                               key=lambda x: x['name'])
+        fields_local = sorted(schema['fields'], key=lambda x: x['name'])
+
+        return fields_remote == fields_local
+
+    def schema_is_subset(self, dataset_id, table_id, schema):
+        """Indicate whether the schema to be uploaded is a subset
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether all fields in the
+        latter are present in the former. Order is not considered.
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+        schema : list(dict)
+            Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        Returns
+        -------
+        bool
+            Whether the passed schema is a subset
+        """
+
+        fields_remote = self.schema(dataset_id, table_id)
+        fields_local = schema['fields']
+
+        return all(field in fields_remote for field in fields_local)
+
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         delay = 0
 
@@ -810,7 +880,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
     if_exists : {'fail', 'replace', 'append'}, default 'fail'
         'fail': If table exists, do nothing.
         'replace': If table exists, drop it, recreate it, and insert data.
-        'append': If table exists, insert data. Create if does not exist.
+        'append': If table exists and the dataframe schema is a subset of
+        the destination table schema, insert data. Create destination table
+        if it does not exist.
     private_key : str (optional)
         Service account private key in JSON format. Can be file path
         or string contents. This is useful for remote server
@@ -844,7 +916,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         connector.delete_and_recreate_table(
             dataset_id, table_id, table_schema)
     elif if_exists == 'append':
-        if not connector.verify_schema(dataset_id, table_id, table_schema):
+        if not connector.schema_is_subset(dataset_id,
+                                          table_id,
+                                          table_schema):
             raise InvalidSchema("Please verify that the structure and "
                                 "data types in the DataFrame match the "
                                 "schema of the destination table.")

pandas_gbq/tests/test_gbq.py

+85
@@ -1071,6 +1071,31 @@ def test_upload_data_if_table_exists_append(self):
                    _get_project_id(), if_exists='append',
                    private_key=_get_private_key_path())
 
+    def test_upload_subset_columns_if_table_exists_append(self):
+        # Issue 24: Upload is successful if dataframe has columns
+        # which are a subset of the current schema
+        test_id = "16"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_subset_cols = df.iloc[:, :2]
+
+        # Initialize table with sample data
+        gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
+                   chunksize=10000, private_key=_get_private_key_path())
+
+        # Test the if_exists parameter with value 'append'
+        gbq.to_gbq(df_subset_cols,
+                   self.destination_table + test_id, _get_project_id(),
+                   if_exists='append', private_key=_get_private_key_path())
+
+        sleep(30)  # <- Curses Google!!!
+
+        result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                              .format(self.destination_table + test_id),
+                              project_id=_get_project_id(),
+                              private_key=_get_private_key_path())
+        assert result['num_rows'][0] == test_size * 2
+
     def test_upload_data_if_table_exists_replace(self):
         test_id = "4"
         test_size = 10
@@ -1258,6 +1283,66 @@ def test_verify_schema_ignores_field_mode(self):
         assert self.sut.verify_schema(
             self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)
 
+    def test_retrieve_schema(self):
+        # Issue #24 schema function returns the schema in bigquery
+        test_id = "15"
+        test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                  {'name': 'B', 'type': 'FLOAT'},
+                                  {'name': 'C', 'type': 'STRING'},
+                                  {'name': 'D', 'type': 'TIMESTAMP'}]}
+
+        self.table.create(TABLE_ID + test_id, test_schema)
+        actual = self.sut.schema(self.dataset_prefix + "1",
+                                 TABLE_ID + test_id)
+        expected = test_schema['fields']
+        assert expected == actual, 'Expected schema used to create table'
+
+    def test_schema_is_subset_passes_if_subset(self):
+        # Issue #24 schema_is_subset indicates whether the schema of the
+        # dataframe is a subset of the schema of the bigquery table
+        test_id = '16'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'B',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is True
+
+    def test_schema_is_subset_fails_if_not_subset(self):
+        # For pull request #24
+        test_id = '17'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'C',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is False
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
