Adding (Insert or update if key exists) option to .to_sql #14553 #29636
Changes from 52 commits
@@ -2604,12 +2604,15 @@ def to_sql(
     schema : str, optional
         Specify the schema (if database flavor supports this). If None, use
         default schema.
-    if_exists : {'fail', 'replace', 'append'}, default 'fail'
+    if_exists : {'fail', 'replace', 'append', 'upsert_overwrite', 'upsert_keep'},\
+        default 'fail'

Review thread on the new `if_exists` values:

- Comment: This may add ambiguity to the [...]. Would this be better as a separate [...]?
- Comment: Also, the documentation says [...]. So, the documentation should be updated at the least. However, I think it would be conceptually cleaner to separate table existence from row collision checks.
- Reply: I think you're making a good point. I will look to incorporate this shortly.
- Comment: On your original proposal, you mentioned using a [...].
- Reply: @brylie I think it is a good point, but I am less sure about the action required. I have actually sent a full reply here: [...]
- Comment: Since the "upsert" method in SQL checks on the primary key, how about naming the parameter [...]? To be explicit, the parameters should be something like [...].
- Comment: Since renaming [...], what is blocking us from adding a new argument for [...]?

Review thread on the API shape:

- Comment: This API is conflating two things. Isn't upsert by definition overwrite? What is the use case for 'upsert_keep'?
- Reply: Over the course of this PR I have come to agree that using the term [...].
- Reply: I think that makes sense.

         How to behave if the table already exists.

         * fail: Raise a ValueError.
         * replace: Drop the table before inserting new values.
         * append: Insert new values to the existing table.
+        * upsert_overwrite: Overwrite matches in database with incoming data.
+        * upsert_keep: Keep matches in database instead of incoming data.

     index : bool, default True
         Write DataFrame index as a column. Uses `index_label` as the column
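For orientation, here is a minimal sketch of how the proposed options would be exercised. This assumes the PR's branch: `upsert_overwrite` and `upsert_keep` are not part of released pandas, and the target table needs a primary key, since the branch raises a ValueError when none is found.

```python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

# The upsert paths look up the table's primary key, so create the table
# with one explicitly; a table created by to_sql itself has no primary key.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE items (id INTEGER PRIMARY KEY, value TEXT)"))

pd.DataFrame({"id": [1, 2], "value": ["a", "b"]}).to_sql(
    "items", engine, if_exists="append", index=False
)

updates = pd.DataFrame({"id": [2, 3], "value": ["B", "c"]})
# With this PR applied: incoming rows win on key clashes ...
updates.to_sql("items", engine, if_exists="upsert_overwrite", index=False)
# ... or existing rows win, and only genuinely new keys are inserted.
updates.to_sql("items", engine, if_exists="upsert_keep", index=False)
```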
@@ -467,10 +467,15 @@ def to_sql(
     schema : str, optional
         Name of SQL schema in database to write to (if database flavor
         supports this). If None, use default schema (default).
-    if_exists : {'fail', 'replace', 'append'}, default 'fail'
+    if_exists : {'fail', 'replace', 'append', 'upsert_overwrite', 'upsert_keep'},
+        default 'fail'.
     - fail: If table exists, do nothing.
     - replace: If table exists, drop it, recreate it, and insert data.
     - append: If table exists, insert data. Create if does not exist.
+    - upsert_overwrite: If table exists, perform an UPSERT (based on primary keys),
+      prioritising incoming records over duplicates already in the database.
+    - upsert_keep: If table exists, perform an UPSERT (based on primary keys),
+      prioritising records already in the database over incoming duplicates.
     index : boolean, default True
         Write DataFrame index as a column.
     index_label : str or sequence, optional
@@ -497,7 +502,13 @@ def to_sql(

     .. versionadded:: 0.24.0
     """
-    if if_exists not in ("fail", "replace", "append"):
+    if if_exists not in (
+        "fail",
+        "replace",
+        "append",
+        "upsert_keep",
+        "upsert_overwrite",
+    ):
         raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

     pandas_sql = pandasSQL_builder(con, schema=schema)
@@ -649,7 +660,7 @@ def create(self):
         elif self.if_exists == "replace":
             self.pd_sql.drop_table(self.name, self.schema)
             self._execute_create()
-        elif self.if_exists == "append":
+        elif self.if_exists in {"append", "upsert_overwrite", "upsert_keep"}:
             pass
         else:
             raise ValueError(
@@ -658,6 +669,104 @@ def create(self):
         else:
             self._execute_create()

+    def _upsert_overwrite_processing(self):
+        """
+        Generate delete statement for rows with clashing primary key from database.
+
+        `upsert_overwrite` prioritizes incoming data over existing data in the DB.
+        This method generates the Delete statement for duplicate rows,
+        which is to be executed in the same transaction as the ensuing data insert.
+
+        Returns
+        -------
+        sqlalchemy.sql.dml.Delete
+            Delete statement to be executed against DB
+        """
+        from sqlalchemy import tuple_
+
+        # Primary key data
+        primary_keys, primary_key_values = self._get_primary_key_data()
+        # Generate delete statement
+        delete_statement = self.table.delete().where(
+            tuple_(*(self.table.c[col] for col in primary_keys)).in_(
+                primary_key_values
+            )
+        )
+        return delete_statement

Review thread on the `where` clause:

- Comment: I think this can be more clearly expressed as [...]
- Reply: Hi @WillAyd - thanks for the feedback. [...]
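In isolation, the delete statement assembled above corresponds to something like the following standalone SQLAlchemy sketch; the table, columns, and key values here are illustrative, not taken from the PR.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, tuple_

engine = create_engine("sqlite:///:memory:")
meta = MetaData()
items = Table(
    "items",
    meta,
    Column("id", Integer, primary_key=True),
    Column("value", String),
)
meta.create_all(engine)

# Primary-key tuples taken from the incoming frame (illustrative values).
incoming_keys = [(1,), (3,)]

# Roughly: DELETE FROM items WHERE (id) IN ((1), (3));
# tuple_ covers composite primary keys the same way.
stmt = items.delete().where(
    tuple_(*(items.c[col.name] for col in items.primary_key.columns)).in_(incoming_keys)
)
with engine.begin() as conn:
    conn.execute(stmt)
```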
+    def _upsert_keep_processing(self):
+        """
+        Delete clashing values from a copy of the incoming dataframe.
+
+        `upsert_keep` prioritizes data in the DB over incoming data.
+        This method creates a copy of the incoming dataframe,
+        fetches matching data from the DB, deletes the matches from the copied
+        frame, and returns that frame to be inserted.
+
+        Returns
+        -------
+        DataFrame
+            Filtered dataframe, with values that are already in the DB removed.
+        """
+        from sqlalchemy import tuple_, select
+
+        # Primary key data
+        primary_keys, primary_key_values = self._get_primary_key_data()
+        # Fetch matching pkey values from database
+        columns_to_fetch = [self.table.c[key] for key in primary_keys]

Review comment: Similar comment - can you just select without looping?

+        select_statement = select(columns_to_fetch).where(
+            tuple_(*columns_to_fetch).in_(primary_key_values)
+        )
+        pkeys_from_database = _wrap_result(
+            data=self.pd_sql.execute(select_statement), columns=primary_keys
+        )
+        # Get temporary dataframe so as not to delete values from main df
+        temp = self._get_index_formatted_dataframe()
+        # Delete rows from dataframe where primary keys match.
+        # Tuples are used to account for cases where indexes do not match.
+        to_be_deleted_mask = (
+            temp[primary_keys]
+            .apply(tuple, axis=1)
+            .isin(pkeys_from_database[primary_keys].apply(tuple, axis=1))
+        )
+        temp.drop(temp[to_be_deleted_mask].index, inplace=True)
+
+        return temp
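The mask built above is effectively an anti-join on the key columns. A minimal sketch of the tuple/isin trick, with illustrative column names and data:

```python
import pandas as pd

incoming = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
already_in_db = pd.DataFrame({"id": [2, 3]})  # keys fetched back from the database
primary_keys = ["id"]

# Rows are compared as tuples, so composite keys and mismatched
# indexes are handled uniformly.
mask = (
    incoming[primary_keys]
    .apply(tuple, axis=1)
    .isin(already_in_db[primary_keys].apply(tuple, axis=1))
)
filtered = incoming[~mask]
print(filtered)  # only the row with id == 1 remains to be inserted
```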
+    def _get_primary_key_data(self):
+        """
+        Get primary keys from the database, and yield dataframe columns with
+        the same names.
+
+        Upsert workflows require knowledge of what is already in the database.
+        This method reflects the meta object, gets a list of primary keys,
+        then returns all columns from the incoming dataframe whose names match
+        those keys.
+
+        Returns
+        -------
+        primary_keys : list of str
+            Primary key names
+        primary_key_values : iterable
+            DataFrame rows, for columns corresponding to `primary_keys` names
+        """
+        # reflect MetaData object and assign contents of db to self.table attribute
+        self.pd_sql.meta.reflect(only=[self.name], views=True)
+        self.table = self.pd_sql.get_table(table_name=self.name, schema=self.schema)
+
+        primary_keys = [
+            str(primary_key.name)
+            for primary_key in self.table.primary_key.columns.values()
+        ]
+
+        # For the time being, this method is defensive and will raise if
+        # no pkeys are found. If desired, this behaviour could be changed
+        # so that cases with no pkeys fall back to a normal insert.
+        if len(primary_keys) == 0:
+            raise ValueError(f"No primary keys found for table {self.name}")
+
+        temp = self._get_index_formatted_dataframe()
+        primary_key_values = zip(*[temp[key] for key in primary_keys])
+        return primary_keys, primary_key_values
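The reflection step can be tried on its own. A small sketch, with an illustrative table rather than the PR's internal `pd_sql` wrapper, of reading primary-key column names back from a live database with SQLAlchemy:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

engine = create_engine("sqlite:///:memory:")
Table(
    "items",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("value", String),
).create(engine)

# Reflect the live schema and read the primary-key column names back,
# analogous to what _get_primary_key_data does via pd_sql.meta.reflect.
meta = MetaData()
meta.reflect(bind=engine, only=["items"])
table = meta.tables["items"]
primary_keys = [str(col.name) for col in table.primary_key.columns]
print(primary_keys)  # ['id']
```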
     def _execute_insert(self, conn, keys, data_iter):
         """Execute SQL statement inserting data
@@ -682,21 +791,36 @@ def _execute_insert_multi(self, conn, keys, data_iter):
         data = [dict(zip(keys, row)) for row in data_iter]
         conn.execute(self.table.insert(data))
-    def insert_data(self):
+    def _get_index_formatted_dataframe(self):
+        """
+        Format index of incoming dataframe to be aligned with a database table.
+
+        Copy the original dataframe, and check whether the dataframe index
+        is to be added to the database table.
+        If it is, reset the index so that it becomes a normal column;
+        otherwise return the copy unchanged.
+
+        Returns
+        -------
+        DataFrame
+        """
+        # Originally this functionality formed the first step of insert_data.
+        # It is useful elsewhere, so it was moved here to keep the code DRY.
+        temp = self.frame.copy()
         if self.index is not None:
-            temp = self.frame.copy()
             temp.index.names = self.index
             try:
                 temp.reset_index(inplace=True)
             except ValueError as err:
                 raise ValueError("duplicate name in index/columns: {0}".format(err))
-        else:
-            temp = self.frame

-        column_names = list(map(str, temp.columns))
+        return temp

+    @staticmethod
+    def insert_data(data):
+        column_names = list(map(str, data.columns))
         ncols = len(column_names)
         data_list = [None] * ncols
-        blocks = temp._data.blocks
+        blocks = data._data.blocks

         for b in blocks:
             if b.is_datetime:
@@ -723,7 +847,21 @@ def insert_data(self):
         return column_names, data_list
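The index handling factored out into `_get_index_formatted_dataframe` boils down to turning the index into an ordinary column before insertion; a minimal illustration:

```python
import pandas as pd

df = pd.DataFrame({"value": ["a", "b"]}, index=pd.Index([1, 2], name="id"))

# Writing the index as a column means resetting it first, so the
# database sees `id` as a regular column alongside `value`.
formatted = df.copy()
formatted.reset_index(inplace=True)
print(list(formatted.columns))  # ['id', 'value']
```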
     def insert(self, chunksize=None, method=None):
+        """
+        Determines what data to pass to the underlying insert method.
+        """
+        with self.pd_sql.run_transaction() as trans:
+            if self.if_exists == "upsert_keep":
+                data = self._upsert_keep_processing()
+                self._insert(data=data, chunksize=chunksize, method=method, conn=trans)
+            elif self.if_exists == "upsert_overwrite":
+                delete_statement = self._upsert_overwrite_processing()
+                trans.execute(delete_statement)
+                self._insert(chunksize=chunksize, method=method, conn=trans)
+            else:
+                self._insert(chunksize=chunksize, method=method, conn=trans)
+    def _insert(self, data=None, chunksize=None, method=None, conn=None):
         # set insert method
         if method is None:
             exec_insert = self._execute_insert
@@ -734,9 +872,12 @@ def insert(self, chunksize=None, method=None):
         else:
             raise ValueError("Invalid parameter `method`: {}".format(method))

-        keys, data_list = self.insert_data()
+        if data is None:
+            data = self._get_index_formatted_dataframe()
+
+        keys, data_list = self.insert_data(data=data)

-        nrows = len(self.frame)
+        nrows = len(data)

         if nrows == 0:
             return
@@ -748,15 +889,14 @@ def insert(self, chunksize=None, method=None):

         chunks = int(nrows / chunksize) + 1

-        with self.pd_sql.run_transaction() as conn:
-            for i in range(chunks):
-                start_i = i * chunksize
-                end_i = min((i + 1) * chunksize, nrows)
-                if start_i >= end_i:
-                    break
+        for i in range(chunks):
+            start_i = i * chunksize
+            end_i = min((i + 1) * chunksize, nrows)
+            if start_i >= end_i:
+                break

-                chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
-                exec_insert(conn, keys, chunk_iter)
+            chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
+            exec_insert(conn, keys, chunk_iter)

     def _query_iterator(
         self, result, chunksize, columns, coerce_float=True, parse_dates=None
@@ -1263,10 +1403,15 @@ def to_sql(
     frame : DataFrame
     name : string
         Name of SQL table.
-    if_exists : {'fail', 'replace', 'append'}, default 'fail'
-        - fail: If table exists, do nothing.
-        - replace: If table exists, drop it, recreate it, and insert data.
-        - append: If table exists, insert data. Create if does not exist.
+    if_exists : {'fail', 'replace', 'append', 'upsert_overwrite', 'upsert_keep'},
+        default 'fail'.
+        - fail: If table exists, do nothing.
+        - replace: If table exists, drop it, recreate it, and insert data.
+        - append: If table exists, insert data. Create if does not exist.
+        - upsert_overwrite: If table exists, perform an UPSERT (based on primary keys),
+          prioritising incoming records over duplicates already in the database.
+        - upsert_keep: If table exists, perform an UPSERT (based on primary keys),
+          prioritising records already in the database over incoming duplicates.
     index : boolean, default True
         Write DataFrame index as a column.
     index_label : string or sequence, default None
Review comment: don't change unrelated things