Skip to content

ENH: Add table prefixes to to_sql method #60409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
03c8183
Implement prefixes parameter.
Diadochokinetic Oct 29, 2024
f09fd54
[WIP] implent special cases for temporary tables.
Diadochokinetic Oct 29, 2024
03b8642
Specify Exception in _exists_temporary.
Diadochokinetic Nov 17, 2024
ccb9eac
remove print
Diadochokinetic Nov 17, 2024
d792f27
Finalize working mysql implementation.
Diadochokinetic Nov 17, 2024
2582834
[WIP] Add rollback for postgres.
Diadochokinetic Nov 20, 2024
b138532
Add support for sqlite.
Diadochokinetic Nov 21, 2024
f696257
Add connectables with default pool and add test for if_exists=append.
Diadochokinetic Nov 24, 2024
33b69d9
Merge branch 'pandas-dev:main' into table_prefixes
Diadochokinetic Nov 24, 2024
0bc6504
Fix typo in prefixes docstring.
Diadochokinetic Nov 25, 2024
8a94c2b
Undo experimental import changes.
Diadochokinetic Nov 25, 2024
ec32a70
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic Nov 25, 2024
be305ff
Add some documentation.
Diadochokinetic Nov 25, 2024
35a6394
Fix typo in NDFrame.to_sql docstring.
Diadochokinetic Nov 26, 2024
145d18c
Add prefixes parameter in to_sql suberclass.
Diadochokinetic Nov 26, 2024
eddf687
Add prefixes parameter to ADBC subclass method to_sql.
Diadochokinetic Nov 26, 2024
3190142
Disable case sensitivity check for temporary tables.
Diadochokinetic Nov 26, 2024
154c208
Merge remote-tracking branch 'upstream/main' into table_prefixes
Diadochokinetic Nov 26, 2024
98a153f
Merge branch 'main' into table_prefixes
Diadochokinetic Nov 26, 2024
8a0611d
[WIP] Add support for adbc driver.
Diadochokinetic Nov 27, 2024
eb6e9f0
Merge branch 'table_prefixes' of https://github.com/Diadochokinetic/p…
Diadochokinetic Nov 27, 2024
522f842
Fix mypy unsupported operand types error.
Diadochokinetic Nov 27, 2024
ca0551d
Merge branch 'main' into table_prefixes
Diadochokinetic Dec 1, 2024
b595021
Merge branch 'main' into table_prefixes
Diadochokinetic Dec 2, 2024
3c8a12e
Merge branch 'main' into table_prefixes
Diadochokinetic Dec 4, 2024
1ad55bb
Merge branch 'main' into table_prefixes
Diadochokinetic Dec 22, 2024
25ecbc3
Merge branch 'main' into table_prefixes
Diadochokinetic Jan 22, 2025
775a05a
Merge branch 'main' into table_prefixes
Diadochokinetic Jan 28, 2025
efad6b4
Replace prefixes parameter with temporary.
Diadochokinetic Jan 28, 2025
0543666
Merge branch 'main' into table_prefixes
Diadochokinetic Jan 29, 2025
90d1f1f
Merge branch 'main' into table_prefixes
Diadochokinetic Feb 5, 2025
4da49dd
Add whatsnew entry.
Diadochokinetic Feb 5, 2025
b0db943
Add issue number to whatsnew
Diadochokinetic Feb 5, 2025
c52c71a
Merge branch 'main' into table_prefixes
Diadochokinetic Feb 6, 2025
cd2fff4
Merge branch 'main' into table_prefixes
Diadochokinetic Feb 6, 2025
d1233ce
Merge branch 'main' into table_prefixes
Diadochokinetic Feb 9, 2025
cd51d7c
Change error handling of _exists_temporary to a higher abstraction le…
Diadochokinetic Feb 15, 2025
5d306c2
Use nested try except to handle rollbacks.
Diadochokinetic Feb 15, 2025
8cafc44
Nested try except blocks don't work with adbc.
Diadochokinetic Feb 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/source/whatsnew/v2.3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Other enhancements
- :meth:`~Series.to_hdf` and :meth:`~DataFrame.to_hdf` now round-trip with ``StringDtype`` (:issue:`60663`)
- The :meth:`~Series.cumsum`, :meth:`~Series.cummin`, and :meth:`~Series.cummax` reductions are now implemented for ``StringDtype`` columns when backed by PyArrow (:issue:`60633`)
- The :meth:`~Series.sum` reduction is now implemented for ``StringDtype`` columns (:issue:`59853`)
- The ``to_sql`` method now supports a new parameter ``temporary: bool = False``.
Setting this parameter to ``True`` enables creating, appending or replacing temporary
tables via ``to_sql`` using ``sqlalchemy``, ``adbc`` and ``sqlite`` connectors.
(:issue:`60422`)

