diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index d6ace9a997951..4b8aa62ab8730 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -63,11 +63,6 @@ try: import sqlalchemy - from sqlalchemy import inspect - from sqlalchemy.ext import declarative - from sqlalchemy.orm import session as sa_session - import sqlalchemy.schema - import sqlalchemy.sql.sqltypes as sqltypes SQLALCHEMY_INSTALLED = True except ImportError: @@ -417,10 +412,9 @@ def drop_table(self, table_name): sql.SQLDatabase(self.conn).drop_table(table_name) def _get_all_tables(self): - meta = sqlalchemy.schema.MetaData(bind=self.conn) - meta.reflect() - table_list = meta.tables.keys() - return table_list + from sqlalchemy import inspect + + return inspect(self.conn).get_table_names() def _close_conn(self): # https://docs.sqlalchemy.org/en/13/core/connections.html#engine-disposal @@ -991,8 +985,14 @@ def test_get_schema_with_schema(self, test_frame1): assert "CREATE TABLE pypi." in create_sql def test_get_schema_dtypes(self): + if self.mode == "sqlalchemy": + from sqlalchemy import Integer + + dtype = Integer + else: + dtype = "INTEGER" + float_frame = DataFrame({"a": [1.1, 1.2], "b": [2.1, 2.2]}) - dtype = sqlalchemy.Integer if self.mode == "sqlalchemy" else "INTEGER" create_sql = sql.get_schema( float_frame, "test", con=self.conn, dtype={"b": dtype} ) @@ -1127,10 +1127,12 @@ def test_read_sql_delegate(self): tm.assert_frame_equal(iris_frame1, iris_frame2) def test_not_reflect_all_tables(self): + from sqlalchemy import text + # create invalid table - qry = """CREATE TABLE invalid (x INTEGER, y UNKNOWN);""" + qry = text("CREATE TABLE invalid (x INTEGER, y UNKNOWN);") self.conn.execute(qry) - qry = """CREATE TABLE other_table (x INTEGER, y INTEGER);""" + qry = text("CREATE TABLE other_table (x INTEGER, y INTEGER);") self.conn.execute(qry) with warnings.catch_warnings(record=True) as w: @@ -1165,6 +1167,7 @@ def _get_index_columns(self, tbl_name): return ixs def test_sqlalchemy_type_mapping(self): + from sqlalchemy import TIMESTAMP # Test Timestamp objects (no datetime64 because of timezone) (GH9085) df = DataFrame( @@ -1173,7 +1176,7 @@ def test_sqlalchemy_type_mapping(self): db = sql.SQLDatabase(self.conn) table = sql.SQLTable("test_type", db, frame=df) # GH 9086: TIMESTAMP is the suggested type for datetimes with timezones - assert isinstance(table.table.c["time"].type, sqltypes.TIMESTAMP) + assert isinstance(table.table.c["time"].type, TIMESTAMP) @pytest.mark.parametrize( "integer, expected", @@ -1247,18 +1250,23 @@ def test_database_uri_string(self, test_frame1): def test_query_by_text_obj(self): # WIP : GH10846 - name_text = sqlalchemy.text("select * from iris where name=:name") + from sqlalchemy import text + + name_text = text("select * from iris where name=:name") iris_df = sql.read_sql(name_text, self.conn, params={"name": "Iris-versicolor"}) all_names = set(iris_df["Name"]) assert all_names == {"Iris-versicolor"} def test_query_by_select_obj(self): # WIP : GH10846 - iris = iris_table_metadata(self.flavor) - - name_select = sqlalchemy.select([iris]).where( - iris.c.Name == sqlalchemy.bindparam("name") + from sqlalchemy import ( + bindparam, + select, ) + + iris = iris_table_metadata(self.flavor) + iris_select = iris if _gt14() else [iris] + name_select = select(iris_select).where(iris.c.Name == bindparam("name")) iris_df = sql.read_sql(name_select, self.conn, params={"name": "Iris-setosa"}) all_names = set(iris_df["Name"]) assert all_names == {"Iris-setosa"} @@ -1468,6 +1476,8 @@ def test_create_table(self): pandasSQL.to_sql(temp_frame, "temp_frame") if _gt14(): + from sqlalchemy import inspect + insp = inspect(temp_conn) assert insp.has_table("temp_frame") else: @@ -1484,6 +1494,8 @@ def test_drop_table(self): pandasSQL.to_sql(temp_frame, "temp_frame") if _gt14(): + from sqlalchemy import inspect + insp = inspect(temp_conn) assert insp.has_table("temp_frame") else: @@ -1846,48 +1858,68 @@ def test_get_schema_create_table(self, test_frame3): # Use a dataframe without a bool column, since MySQL converts bool to # TINYINT (which read_sql_table returns as an int and causes a dtype # mismatch) + from sqlalchemy import text + from sqlalchemy.engine import Engine tbl = "test_get_schema_create_table" create_sql = sql.get_schema(test_frame3, tbl, con=self.conn) blank_test_df = test_frame3.iloc[:0] self.drop_table(tbl) - self.conn.execute(create_sql) + create_sql = text(create_sql) + if isinstance(self.conn, Engine): + with self.conn.connect() as conn: + conn.execute(create_sql) + else: + self.conn.execute(create_sql) returned_df = sql.read_sql_table(tbl, self.conn) tm.assert_frame_equal(returned_df, blank_test_df, check_index_type=False) self.drop_table(tbl) def test_dtype(self): + from sqlalchemy import ( + TEXT, + String, + ) + from sqlalchemy.schema import MetaData + cols = ["A", "B"] data = [(0.8, True), (0.9, None)] df = DataFrame(data, columns=cols) df.to_sql("dtype_test", self.conn) - df.to_sql("dtype_test2", self.conn, dtype={"B": sqlalchemy.TEXT}) - meta = sqlalchemy.schema.MetaData(bind=self.conn) - meta.reflect() + df.to_sql("dtype_test2", self.conn, dtype={"B": TEXT}) + meta = MetaData() + meta.reflect(bind=self.conn) sqltype = meta.tables["dtype_test2"].columns["B"].type - assert isinstance(sqltype, sqlalchemy.TEXT) + assert isinstance(sqltype, TEXT) msg = "The type of B is not a SQLAlchemy type" with pytest.raises(ValueError, match=msg): df.to_sql("error", self.conn, dtype={"B": str}) # GH9083 - df.to_sql("dtype_test3", self.conn, dtype={"B": sqlalchemy.String(10)}) - meta.reflect() + df.to_sql("dtype_test3", self.conn, dtype={"B": String(10)}) + meta.reflect(bind=self.conn) sqltype = meta.tables["dtype_test3"].columns["B"].type - assert isinstance(sqltype, sqlalchemy.String) + assert isinstance(sqltype, String) assert sqltype.length == 10 # single dtype - df.to_sql("single_dtype_test", self.conn, dtype=sqlalchemy.TEXT) - meta = sqlalchemy.schema.MetaData(bind=self.conn) - meta.reflect() + df.to_sql("single_dtype_test", self.conn, dtype=TEXT) + meta.reflect(bind=self.conn) sqltypea = meta.tables["single_dtype_test"].columns["A"].type sqltypeb = meta.tables["single_dtype_test"].columns["B"].type - assert isinstance(sqltypea, sqlalchemy.TEXT) - assert isinstance(sqltypeb, sqlalchemy.TEXT) + assert isinstance(sqltypea, TEXT) + assert isinstance(sqltypeb, TEXT) def test_notna_dtype(self): + from sqlalchemy import ( + Boolean, + DateTime, + Float, + Integer, + ) + from sqlalchemy.schema import MetaData + cols = { "Bool": Series([True, None]), "Date": Series([datetime(2012, 5, 1), None]), @@ -1898,22 +1930,24 @@ def test_notna_dtype(self): tbl = "notna_dtype_test" df.to_sql(tbl, self.conn) - returned_df = sql.read_sql_table(tbl, self.conn) # noqa - meta = sqlalchemy.schema.MetaData(bind=self.conn) - meta.reflect() - if self.flavor == "mysql": - my_type = sqltypes.Integer - else: - my_type = sqltypes.Boolean - + _ = sql.read_sql_table(tbl, self.conn) + meta = MetaData() + meta.reflect(bind=self.conn) + my_type = Integer if self.flavor == "mysql" else Boolean col_dict = meta.tables[tbl].columns - assert isinstance(col_dict["Bool"].type, my_type) - assert isinstance(col_dict["Date"].type, sqltypes.DateTime) - assert isinstance(col_dict["Int"].type, sqltypes.Integer) - assert isinstance(col_dict["Float"].type, sqltypes.Float) + assert isinstance(col_dict["Date"].type, DateTime) + assert isinstance(col_dict["Int"].type, Integer) + assert isinstance(col_dict["Float"].type, Float) def test_double_precision(self): + from sqlalchemy import ( + BigInteger, + Float, + Integer, + ) + from sqlalchemy.schema import MetaData + V = 1.23456789101112131415 df = DataFrame( @@ -1931,7 +1965,7 @@ def test_double_precision(self): self.conn, index=False, if_exists="replace", - dtype={"f64_as_f32": sqlalchemy.Float(precision=23)}, + dtype={"f64_as_f32": Float(precision=23)}, ) res = sql.read_sql_table("test_dtypes", self.conn) @@ -1939,18 +1973,19 @@ def test_double_precision(self): assert np.round(df["f64"].iloc[0], 14) == np.round(res["f64"].iloc[0], 14) # check sql types - meta = sqlalchemy.schema.MetaData(bind=self.conn) - meta.reflect() + meta = MetaData() + meta.reflect(bind=self.conn) col_dict = meta.tables["test_dtypes"].columns assert str(col_dict["f32"].type) == str(col_dict["f64_as_f32"].type) - assert isinstance(col_dict["f32"].type, sqltypes.Float) - assert isinstance(col_dict["f64"].type, sqltypes.Float) - assert isinstance(col_dict["i32"].type, sqltypes.Integer) - assert isinstance(col_dict["i64"].type, sqltypes.BigInteger) + assert isinstance(col_dict["f32"].type, Float) + assert isinstance(col_dict["f64"].type, Float) + assert isinstance(col_dict["i32"].type, Integer) + assert isinstance(col_dict["i64"].type, BigInteger) def test_connectable_issue_example(self): # This tests the example raised in issue # https://github.com/pandas-dev/pandas/issues/10104 + from sqlalchemy.engine import Engine def foo(connection): query = "SELECT test_foo_data FROM test_foo_data" @@ -1959,17 +1994,23 @@ def foo(connection): def bar(connection, data): data.to_sql(name="test_foo_data", con=connection, if_exists="append") + def baz(conn): + if _gt14(): + # https://github.com/sqlalchemy/sqlalchemy/commit/ + # 00b5c10846e800304caa86549ab9da373b42fa5d#r48323973 + foo_data = foo(conn) + bar(conn, foo_data) + else: + foo_data = conn.run_callable(foo) + conn.run_callable(bar, foo_data) + def main(connectable): - with connectable.connect() as conn: - with conn.begin(): - if _gt14(): - # https://github.com/sqlalchemy/sqlalchemy/commit/ - # 00b5c10846e800304caa86549ab9da373b42fa5d#r48323973 - foo_data = foo(conn) - bar(conn, foo_data) - else: - foo_data = conn.run_callable(foo) - conn.run_callable(bar, foo_data) + if isinstance(connectable, Engine): + with connectable.connect() as conn: + with conn.begin(): + baz(conn) + else: + baz(connectable) DataFrame({"test_foo_data": [0, 1, 2]}).to_sql("test_foo_data", self.conn) main(self.conn) @@ -2003,24 +2044,49 @@ def test_to_sql_with_negative_npinf(self, input, request): tm.assert_equal(df, res) def test_temporary_table(self): + from sqlalchemy import ( + Column, + Integer, + Unicode, + select, + ) + from sqlalchemy.orm import ( + Session, + sessionmaker, + ) + + if _gt14(): + from sqlalchemy.orm import declarative_base + else: + from sqlalchemy.ext.declarative import declarative_base + test_data = "Hello, World!" expected = DataFrame({"spam": [test_data]}) - Base = declarative.declarative_base() + Base = declarative_base() class Temporary(Base): __tablename__ = "temp_test" __table_args__ = {"prefixes": ["TEMPORARY"]} - id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False) - - Session = sa_session.sessionmaker(bind=self.conn) - session = Session() - with session.transaction: - conn = session.connection() - Temporary.__table__.create(conn) - session.add(Temporary(spam=test_data)) - session.flush() - df = sql.read_sql_query(sql=sqlalchemy.select([Temporary.spam]), con=conn) + id = Column(Integer, primary_key=True) + spam = Column(Unicode(30), nullable=False) + + if _gt14(): + with Session(self.conn) as session: + with session.begin(): + conn = session.connection() + Temporary.__table__.create(conn) + session.add(Temporary(spam=test_data)) + session.flush() + df = sql.read_sql_query(sql=select(Temporary.spam), con=conn) + else: + Session = sessionmaker() + session = Session(bind=self.conn) + with session.transaction: + conn = session.connection() + Temporary.__table__.create(conn) + session.add(Temporary(spam=test_data)) + session.flush() + df = sql.read_sql_query(sql=select([Temporary.spam]), con=conn) tm.assert_frame_equal(df, expected) @@ -2151,6 +2217,8 @@ def test_default_type_conversion(self): def test_read_procedure(self): import pymysql + from sqlalchemy import text + from sqlalchemy.engine import Engine # see GH7324. Although it is more an api test, it is added to the # mysql tests as sqlite does not have stored procedures @@ -2164,11 +2232,11 @@ def test_read_procedure(self): BEGIN SELECT * FROM test_procedure; END""" - - connection = self.conn.connect() + proc = text(proc) + connection = self.conn.connect() if isinstance(self.conn, Engine) else self.conn trans = connection.begin() try: - r1 = connection.execute(proc) # noqa + _ = connection.execute(proc) trans.commit() except pymysql.Error: trans.rollback() @@ -2203,6 +2271,8 @@ def setup_driver(cls): cls.driver = "psycopg2" def test_schema_support(self): + from sqlalchemy.engine import Engine + # only test this for postgresql (schema's not supported in # mysql/sqlite) df = DataFrame({"col1": [1, 2], "col2": [0.1, 0.2], "col3": ["a", "n"]}) @@ -2262,7 +2332,7 @@ def test_schema_support(self): # The schema won't be applied on another Connection # because of transactional schemas - if isinstance(self.conn, sqlalchemy.engine.Engine): + if isinstance(self.conn, Engine): engine2 = self.connect() pdsql = sql.SQLDatabase(engine2, schema="other") pdsql.to_sql(df, "test_schema_other2", index=False)