
Commit fe5c461

Improvements discussed in PR conversation
Accidentally left a duplicate test in
1 parent 9dfd106 commit fe5c461

File tree

4 files changed: +147 additions, -4 deletions

docs/source/changelog.rst

Lines changed: 4 additions & 0 deletions
@@ -9,6 +9,10 @@ Changelog

 - All gbq errors will simply be subclasses of ``ValueError`` and no longer inherit from the deprecated ``PandasError``.

+0.1.5 / 2017-04-20
+------------------
+- When using ``to_gbq``, if ``if_exists`` is set to ``'append'``, the dataframe needs to contain only a subset of the fields in the BigQuery schema. GH#24
+
 0.1.4 / 2017-03-17
 ------------------


docs/source/writing.rst

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.

 If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
 be written to the table using the defined table schema and column types. The
-dataframe must match the destination table in structure and data types.
+dataframe must contain only fields (matching in name and type) that are present in the destination table.
 If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
 different schema, a delay of 2 minutes will be forced to ensure that the new schema
 has propagated in the Google environment. See
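As a usage sketch of the behaviour documented above (not part of this commit): the snippet below appends a two-column dataframe to a table whose schema also contains a third column. The project id and table name are placeholders.

    import pandas as pd
    from pandas_gbq import gbq

    # Assume the destination table already has columns A (FLOAT), B (FLOAT)
    # and C (STRING); this dataframe carries only the A/B subset.
    df_subset = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

    # 'my_dataset.my_table' and 'my-project' are placeholder names.
    gbq.to_gbq(df_subset, 'my_dataset.my_table', 'my-project',
               if_exists='append')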

pandas_gbq/gbq.py

Lines changed: 56 additions & 1 deletion
@@ -557,7 +557,19 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):

         self._print("\n")

-    def verify_schema(self, dataset_id, table_id, schema):
+    def schema(self, dataset_id, table_id):
+        """Retrieve the schema of the table
+
+        Obtain from BigQuery the field names and field types
+        for the table defined by the parameters
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+
+        :return: Fields representing the schema
+        :rtype: list of dicts
+        """
+
         try:
             from googleapiclient.errors import HttpError
         except:
@@ -582,6 +594,49 @@ def verify_schema(self, dataset_id, table_id, schema):
         except HttpError as ex:
             self.process_http_error(ex)

+    def verify_schema(self, dataset_id, table_id, schema):
+        """Indicate whether schemas match exactly
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether they contain the
+        same fields. Order is not considered.
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+        :param list(dict) schema: Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        :return: Whether the schemas match
+        :rtype: bool
+        """
+
+        fields_remote = sorted(self.schema(dataset_id, table_id),
+                               key=lambda x: x['name'])
+        fields_local = sorted(schema['fields'], key=lambda x: x['name'])
+
+        return fields_remote == fields_local
+
+    def schema_is_subset(self, dataset_id, table_id, schema):
+        """Indicate whether the schema to be uploaded is a subset
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether all fields in the
+        latter are present in the former. Order is not considered.
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+        :param list(dict) schema: Schema for comparison. Each item should have
+            a 'name' and a 'type'
+
+        :return: Whether the passed schema is a subset
+        :rtype: bool
+        """
+
+        fields_remote = self.schema(dataset_id, table_id)
+        fields_local = schema['fields']
+
+        return all(field in fields_remote for field in fields_local)
+
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         delay = 0

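For reference, schema() returns the table's fields as a list of {'name', 'type'} dicts, and the subset check reduces to plain containment over those dicts. A standalone sketch with made-up schemas (no live BigQuery table needed):

    # What schema() would return for a table with columns A, B and C.
    fields_remote = [{'name': 'A', 'type': 'FLOAT'},
                     {'name': 'B', 'type': 'FLOAT'},
                     {'name': 'C', 'type': 'STRING'}]

    # The schema of a dataframe carrying only the A/B subset.
    fields_local = [{'name': 'A', 'type': 'FLOAT'},
                    {'name': 'B', 'type': 'FLOAT'}]

    # Every local field must appear verbatim (name and type) in the
    # remote schema, exactly as in schema_is_subset above.
    assert all(field in fields_remote for field in fields_local)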

pandas_gbq/tests/test_gbq.py

Lines changed: 86 additions & 2 deletions
@@ -1071,6 +1071,30 @@ def test_upload_data_if_table_exists_append(self):
                    _get_project_id(), if_exists='append',
                    private_key=_get_private_key_path())

+    def test_upload_subset_columns_if_table_exists_append(self):
+        # For pull request #24
+        test_id = "16"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_subset_cols = df.iloc[:, :2]
+
+        # Initialize table with sample data
+        gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
+                   chunksize=10000, private_key=_get_private_key_path())
+
+        # Test the if_exists parameter with value 'append'
+        gbq.to_gbq(df_subset_cols,
+                   self.destination_table + test_id, _get_project_id(),
+                   if_exists='append', private_key=_get_private_key_path())
+
+        sleep(30)  # <- Curses Google!!!
+
+        result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                              .format(self.destination_table + test_id),
+                              project_id=_get_project_id(),
+                              private_key=_get_private_key_path())
+        self.assertEqual(result['num_rows'][0], test_size * 2)
+
     def test_upload_data_if_table_exists_replace(self):
         test_id = "4"
         test_size = 10
@@ -1255,8 +1279,68 @@ def test_verify_schema_ignores_field_mode(self):
                            'type': 'TIMESTAMP'}]}

         self.table.create(TABLE_ID + test_id, test_schema_1)
-        assert self.sut.verify_schema(
-            self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)
+        self.assertTrue(self.sut.verify_schema(
+            self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2),
+            'Expected schema to match')
+
+    def test_retrieve_schema(self):
+        # For pull request #24
+        test_id = "15"
+        test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                  {'name': 'B', 'type': 'FLOAT'},
+                                  {'name': 'C', 'type': 'STRING'},
+                                  {'name': 'D', 'type': 'TIMESTAMP'}]}
+
+        self.table.create(TABLE_ID + test_id, test_schema)
+        actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
+        expected = test_schema['fields']
+        assert expected == actual, 'Expected schema used to create table'
+
+    def test_schema_is_subset_passes_if_subset(self):
+        # For pull request #24
+        test_id = '16'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'B',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is True
+
+    def test_schema_is_subset_fails_if_not_subset(self):
+        # For pull request #24
+        test_id = '17'
+
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A',
+                                    'type': 'FLOAT'},
+                                   {'name': 'B',
+                                    'type': 'FLOAT'},
+                                   {'name': 'C',
+                                    'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A',
+                                     'type': 'FLOAT'},
+                                    {'name': 'C',
+                                     'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is False

     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
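Outside the test harness (where self.sut is a GbqConnector), the new methods could be exercised roughly as below; the project, dataset and table identifiers are placeholders, and a real call needs valid credentials.

    from pandas_gbq.gbq import GbqConnector

    # Placeholder project id; authentication happens on construction.
    connector = GbqConnector('my-project')

    # List of {'name', 'type'} dicts describing the live table.
    fields = connector.schema('my_dataset', 'my_table')

    # True when the first two fields alone form a valid upload schema.
    ok = connector.schema_is_subset('my_dataset', 'my_table',
                                    {'fields': fields[:2]})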