.. ---------------------------------------------------------------------------
.. _whatsnew_230.notable_bug_fixes:
Expand Down
4 changes: 4 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
) -> int | None:
"""
Write records stored in a DataFrame to a SQL database.
Expand Down Expand Up @@ -2844,6 +2845,8 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
temporary : bool, default False
Indicates if the created, replaced or appended table is temporary.

Returns
-------
Expand Down Expand Up @@ -3017,6 +3020,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
temporary=temporary,
)

@final
Expand Down
116 changes: 109 additions & 7 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -791,6 +792,8 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
temporary : bool, default False
Indicates if the created, replaced or appended table is temporary.
engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
Expand Down Expand Up @@ -839,6 +842,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
temporary=temporary,
engine=engine,
**engine_kwargs,
)
Expand Down Expand Up @@ -932,6 +936,7 @@ def __init__(
schema=None,
keys=None,
dtype: DtypeArg | None = None,
temporary: bool = False,
) -> None:
self.name = name
self.pd_sql = pandas_sql_engine
Expand All @@ -942,6 +947,7 @@ def __init__(
self.if_exists = if_exists
self.keys = keys
self.dtype = dtype
self.temporary = temporary

if frame is not None:
# We want to initialize based on a dataframe
Expand All @@ -956,8 +962,41 @@ def __init__(
if not len(self.name):
raise ValueError("Empty table name specified")

def _drop_temporary_table(self):
"""Drop a temporary table. Temporary tables are not in a database's meta data
and need to be dropped hard coded."""
if self.schema is None:
query = f"DROP TABLE {self.name}"
else:
query = f"DROP TABLE {self.schema}.{self.name}"
self.pd_sql.execute(query)

def _exists_temporary(self):
"""Check if a temporary table exists. Temporary tables are not in a database's
meta data. The existence is duck tested by a SELECT statement."""
from sqlalchemy import text
from sqlalchemy.exc import DatabaseError

if self.schema is None:
query = f"SELECT * FROM {self.name} LIMIT 1"
else:
query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1"
try:
_ = self.pd_sql.con.execute(text(query))
return True
except DatabaseError:
# Some DBMS (e.g. postgres) require a rollback after a caught exception
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DMBS-specific features are something we want to avoid in pandas, as maintaining compatability on those is not a core specialty of our team. Does sqlalchemy not handle this natively?

try:
self.pd_sql.con.execute(text("rollback"))
return False
except DatabaseError:
return False

def exists(self):
return self.pd_sql.has_table(self.name, self.schema)
if self.temporary:
return self._exists_temporary()
else:
return self.pd_sql.has_table(self.name, self.schema)

def sql_schema(self) -> str:
from sqlalchemy.schema import CreateTable
Expand All @@ -966,7 +1005,9 @@ def sql_schema(self) -> str:

def _execute_create(self) -> None:
# Inserting table into database, add to MetaData object
self.table = self.table.to_metadata(self.pd_sql.meta)
if not self.temporary:
# only insert into meta data, if table is not temporary
self.table = self.table.to_metadata(self.pd_sql.meta)
with self.pd_sql.run_transaction():
self.table.create(bind=self.pd_sql.con)

Expand All @@ -975,7 +1016,10 @@ def create(self) -> None:
if self.if_exists == "fail":
raise ValueError(f"Table '{self.name}' already exists.")
if self.if_exists == "replace":
self.pd_sql.drop_table(self.name, self.schema)
if self.temporary:
self._drop_temporary_table()
else:
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == "append":
pass
Expand Down Expand Up @@ -1266,10 +1310,16 @@ def _create_table_setup(self):

schema = self.schema or self.pd_sql.meta.schema

# check if table is temporary
if self.temporary:
prefixes = ["TEMPORARY"]
else:
prefixes = None

# At this point, attach to new metadata, only attach to self.meta
# once table is created.
meta = MetaData()
return Table(self.name, meta, *columns, schema=schema)
return Table(self.name, meta, *columns, schema=schema, prefixes=prefixes)

def _harmonize_columns(
self,
Expand Down Expand Up @@ -1487,6 +1537,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1871,6 +1922,7 @@ def prep_table(
index_label=None,
schema=None,
dtype: DtypeArg | None = None,
temporary: bool = False,
) -> SQLTable:
"""
Prepares table in the database for data insertion. Creates it if needed, etc.
Expand Down Expand Up @@ -1906,6 +1958,7 @@ def prep_table(
index_label=index_label,
schema=schema,
dtype=dtype,
temporary=temporary,
)
table.create()
return table
Expand Down Expand Up @@ -1950,6 +2003,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1991,6 +2045,8 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
temporary : bool, default False
Indicates if the created, replaced or appended table is temporary.
engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
Expand All @@ -2011,6 +2067,7 @@ def to_sql(
index_label=index_label,
schema=schema,
dtype=dtype,
temporary=temporary,
)

