-
-
Notifications
You must be signed in to change notification settings - Fork 18.6k
ENH: Add table prefixes to to_sql method #60409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
03c8183
Implement prefixes parameter.
Diadochokinetic f09fd54
[WIP] implent special cases for temporary tables.
Diadochokinetic 03b8642
Specify Exception in _exists_temporary.
Diadochokinetic ccb9eac
remove print
Diadochokinetic d792f27
Finalize working mysql implementation.
Diadochokinetic 2582834
[WIP] Add rollback for postgres.
Diadochokinetic b138532
Add support for sqlite.
Diadochokinetic f696257
Add connectables with default pool and add test for if_exists=append.
Diadochokinetic 33b69d9
Merge branch 'pandas-dev:main' into table_prefixes
Diadochokinetic 0bc6504
Fix typo in prefixes docstring.
Diadochokinetic 8a94c2b
Undo experimental import changes.
Diadochokinetic ec32a70
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic be305ff
Add some documentation.
Diadochokinetic 35a6394
Fix typo in NDFrame.to_sql docstring.
Diadochokinetic 145d18c
Add prefixes parameter in to_sql suberclass.
Diadochokinetic eddf687
Add prefixes parameter to ADBC subclass method to_sql.
Diadochokinetic 3190142
Disable case sensitivity check for temporary tables.
Diadochokinetic 154c208
Merge remote-tracking branch 'upstream/main' into table_prefixes
Diadochokinetic 98a153f
Merge branch 'main' into table_prefixes
Diadochokinetic 8a0611d
[WIP] Add support for adbc driver.
Diadochokinetic eb6e9f0
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic 522f842
Fix mypy unsupported operand types error.
Diadochokinetic ca0551d
Merge branch 'main' into table_prefixes
Diadochokinetic b595021
Merge branch 'main' into table_prefixes
Diadochokinetic 3c8a12e
Merge branch 'main' into table_prefixes
Diadochokinetic 1ad55bb
Merge branch 'main' into table_prefixes
Diadochokinetic 25ecbc3
Merge branch 'main' into table_prefixes
Diadochokinetic 775a05a
Merge branch 'main' into table_prefixes
Diadochokinetic efad6b4
Replace prefixes parameter with temporary.
Diadochokinetic 0543666
Merge branch 'main' into table_prefixes
Diadochokinetic 90d1f1f
Merge branch 'main' into table_prefixes
Diadochokinetic 4da49dd
Add whatsnew entry.
Diadochokinetic b0db943
Add issue number to whatsnew
Diadochokinetic c52c71a
Merge branch 'main' into table_prefixes
Diadochokinetic cd2fff4
Merge branch 'main' into table_prefixes
Diadochokinetic d1233ce
Merge branch 'main' into table_prefixes
Diadochokinetic cd51d7c
Change error handling of _exists_temporary to a higher abstraction le…
Diadochokinetic 5d306c2
Use nested try except to handle rollbacks.
Diadochokinetic 8cafc44
Nested try except blocks don't work with adbc.
Diadochokinetic File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -744,6 +744,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -791,6 +792,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
SQL engine library to use. If 'auto', then the option | ||
``io.sql.engine`` is used. The default ``io.sql.engine`` | ||
|
@@ -839,6 +842,7 @@ def to_sql( | |
chunksize=chunksize, | ||
dtype=dtype, | ||
method=method, | ||
temporary=temporary, | ||
engine=engine, | ||
**engine_kwargs, | ||
) | ||
|
@@ -932,6 +936,7 @@ def __init__( | |
schema=None, | ||
keys=None, | ||
dtype: DtypeArg | None = None, | ||
temporary: bool = False, | ||
) -> None: | ||
self.name = name | ||
self.pd_sql = pandas_sql_engine | ||
|
@@ -942,6 +947,7 @@ def __init__( | |
self.if_exists = if_exists | ||
self.keys = keys | ||
self.dtype = dtype | ||
self.temporary = temporary | ||
|
||
if frame is not None: | ||
# We want to initialize based on a dataframe | ||
|
@@ -956,8 +962,41 @@ def __init__( | |
if not len(self.name): | ||
raise ValueError("Empty table name specified") | ||
|
||
def _drop_temporary_table(self): | ||
"""Drop a temporary table. Temporary tables are not in a database's meta data | ||
and need to be dropped hard coded.""" | ||
if self.schema is None: | ||
query = f"DROP TABLE {self.name}" | ||
else: | ||
query = f"DROP TABLE {self.schema}.{self.name}" | ||
self.pd_sql.execute(query) | ||
|
||
def _exists_temporary(self): | ||
"""Check if a temporary table exists. Temporary tables are not in a database's | ||
meta data. The existence is duck tested by a SELECT statement.""" | ||
from sqlalchemy import text | ||
from sqlalchemy.exc import DatabaseError | ||
|
||
if self.schema is None: | ||
query = f"SELECT * FROM {self.name} LIMIT 1" | ||
else: | ||
query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1" | ||
try: | ||
_ = self.pd_sql.con.execute(text(query)) | ||
return True | ||
except DatabaseError: | ||
# Some DBMS (e.g. postgres) require a rollback after a caught exception | ||
try: | ||
self.pd_sql.con.execute(text("rollback")) | ||
return False | ||
except DatabaseError: | ||
return False | ||
|
||
def exists(self): | ||
return self.pd_sql.has_table(self.name, self.schema) | ||
if self.temporary: | ||
return self._exists_temporary() | ||
else: | ||
return self.pd_sql.has_table(self.name, self.schema) | ||
|
||
def sql_schema(self) -> str: | ||
from sqlalchemy.schema import CreateTable | ||
|
@@ -966,7 +1005,9 @@ def sql_schema(self) -> str: | |
|
||
def _execute_create(self) -> None: | ||
# Inserting table into database, add to MetaData object | ||
self.table = self.table.to_metadata(self.pd_sql.meta) | ||
if not self.temporary: | ||
# only insert into meta data, if table is not temporary | ||
self.table = self.table.to_metadata(self.pd_sql.meta) | ||
with self.pd_sql.run_transaction(): | ||
self.table.create(bind=self.pd_sql.con) | ||
|
||
|
@@ -975,7 +1016,10 @@ def create(self) -> None: | |
if self.if_exists == "fail": | ||
raise ValueError(f"Table '{self.name}' already exists.") | ||
if self.if_exists == "replace": | ||
self.pd_sql.drop_table(self.name, self.schema) | ||
if self.temporary: | ||
self._drop_temporary_table() | ||
else: | ||
self.pd_sql.drop_table(self.name, self.schema) | ||
self._execute_create() | ||
elif self.if_exists == "append": | ||
pass | ||
|
@@ -1266,10 +1310,16 @@ def _create_table_setup(self): | |
|
||
schema = self.schema or self.pd_sql.meta.schema | ||
|
||
# check if table is temporary | ||
if self.temporary: | ||
prefixes = ["TEMPORARY"] | ||
else: | ||
prefixes = None | ||
|
||
# At this point, attach to new metadata, only attach to self.meta | ||
# once table is created. | ||
meta = MetaData() | ||
return Table(self.name, meta, *columns, schema=schema) | ||
return Table(self.name, meta, *columns, schema=schema, prefixes=prefixes) | ||
|
||
def _harmonize_columns( | ||
self, | ||
|
@@ -1487,6 +1537,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -1871,6 +1922,7 @@ def prep_table( | |
index_label=None, | ||
schema=None, | ||
dtype: DtypeArg | None = None, | ||
temporary: bool = False, | ||
) -> SQLTable: | ||
""" | ||
Prepares table in the database for data insertion. Creates it if needed, etc. | ||
|
@@ -1906,6 +1958,7 @@ def prep_table( | |
index_label=index_label, | ||
schema=schema, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
table.create() | ||
return table | ||
|
@@ -1950,6 +2003,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -1991,6 +2045,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
SQL engine library to use. If 'auto', then the option | ||
``io.sql.engine`` is used. The default ``io.sql.engine`` | ||
|
@@ -2011,6 +2067,7 @@ def to_sql( | |
index_label=index_label, | ||
schema=schema, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
|
||
total_inserted = sql_engine.insert_records( | ||
|
@@ -2025,7 +2082,9 @@ def to_sql( | |
**engine_kwargs, | ||
) | ||
|
||
self.check_case_sensitive(name=name, schema=schema) | ||
# only check case sensitivity for non temporary tables | ||
if not table.temporary: | ||
self.check_case_sensitive(name=name, schema=schema) | ||
return total_inserted | ||
|
||
@property | ||
|
@@ -2303,6 +2362,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -2332,6 +2392,8 @@ def to_sql( | |
Raises NotImplementedError | ||
method : {None', 'multi', callable}, default None | ||
Raises NotImplementedError | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
engine : {'auto', 'sqlalchemy'}, default 'auto' | ||
Raises NotImplementedError if not set to 'auto' | ||
""" | ||
|
@@ -2360,7 +2422,14 @@ def to_sql( | |
# as applicable modes, so the semantics get blurred across | ||
# the libraries | ||
mode = "create" | ||
if self.has_table(name, schema): | ||
|
||
# for temporary tables use duck testing for existence check | ||
if temporary: | ||
exists = self._has_table_temporary(name, schema) | ||
else: | ||
exists = self.has_table(name, schema) | ||
|
||
if exists: | ||
if if_exists == "fail": | ||
raise ValueError(f"Table '{table_name}' already exists.") | ||
elif if_exists == "replace": | ||
|
@@ -2378,12 +2447,41 @@ def to_sql( | |
|
||
with self.con.cursor() as cur: | ||
total_inserted = cur.adbc_ingest( | ||
table_name=name, data=tbl, mode=mode, db_schema_name=schema | ||
table_name=name, | ||
data=tbl, | ||
mode=mode, | ||
db_schema_name=schema, | ||
temporary=temporary, | ||
) | ||
|
||
self.con.commit() | ||
return total_inserted | ||
|
||
def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: | ||
"""Check if a temporary table exists. Temporary tables are not in a database's | ||
meta data. The existence is duck tested by a SELECT statement.""" | ||
from adbc_driver_manager import ProgrammingError | ||
|
||
# sqlite doesn't allow a rollback at this point | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment as before - we really want to avoid putting DBMS-specific logic into our implementation |
||
rollback = ( | ||
True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False | ||
) | ||
|
||
if schema is None: | ||
query = f"SELECT * FROM {name} LIMIT 1" | ||
else: | ||
query = f"SELECT * FROM {schema}.{name} LIMIT 1" | ||
try: | ||
with self.con.cursor() as cur: | ||
cur.execute(query) | ||
return True | ||
except ProgrammingError: | ||
if rollback: | ||
# Some DBMS (e.g. postgres) require a rollback after a caught exception | ||
with self.con.cursor() as cur: | ||
cur.execute("rollback") | ||
return False | ||
|
||
def has_table(self, name: str, schema: str | None = None) -> bool: | ||
meta = self.con.adbc_get_objects( | ||
db_schema_filter=schema, table_name_filter=name | ||
|
@@ -2758,6 +2856,7 @@ def to_sql( | |
chunksize: int | None = None, | ||
dtype: DtypeArg | None = None, | ||
method: Literal["multi"] | Callable | None = None, | ||
temporary: bool = False, | ||
engine: str = "auto", | ||
**engine_kwargs, | ||
) -> int | None: | ||
|
@@ -2798,6 +2897,8 @@ def to_sql( | |
|
||
Details and a sample callable implementation can be found in the | ||
section :ref:`insert method <io.sql.method>`. | ||
temporary : bool, default False | ||
Indicates if the created, replaced or appended table is temporary. | ||
""" | ||
if dtype: | ||
if not is_dict_like(dtype): | ||
|
@@ -2823,6 +2924,7 @@ def to_sql( | |
if_exists=if_exists, | ||
index_label=index_label, | ||
dtype=dtype, | ||
temporary=temporary, | ||
) | ||
table.create() | ||
return table.insert(chunksize, method) | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DMBS-specific features are something we want to avoid in pandas, as maintaining compatability on those is not a core specialty of our team. Does sqlalchemy not handle this natively?