ENH: Pluggable SQL performance via new SQL engine keyword #40556


Merged · 26 commits · merged May 3, 2021
Changes from 22 commits
Commits (26):
569b1bc
Merge pull request #1 from pandas-dev/master
yehoshuadimarsky Jan 19, 2020
845b504
Merge branch 'master' of github.com:yehoshuadimarsky/pandas
yehoshuadimarsky Jun 3, 2020
4c9db09
Merge branch 'master' of https://github.com/pandas-dev/pandas
yehoshuadimarsky Oct 21, 2020
663ebae
merged from pandas-dev/pandas
Nov 16, 2020
f471a72
Merge branch 'master' of https://github.com/pandas-dev/pandas
Jan 24, 2021
8fb4df6
Merge branch 'master' of https://github.com/pandas-dev/pandas
yehoshuadimarsky Feb 7, 2021
383c1cb
Merge branch 'master' of https://github.com/pandas-dev/pandas
yehoshuadimarsky Mar 21, 2021
2562f71
initial refactor of sqlalchemy, also added bcpandas but no tests yet
yehoshuadimarsky Mar 21, 2021
982593c
reverting bcpandas impl; keeping to strictly refactoring sqlalchemy
yehoshuadimarsky Mar 21, 2021
962a36c
changes requested by @jreback
yehoshuadimarsky Mar 23, 2021
0e96765
Merge remote-tracking branch 'upstream/master' into sql-engine
yehoshuadimarsky Apr 11, 2021
dbf0cfa
added tests for sql configs
yehoshuadimarsky Apr 11, 2021
b77b6a3
fixed versionadded
yehoshuadimarsky Apr 12, 2021
7f022fe
merged upstream
yehoshuadimarsky Apr 13, 2021
965538d
Merge branch 'sql-engine' of https://github.com/yehoshuadimarsky/pand…
yehoshuadimarsky Apr 13, 2021
2ab9d85
Merge branch 'master' into sql-engine
yehoshuadimarsky Apr 16, 2021
c34c97b
change Optional[DtypeArg] to DtypeArg | None
yehoshuadimarsky Apr 16, 2021
5adb8b2
Merge remote-tracking branch 'upstream/master' into sql-engine
yehoshuadimarsky Apr 29, 2021
80e3a1b
catch mysql -inf exception
yehoshuadimarsky Apr 30, 2021
69051bc
Merge remote-tracking branch 'upstream/master' into sql-engine
yehoshuadimarsky Apr 30, 2021
1423693
Merge remote-tracking branch 'upstream/master' into sql-engine
yehoshuadimarsky Apr 30, 2021
4f6f8ea
another fix for CI test
yehoshuadimarsky Apr 30, 2021
0be19ce
Regex update to catch CI error
yehoshuadimarsky May 2, 2021
3beb9aa
Merge remote-tracking branch 'upstream/master' into sql-engine
yehoshuadimarsky May 2, 2021
f084faa
Remove `re.MULTILINE | re.VERBOSE`
yehoshuadimarsky May 2, 2021
36adf43
Update pandas/io/sql.py
yehoshuadimarsky May 3, 2021
4 changes: 4 additions & 0 deletions doc/source/user_guide/options.rst
@@ -456,6 +456,10 @@ io.hdf.dropna_table True drop ALL nan rows when appe
io.parquet.engine None The engine to use as a default for
parquet reading and writing. If None
then try 'pyarrow' and 'fastparquet'
io.sql.engine None The engine to use as a default for
sql reading and writing, with SQLAlchemy
as a higher level interface. If None
then try 'sqlalchemy'
mode.chained_assignment warn Controls ``SettingWithCopyWarning``:
'raise', 'warn', or None. Raise an
exception, warn, or no action if
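For context, the new option behaves like any other pandas option, including validation of the allowed values. A minimal sketch of reading and setting it (assuming a build that includes this PR):

import pandas as pd

pd.set_option("io.sql.engine", "sqlalchemy")  # explicit engine
print(pd.get_option("io.sql.engine"))         # -> "sqlalchemy"

try:
    # rejected by the is_one_of_factory validator registered below
    pd.set_option("io.sql.engine", "turbodbc")
except ValueError as exc:
    print(exc)

pd.set_option("io.sql.engine", "auto")        # restore the default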
16 changes: 16 additions & 0 deletions pandas/core/config_init.py
@@ -652,6 +652,22 @@ def use_inf_as_na_cb(key):
validator=is_one_of_factory(["auto", "pyarrow", "fastparquet"]),
)


# Set up the io.sql specific configuration.
sql_engine_doc = """
: string
The default sql reader/writer engine. Available options:
'auto', 'sqlalchemy', the default is 'auto'
"""

with cf.config_prefix("io.sql"):
cf.register_option(
"engine",
"auto",
sql_engine_doc,
validator=is_one_of_factory(["auto", "sqlalchemy"]),
)
Comment on lines +662 to +669
Contributor:

I already suggested using Python's entrypoint mechanism for registering additional engines, and I think this would be a good place to leverage it. The idea is that you can implement an engine without contributing anything to pandas, which makes development of the engine much more flexible and fast-paced.

In the library implementing the engine, you would add the following to the setup.py:

entry_points={"pandas_sql_engine": ["turbodbc = turbodbc.TurbodbcPandasEngine"]}

Here you can then load the engines using the entry points mechanism (without importing anything!):

Suggested change
with cf.config_prefix("io.sql"):
cf.register_option(
"engine",
"auto",
sql_engine_doc,
validator=is_one_of_factory(["auto", "sqlalchemy"]),
)
import pkg_resources
EXTERNAL_SQL_ENGINES = [e.name for e in pkg_resources.iter_entry_points('pandas_sql_engine')]
SQL_ENGINES = ["sqlalchemy"] + EXTERNAL_SQL_ENGINES
with cf.config_prefix("io.sql"):
cf.register_option(
"engine",
"auto",
sql_engine_doc,
validator=is_one_of_factory(["auto"] + SQL_ENGINES),
)

Contributor (Author):

I don't know... I've never used entrypoints before, and it makes me a little uneasy to use them without understanding them; I'd need to learn some more about them first. But we don't do this for any other I/O engines like Parquet or Excel, so why not keep the same approach here?

@jreback what do you think?

Contributor:

  1. I would expect the number of engines to be larger in this case. Personally, I would write at least one engine each for snowflake, turbodbc, and postgres.
  2. Databases are harder to test than file formats, as you need to start additional services. Integrating all of them into the pandas CI would probably be quite heavy in the end.
  3. I'm really excited about ExtensionArrays, as they give us a nice way to extend pandas with additions that are experimental and wouldn't be merged into core pandas. They don't need a technique like entrypoints because you instantiate them explicitly via the class constructor of the ExtensionArray. That isn't the case for the database engines, since you only specify the engine via a string.
  4. We could use a different mechanism, but entrypoints have the nice property of being very lightweight, and discovery works without running any Python code from the additional packages before pandas code runs, i.e. you don't need to ensure that you have imported the engine code before anything else uses pandas.

Contributor:

I think we could certainly add this (we already use entry points for plotting), but let's do it as a follow-up (please create an issue).
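For reference, pkg_resources has since been deprecated in favor of importlib.metadata; the same import-free discovery could be sketched as follows (keeping the hypothetical "pandas_sql_engine" group name from the suggestion above):

from importlib.metadata import entry_points

eps = entry_points()
if hasattr(eps, "select"):  # Python 3.10+
    sql_eps = eps.select(group="pandas_sql_engine")
else:  # older Pythons: entry_points() returns a dict of lists
    sql_eps = eps.get("pandas_sql_engine", [])

# Engine names are discovered without importing the providing packages.
EXTERNAL_SQL_ENGINES = [ep.name for ep in sql_eps]
SQL_ENGINES = ["sqlalchemy"] + EXTERNAL_SQL_ENGINES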


# --------
# Plotting
# ---------
256 changes: 205 additions & 51 deletions pandas/io/sql.py
@@ -27,6 +27,8 @@

import pandas._libs.lib as lib
from pandas._typing import DtypeArg
from pandas.compat._optional import import_optional_dependency
from pandas.errors import AbstractMethodError

from pandas.core.dtypes.common import (
is_datetime64tz_dtype,
@@ -36,6 +38,7 @@
from pandas.core.dtypes.dtypes import DatetimeTZDtype
from pandas.core.dtypes.missing import isna

from pandas import get_option
from pandas.core.api import (
DataFrame,
Series,
@@ -643,6 +646,8 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: str | None = None,
engine: str = "auto",
**engine_kwargs,
) -> None:
"""
Write records stored in a DataFrame to a SQL database.
@@ -689,6 +694,16 @@
section :ref:`insert method <io.sql.method>`.

.. versionadded:: 0.24.0

engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
behavior is 'sqlalchemy'.

.. versionadded:: 1.3.0

**engine_kwargs
Any additional kwargs are passed to the engine.
"""
if if_exists not in ("fail", "replace", "append"):
raise ValueError(f"'{if_exists}' is not valid for if_exists")
@@ -712,6 +727,8 @@
chunksize=chunksize,
dtype=dtype,
method=method,
engine=engine,
**engine_kwargs,
)
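As wired above, the module-level function now accepts the engine keyword and forwards any extra kwargs; a minimal sketch of a call (table name and connection are illustrative):

import pandas as pd
from pandas.io import sql as pd_sql
from sqlalchemy import create_engine

con = create_engine("sqlite:///:memory:")
df = pd.DataFrame({"x": [1.0, 2.0]})

# Per-call engine selection; "auto" (the default) defers to io.sql.engine.
pd_sql.to_sql(df, "demo", con, index=False, engine="sqlalchemy")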


@@ -1283,6 +1300,89 @@ def to_sql(
)


class BaseEngine:
def insert_records(
self,
table: SQLTable,
con,
frame,
name,
index=True,
schema=None,
chunksize=None,
method=None,
**engine_kwargs,
):
"""
Inserts data into already-prepared table
"""
raise AbstractMethodError(self)


class SQLAlchemyEngine(BaseEngine):
def __init__(self):
import_optional_dependency(
"sqlalchemy", extra="sqlalchemy is required for SQL support."
)

def insert_records(
self,
table: SQLTable,
con,
frame,
name,
index=True,
schema=None,
chunksize=None,
method=None,
**engine_kwargs,
):
from sqlalchemy import exc

try:
table.insert(chunksize=chunksize, method=method)
except exc.SQLAlchemyError as err:
# GH34431
msg_pattern = "(OperationalError: )?\\(1054, \"Unknown column 'inf(e0)?' in 'field list'\"\\)" # noqa: E501
err_text = str(err.orig)
if re.search(msg_pattern, err_text, re.MULTILINE | re.VERBOSE):
raise ValueError("inf cannot be used with MySQL") from err
else:
raise err
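To illustrate the extension point, a hypothetical third-party engine would subclass BaseEngine and implement insert_records; a sketch (the logging delegate is purely illustrative, not a real engine):

class LoggingEngine(BaseEngine):
    """Hypothetical engine: logs, then delegates to SQLTable.insert."""

    def insert_records(
        self,
        table,
        con,
        frame,
        name,
        index=True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ):
        print(f"inserting {len(frame)} rows into table {name!r}")
        table.insert(chunksize=chunksize, method=method)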


def get_engine(engine: str) -> BaseEngine:
""" return our implementation """
if engine == "auto":
engine = get_option("io.sql.engine")

if engine == "auto":
# try engines in this order
engine_classes = [SQLAlchemyEngine]

error_msgs = ""
for engine_class in engine_classes:
try:
return engine_class()
except ImportError as err:
error_msgs += "\n - " + str(err)

raise ImportError(
"Unable to find a usable engine; "
"tried using: 'sqlalchemy'.\n"
"A suitable version of "
"sqlalchemy is required for sql I/O "
"support.\n"
"Trying to import the above resulted in these errors:"
f"{error_msgs}"
)

elif engine == "sqlalchemy":
return SQLAlchemyEngine()
Contributor:

Here you could also use the entrypoint mechanism to do the actual engine load.

Suggested change
return SQLAlchemyEngine()
return SQLAlchemyEngine()
else:
return {e.name: e for e in pkg_resources.iter_entry_points('pandas_sql_engine')}[engine].load()()


raise ValueError("engine must be one of 'auto', 'sqlalchemy'")


class SQLDatabase(PandasSQL):
"""
This class enables conversion between DataFrame and SQL databases
@@ -1504,58 +1604,18 @@ def read_query(

read_sql = read_query

def to_sql(
def prep_table(
self,
frame,
name,
if_exists="fail",
index=True,
index_label=None,
schema=None,
chunksize=None,
dtype: DtypeArg | None = None,
method=None,
):
) -> SQLTable:
"""
Write records stored in a DataFrame to a SQL database.

Parameters
----------
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
index : bool, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Column label for index column(s). If None is given (default) and
`index` is True, then the index names are used.
A sequence should be given if the DataFrame uses MultiIndex.
schema : string, default None
Name of SQL schema in database to write to (if database flavor
supports this). If specified, this overwrites the default
schema of the SQLDatabase object.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a
time. If None, all rows will be written at once.
dtype : single type or dict of column name to SQL type, default None
Optional specifying the datatype for columns. The SQL type should
be a SQLAlchemy type. If all columns are of the same type, one
single value can be used.
method : {None, 'multi', callable}, default None
Controls the SQL insertion clause used:

* None : Uses standard SQL ``INSERT`` clause (one per row).
* 'multi': Pass multiple values in a single ``INSERT`` clause.
* callable with signature ``(pd_table, conn, keys, data_iter)``.

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.

.. versionadded:: 0.24.0
Prepares table in the database for data insertion. Creates it if needed, etc.
"""
if dtype:
if not is_dict_like(dtype):
@@ -1589,15 +1649,17 @@ def to_sql(
dtype=dtype,
)
table.create()
return table

from sqlalchemy.exc import SQLAlchemyError

try:
table.insert(chunksize, method=method)
except SQLAlchemyError as err:
# GH 34431 36465
raise ValueError("inf cannot be used with MySQL") from err

def check_case_sensitive(
self,
name,
schema,
):
"""
Checks table name for issues with case-sensitivity.
Method is called after data is inserted.
"""
if not name.isdigit() and not name.islower():
# check for potentially case sensitivity issues (GH7815)
# Only check when name is not a number and name is not lower case
@@ -1623,6 +1685,97 @@ def to_sql(
)
warnings.warn(msg, UserWarning)
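For reference, the guard only engages for names that are neither all-digits nor all-lowercase, and the warning is emitted only when the exact-cased name cannot be found after insertion. A tiny sketch of the guard condition itself:

for name in ["users", "1234", "MixedCase"]:
    runs_check = not name.isdigit() and not name.islower()
    print(name, "->", runs_check)
# users -> False, 1234 -> False, MixedCase -> True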

def to_sql(
self,
frame,
name,
if_exists="fail",
index=True,
index_label=None,
schema=None,
chunksize=None,
dtype: DtypeArg | None = None,
method=None,
engine="auto",
**engine_kwargs,
):
"""
Write records stored in a DataFrame to a SQL database.

Parameters
----------
frame : DataFrame
name : string
Name of SQL table.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
index : boolean, default True
Write DataFrame index as a column.
index_label : string or sequence, default None
Column label for index column(s). If None is given (default) and
`index` is True, then the index names are used.
A sequence should be given if the DataFrame uses MultiIndex.
schema : string, default None
Name of SQL schema in database to write to (if database flavor
supports this). If specified, this overwrites the default
schema of the SQLDatabase object.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a
time. If None, all rows will be written at once.
dtype : single type or dict of column name to SQL type, default None
Optional specifying the datatype for columns. The SQL type should
be a SQLAlchemy type. If all columns are of the same type, one
single value can be used.
method : {None, 'multi', callable}, default None
Controls the SQL insertion clause used:

* None : Uses standard SQL ``INSERT`` clause (one per row).
* 'multi': Pass multiple values in a single ``INSERT`` clause.
* callable with signature ``(pd_table, conn, keys, data_iter)``.

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.

.. versionadded:: 0.24.0

engine : {'auto', 'sqlalchemy'}, default 'auto'
SQL engine library to use. If 'auto', then the option
``io.sql.engine`` is used. The default ``io.sql.engine``
behavior is 'sqlalchemy'.

.. versionadded:: 1.3.0

**engine_kwargs
Any additional kwargs are passed to the engine.
"""
sql_engine = get_engine(engine)

table = self.prep_table(
frame=frame,
name=name,
if_exists=if_exists,
index=index,
index_label=index_label,
schema=schema,
dtype=dtype,
)

sql_engine.insert_records(
table=table,
con=self.connectable,
frame=frame,
name=name,
index=index,
schema=schema,
chunksize=chunksize,
method=method,
**engine_kwargs,
)

self.check_case_sensitive(name=name, schema=schema)

@property
def tables(self):
return self.meta.tables
@@ -2008,6 +2161,7 @@ def to_sql(
chunksize=None,
dtype: DtypeArg | None = None,
method=None,
**kwargs,
):
"""
Write records stored in a DataFrame to a SQL database.