diff --git a/doc/source/user_guide/options.rst b/doc/source/user_guide/options.rst index 1fcaac1a91d09..278eb907102ed 100644 --- a/doc/source/user_guide/options.rst +++ b/doc/source/user_guide/options.rst @@ -456,6 +456,10 @@ io.hdf.dropna_table True drop ALL nan rows when appe io.parquet.engine None The engine to use as a default for parquet reading and writing. If None then try 'pyarrow' and 'fastparquet' +io.sql.engine None The engine to use as a default for + sql reading and writing, with SQLAlchemy + as a higher level interface. If None + then try 'sqlalchemy' mode.chained_assignment warn Controls ``SettingWithCopyWarning``: 'raise', 'warn', or None. Raise an exception, warn, or no action if diff --git a/pandas/core/config_init.py b/pandas/core/config_init.py index fd49ac0176ce4..baac872a6a466 100644 --- a/pandas/core/config_init.py +++ b/pandas/core/config_init.py @@ -652,6 +652,22 @@ def use_inf_as_na_cb(key): validator=is_one_of_factory(["auto", "pyarrow", "fastparquet"]), ) + +# Set up the io.sql specific configuration. +sql_engine_doc = """ +: string + The default sql reader/writer engine. Available options: + 'auto', 'sqlalchemy', the default is 'auto' +""" + +with cf.config_prefix("io.sql"): + cf.register_option( + "engine", + "auto", + sql_engine_doc, + validator=is_one_of_factory(["auto", "sqlalchemy"]), + ) + # -------- # Plotting # --------- diff --git a/pandas/io/sql.py b/pandas/io/sql.py index d797fa51984d6..04a7ccb538a67 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -27,6 +27,8 @@ import pandas._libs.lib as lib from pandas._typing import DtypeArg +from pandas.compat._optional import import_optional_dependency +from pandas.errors import AbstractMethodError from pandas.core.dtypes.common import ( is_datetime64tz_dtype, @@ -36,6 +38,7 @@ from pandas.core.dtypes.dtypes import DatetimeTZDtype from pandas.core.dtypes.missing import isna +from pandas import get_option from pandas.core.api import ( DataFrame, Series, @@ -643,6 +646,8 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: str | None = None, + engine: str = "auto", + **engine_kwargs, ) -> None: """ Write records stored in a DataFrame to a SQL database. @@ -689,6 +694,16 @@ def to_sql( section :ref:`insert method `. .. versionadded:: 0.24.0 + + engine : {'auto', 'sqlalchemy'}, default 'auto' + SQL engine library to use. If 'auto', then the option + ``io.sql.engine`` is used. The default ``io.sql.engine`` + behavior is 'sqlalchemy' + + .. versionadded:: 1.3.0 + + **engine_kwargs + Any additional kwargs are passed to the engine. """ if if_exists not in ("fail", "replace", "append"): raise ValueError(f"'{if_exists}' is not valid for if_exists") @@ -712,6 +727,8 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, + engine=engine, + **engine_kwargs, ) @@ -1283,6 +1300,91 @@ def to_sql( ) +class BaseEngine: + def insert_records( + self, + table: SQLTable, + con, + frame, + name, + index=True, + schema=None, + chunksize=None, + method=None, + **engine_kwargs, + ): + """ + Inserts data into already-prepared table + """ + raise AbstractMethodError(self) + + +class SQLAlchemyEngine(BaseEngine): + def __init__(self): + import_optional_dependency( + "sqlalchemy", extra="sqlalchemy is required for SQL support." + ) + + def insert_records( + self, + table: SQLTable, + con, + frame, + name, + index=True, + schema=None, + chunksize=None, + method=None, + **engine_kwargs, + ): + from sqlalchemy import exc + + try: + table.insert(chunksize=chunksize, method=method) + except exc.SQLAlchemyError as err: + # GH34431 + # https://stackoverflow.com/a/67358288/6067848 + msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?# + )|inf can not be used with MySQL""" + err_text = str(err.orig) + if re.search(msg, err_text): + raise ValueError("inf cannot be used with MySQL") from err + else: + raise err + + +def get_engine(engine: str) -> BaseEngine: + """ return our implementation """ + if engine == "auto": + engine = get_option("io.sql.engine") + + if engine == "auto": + # try engines in this order + engine_classes = [SQLAlchemyEngine] + + error_msgs = "" + for engine_class in engine_classes: + try: + return engine_class() + except ImportError as err: + error_msgs += "\n - " + str(err) + + raise ImportError( + "Unable to find a usable engine; " + "tried using: 'sqlalchemy'.\n" + "A suitable version of " + "sqlalchemy is required for sql I/O " + "support.\n" + "Trying to import the above resulted in these errors:" + f"{error_msgs}" + ) + + elif engine == "sqlalchemy": + return SQLAlchemyEngine() + + raise ValueError("engine must be one of 'auto', 'sqlalchemy'") + + class SQLDatabase(PandasSQL): """ This class enables conversion between DataFrame and SQL databases @@ -1504,7 +1606,7 @@ def read_query( read_sql = read_query - def to_sql( + def prep_table( self, frame, name, @@ -1512,50 +1614,10 @@ def to_sql( index=True, index_label=None, schema=None, - chunksize=None, dtype: DtypeArg | None = None, - method=None, - ): + ) -> SQLTable: """ - Write records stored in a DataFrame to a SQL database. - - Parameters - ---------- - frame : DataFrame - name : string - Name of SQL table. - if_exists : {'fail', 'replace', 'append'}, default 'fail' - - fail: If table exists, do nothing. - - replace: If table exists, drop it, recreate it, and insert data. - - append: If table exists, insert data. Create if does not exist. - index : bool, default True - Write DataFrame index as a column. - index_label : string or sequence, default None - Column label for index column(s). If None is given (default) and - `index` is True, then the index names are used. - A sequence should be given if the DataFrame uses MultiIndex. - schema : string, default None - Name of SQL schema in database to write to (if database flavor - supports this). If specified, this overwrites the default - schema of the SQLDatabase object. - chunksize : int, default None - If not None, then rows will be written in batches of this size at a - time. If None, all rows will be written at once. - dtype : single type or dict of column name to SQL type, default None - Optional specifying the datatype for columns. The SQL type should - be a SQLAlchemy type. If all columns are of the same type, one - single value can be used. - method : {None', 'multi', callable}, default None - Controls the SQL insertion clause used: - - * None : Uses standard SQL ``INSERT`` clause (one per row). - * 'multi': Pass multiple values in a single ``INSERT`` clause. - * callable with signature ``(pd_table, conn, keys, data_iter)``. - - Details and a sample callable implementation can be found in the - section :ref:`insert method `. - - .. versionadded:: 0.24.0 + Prepares table in the database for data insertion. Creates it if needed, etc. """ if dtype: if not is_dict_like(dtype): @@ -1589,15 +1651,17 @@ def to_sql( dtype=dtype, ) table.create() + return table - from sqlalchemy.exc import SQLAlchemyError - - try: - table.insert(chunksize, method=method) - except SQLAlchemyError as err: - # GH 34431 36465 - raise ValueError("inf cannot be used with MySQL") from err - + def check_case_sensitive( + self, + name, + schema, + ): + """ + Checks table name for issues with case-sensitivity. + Method is called after data is inserted. + """ if not name.isdigit() and not name.islower(): # check for potentially case sensitivity issues (GH7815) # Only check when name is not a number and name is not lower case @@ -1623,6 +1687,97 @@ def to_sql( ) warnings.warn(msg, UserWarning) + def to_sql( + self, + frame, + name, + if_exists="fail", + index=True, + index_label=None, + schema=None, + chunksize=None, + dtype: DtypeArg | None = None, + method=None, + engine="auto", + **engine_kwargs, + ): + """ + Write records stored in a DataFrame to a SQL database. + + Parameters + ---------- + frame : DataFrame + name : string + Name of SQL table. + if_exists : {'fail', 'replace', 'append'}, default 'fail' + - fail: If table exists, do nothing. + - replace: If table exists, drop it, recreate it, and insert data. + - append: If table exists, insert data. Create if does not exist. + index : boolean, default True + Write DataFrame index as a column. + index_label : string or sequence, default None + Column label for index column(s). If None is given (default) and + `index` is True, then the index names are used. + A sequence should be given if the DataFrame uses MultiIndex. + schema : string, default None + Name of SQL schema in database to write to (if database flavor + supports this). If specified, this overwrites the default + schema of the SQLDatabase object. + chunksize : int, default None + If not None, then rows will be written in batches of this size at a + time. If None, all rows will be written at once. + dtype : single type or dict of column name to SQL type, default None + Optional specifying the datatype for columns. The SQL type should + be a SQLAlchemy type. If all columns are of the same type, one + single value can be used. + method : {None', 'multi', callable}, default None + Controls the SQL insertion clause used: + + * None : Uses standard SQL ``INSERT`` clause (one per row). + * 'multi': Pass multiple values in a single ``INSERT`` clause. + * callable with signature ``(pd_table, conn, keys, data_iter)``. + + Details and a sample callable implementation can be found in the + section :ref:`insert method `. + + .. versionadded:: 0.24.0 + + engine : {'auto', 'sqlalchemy'}, default 'auto' + SQL engine library to use. If 'auto', then the option + ``io.sql.engine`` is used. The default ``io.sql.engine`` + behavior is 'sqlalchemy' + + .. versionadded:: 1.3.0 + + **engine_kwargs + Any additional kwargs are passed to the engine. + """ + sql_engine = get_engine(engine) + + table = self.prep_table( + frame=frame, + name=name, + if_exists=if_exists, + index=index, + index_label=index_label, + schema=schema, + dtype=dtype, + ) + + sql_engine.insert_records( + table=table, + con=self.connectable, + frame=frame, + name=name, + index=index, + schema=schema, + chunksize=chunksize, + method=method, + **engine_kwargs, + ) + + self.check_case_sensitive(name=name, schema=schema) + @property def tables(self): return self.meta.tables @@ -2008,6 +2163,7 @@ def to_sql( chunksize=None, dtype: DtypeArg | None = None, method=None, + **kwargs, ): """ Write records stored in a DataFrame to a SQL database. diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 04ddef57a9621..290e063a59be7 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -52,7 +52,9 @@ import pandas.io.sql as sql from pandas.io.sql import ( + SQLAlchemyEngine, _gt14, + get_engine, read_sql_query, read_sql_table, ) @@ -575,6 +577,23 @@ def sample(pd_table, conn, keys, data_iter): # Nuke table self.drop_table("test_frame1") + def _to_sql_with_sql_engine(self, engine="auto", **engine_kwargs): + """`to_sql` with the `engine` param""" + # mostly copied from this class's `_to_sql()` method + self.drop_table("test_frame1") + + self.pandasSQL.to_sql( + self.test_frame1, "test_frame1", engine=engine, **engine_kwargs + ) + assert self.pandasSQL.has_table("test_frame1") + + num_entries = len(self.test_frame1) + num_rows = self._count_rows("test_frame1") + assert num_rows == num_entries + + # Nuke table + self.drop_table("test_frame1") + def _roundtrip(self): self.drop_table("test_frame_roundtrip") self.pandasSQL.to_sql(self.test_frame1, "test_frame_roundtrip") @@ -2053,6 +2072,41 @@ class Temporary(Base): tm.assert_frame_equal(df, expected) + # -- SQL Engine tests (in the base class for now) + def test_invalid_engine(self): + msg = "engine must be one of 'auto', 'sqlalchemy'" + with pytest.raises(ValueError, match=msg): + self._to_sql_with_sql_engine("bad_engine") + + def test_options_sqlalchemy(self): + # use the set option + + with pd.option_context("io.sql.engine", "sqlalchemy"): + self._to_sql_with_sql_engine() + + def test_options_auto(self): + # use the set option + + with pd.option_context("io.sql.engine", "auto"): + self._to_sql_with_sql_engine() + + def test_options_get_engine(self): + assert isinstance(get_engine("sqlalchemy"), SQLAlchemyEngine) + + with pd.option_context("io.sql.engine", "sqlalchemy"): + assert isinstance(get_engine("auto"), SQLAlchemyEngine) + assert isinstance(get_engine("sqlalchemy"), SQLAlchemyEngine) + + with pd.option_context("io.sql.engine", "auto"): + assert isinstance(get_engine("auto"), SQLAlchemyEngine) + assert isinstance(get_engine("sqlalchemy"), SQLAlchemyEngine) + + def test_get_engine_auto_error_message(self): + # Expect different error messages from get_engine(engine="auto") + # if engines aren't installed vs. are installed but bad version + pass + # TODO fill this in when we add more engines + class _TestSQLAlchemyConn(_EngineToConnMixin, _TestSQLAlchemy): def test_transactions(self):