total_inserted = sql_engine.insert_records(
Expand All @@ -2025,7 +2082,9 @@ def to_sql(
**engine_kwargs,
)

self.check_case_sensitive(name=name, schema=schema)
# only check case sensitivity for non temporary tables
if not table.temporary:
self.check_case_sensitive(name=name, schema=schema)
return total_inserted

@property
Expand Down Expand Up @@ -2303,6 +2362,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2332,6 +2392,8 @@ def to_sql(
Raises NotImplementedError
method : {None', 'multi', callable}, default None
Raises NotImplementedError
temporary : bool, default False
Indicates if the created, replaced or appended table is temporary.
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
Expand Down Expand Up @@ -2360,7 +2422,14 @@ def to_sql(
# as applicable modes, so the semantics get blurred across
# the libraries
mode = "create"
if self.has_table(name, schema):

# for temporary tables use duck testing for existence check
if temporary:
exists = self._has_table_temporary(name, schema)
else:
exists = self.has_table(name, schema)

if exists:
if if_exists == "fail":
raise ValueError(f"Table '{table_name}' already exists.")
elif if_exists == "replace":
Expand All @@ -2378,12 +2447,41 @@ def to_sql(

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
table_name=name,
data=tbl,
mode=mode,
db_schema_name=schema,
temporary=temporary,
)

self.con.commit()
return total_inserted

def _has_table_temporary(self, name: str, schema: str | None = None) -> bool:
"""Check if a temporary table exists. Temporary tables are not in a database's
meta data. The existence is duck tested by a SELECT statement."""
from adbc_driver_manager import ProgrammingError

# sqlite doesn't allow a rollback at this point
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment as before - we really want to avoid putting DBMS-specific logic into our implementation

rollback = (
True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False
)

if schema is None:
query = f"SELECT * FROM {name} LIMIT 1"
else:
query = f"SELECT * FROM {schema}.{name} LIMIT 1"
try:
with self.con.cursor() as cur:
cur.execute(query)
return True
except ProgrammingError:
if rollback:
# Some DBMS (e.g. postgres) require a rollback after a caught exception
with self.con.cursor() as cur:
cur.execute("rollback")
return False

def has_table(self, name: str, schema: str | None = None) -> bool:
meta = self.con.adbc_get_objects(
db_schema_filter=schema, table_name_filter=name
Expand Down Expand Up @@ -2758,6 +2856,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
temporary: bool = False,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2798,6 +2897,8 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
temporary : bool, default False
Indicates if the created, replaced or appended table is temporary.
"""
if dtype:
if not is_dict_like(dtype):
Expand All @@ -2823,6 +2924,7 @@ def to_sql(
if_exists=if_exists,
index_label=index_label,
dtype=dtype,
temporary=temporary,
)
table.create()
return table.insert(chunksize, method)
Expand Down
Loading