diff --git a/pandas/io/sql.py b/pandas/io/sql.py
index 2b139f8ca527c..0788d9da06eb9 100644
--- a/pandas/io/sql.py
+++ b/pandas/io/sql.py
@@ -138,7 +138,7 @@ def _parse_date_columns(data_frame, parse_dates):
         if isinstance(df_col.dtype, DatetimeTZDtype) or col_name in parse_dates:
             try:
                 fmt = parse_dates[col_name]
-            except TypeError:
+            except (KeyError, TypeError):
                 fmt = None
             data_frame.isetitem(i, _handle_date_column(df_col, format=fmt))
 
diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py
index bfa93a4ff910e..bbdb22955297e 100644
--- a/pandas/tests/io/test_sql.py
+++ b/pandas/tests/io/test_sql.py
@@ -413,6 +413,8 @@ def mysql_pymysql_engine(iris_path, types_data):
         for entry in types_data:
             entry.pop("DateColWithTz")
         create_and_load_types(engine, types_data, "mysql")
+    if not insp.has_table("iris_view"):
+        create_and_load_iris_view(engine)
     yield engine
     with engine.connect() as conn:
         with conn.begin():
@@ -422,7 +424,7 @@ def mysql_pymysql_engine(iris_path, types_data):
 
 
 @pytest.fixture
-def mysql_pymysql_conn(mysql_pymysql_engine):
+def mysql_pymysql_conn(iris_path, mysql_pymysql_engine):
     with mysql_pymysql_engine.connect() as conn:
         yield conn
 
@@ -440,6 +442,8 @@ def postgresql_psycopg2_engine(iris_path, types_data):
         create_and_load_iris(engine, iris_path, "postgresql")
     if not insp.has_table("types"):
         create_and_load_types(engine, types_data, "postgresql")
+    if not insp.has_table("iris_view"):
+        create_and_load_iris_view(engine)
     yield engine
     with engine.connect() as conn:
         with conn.begin():
@@ -462,9 +466,20 @@ def sqlite_str():
 
 
 @pytest.fixture
-def sqlite_engine(sqlite_str):
+def sqlite_engine(sqlite_str, iris_path, types_data):
     sqlalchemy = pytest.importorskip("sqlalchemy")
     engine = sqlalchemy.create_engine(sqlite_str, poolclass=sqlalchemy.pool.NullPool)
+
+    insp = sqlalchemy.inspect(engine)
+    if not insp.has_table("iris"):
+        create_and_load_iris(engine, iris_path, "sqlite")
+    if not insp.has_table("iris_view"):
+        create_and_load_iris_view(engine)
+    if not insp.has_table("types"):
+        for entry in types_data:
+            entry.pop("DateColWithTz")
+        create_and_load_types(engine, types_data, "sqlite")
+
     yield engine
     engine.dispose()
 
@@ -476,17 +491,25 @@ def sqlite_conn(sqlite_engine):
 
 
 @pytest.fixture
-def sqlite_iris_str(sqlite_str, iris_path):
+def sqlite_iris_str(sqlite_str, iris_path, types_data):
     sqlalchemy = pytest.importorskip("sqlalchemy")
     engine = sqlalchemy.create_engine(sqlite_str)
-    create_and_load_iris(engine, iris_path, "sqlite")
+
+    insp = sqlalchemy.inspect(engine)
+    if not insp.has_table("iris"):
+        create_and_load_iris(engine, iris_path, "sqlite")
+    if not insp.has_table("iris_view"):
+        create_and_load_iris_view(engine)
+    if not insp.has_table("types"):
+        for entry in types_data:
+            entry.pop("DateColWithTz")
+        create_and_load_types(engine, types_data, "sqlite")
     engine.dispose()
     return sqlite_str
 
 
 @pytest.fixture
 def sqlite_iris_engine(sqlite_engine, iris_path):
-    create_and_load_iris(sqlite_engine, iris_path, "sqlite")
     return sqlite_engine
 
 
@@ -499,6 +522,7 @@ def sqlite_iris_conn(sqlite_iris_engine):
 @pytest.fixture
 def sqlite_buildin():
     with contextlib.closing(sqlite3.connect(":memory:")) as closing_conn:
+        create_and_load_iris_view(closing_conn)
         with closing_conn as conn:
             yield conn
 
@@ -1097,6 +1121,7 @@ class PandasSQLTest:
     """
 
     def load_iris_data(self, iris_path):
+        self.drop_view("iris_view", self.conn)
         self.drop_table("iris", self.conn)
         if isinstance(self.conn, sqlite3.Connection):
             create_and_load_iris_sqlite3(self.conn, iris_path)
@@ -1221,470 +1246,695 @@ class DummyException(Exception):
 # -- Testing the public API
 
 
-class _TestSQLApi(PandasSQLTest):
-    """
-    Base class to test the public API.
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable_iris)
+def test_api_read_sql_view(conn, request):
+    conn = request.getfixturevalue(conn)
+    iris_frame = sql.read_sql_query("SELECT * FROM iris_view", conn)
+    check_iris_frame(iris_frame)
 
-    From this two classes are derived to run these tests for both the
-    sqlalchemy mode (`TestSQLApi`) and the fallback mode
-    (`TestSQLiteFallbackApi`). These tests are run with sqlite3. Specific
-    tests for the different sql flavours are included in `_TestSQLAlchemy`.
 
-    Notes:
-    flavor can always be passed even in SQLAlchemy mode,
-    should be correctly ignored.
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable_iris)
+def test_api_read_sql_with_chunksize_no_result(conn, request):
+    conn = request.getfixturevalue(conn)
+    query = 'SELECT * FROM iris_view WHERE "SepalLength" < 0.0'
+    with_batch = sql.read_sql_query(query, conn, chunksize=5)
+    without_batch = sql.read_sql_query(query, conn)
+    tm.assert_frame_equal(concat(with_batch), without_batch)
 
-    we don't use drop_table because that isn't part of the public api
-    """
 
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame1", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame1")
 
-    flavor = "sqlite"
-    mode: str
+    sql.to_sql(test_frame1, "test_frame1", conn)
+    assert sql.has_table("test_frame1", conn)
 
-    @pytest.fixture(autouse=True)
-    def setup_method(self, iris_path, types_data):
-        self.conn = self.connect()
-        self.load_iris_data(iris_path)
-        self.load_types_data(types_data)
-        self.load_test_data_and_sql()
 
-    def load_test_data_and_sql(self):
-        create_and_load_iris_view(self.conn)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_fail(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame2", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame2")
 
-    def test_read_sql_view(self):
-        iris_frame = sql.read_sql_query("SELECT * FROM iris_view", self.conn)
-        check_iris_frame(iris_frame)
+    sql.to_sql(test_frame1, "test_frame2", conn, if_exists="fail")
+    assert sql.has_table("test_frame2", conn)
 
-    def test_read_sql_with_chunksize_no_result(self):
-        query = "SELECT * FROM iris_view WHERE SepalLength < 0.0"
-        with_batch = sql.read_sql_query(query, self.conn, chunksize=5)
-        without_batch = sql.read_sql_query(query, self.conn)
-        tm.assert_frame_equal(concat(with_batch), without_batch)
+    msg = "Table 'test_frame2' already exists"
+    with pytest.raises(ValueError, match=msg):
+        sql.to_sql(test_frame1, "test_frame2", conn, if_exists="fail")
 
-    def test_to_sql(self, test_frame1):
-        sql.to_sql(test_frame1, "test_frame1", self.conn)
-        assert sql.has_table("test_frame1", self.conn)
 
-    def test_to_sql_fail(self, test_frame1):
-        sql.to_sql(test_frame1, "test_frame2", self.conn, if_exists="fail")
-        assert sql.has_table("test_frame2", self.conn)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_replace(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame3", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame3")
 
-        msg = "Table 'test_frame2' already exists"
-        with pytest.raises(ValueError, match=msg):
-            sql.to_sql(test_frame1, "test_frame2", self.conn, if_exists="fail")
+    sql.to_sql(test_frame1, "test_frame3", conn, if_exists="fail")
+    # Add to table again
+    sql.to_sql(test_frame1, "test_frame3", conn, if_exists="replace")
+    assert sql.has_table("test_frame3", conn)
 
-    def test_to_sql_replace(self, test_frame1):
-        sql.to_sql(test_frame1, "test_frame3", self.conn, if_exists="fail")
-        # Add to table again
-        sql.to_sql(test_frame1, "test_frame3", self.conn, if_exists="replace")
-        assert sql.has_table("test_frame3", self.conn)
+    num_entries = len(test_frame1)
+    num_rows = count_rows(conn, "test_frame3")
 
-        num_entries = len(test_frame1)
-        num_rows = count_rows(self.conn, "test_frame3")
+    assert num_rows == num_entries
 
-        assert num_rows == num_entries
 
-    def test_to_sql_append(self, test_frame1):
-        assert sql.to_sql(test_frame1, "test_frame4", self.conn, if_exists="fail") == 4
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_append(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame4", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame4")
 
-        # Add to table again
-        assert (
-            sql.to_sql(test_frame1, "test_frame4", self.conn, if_exists="append") == 4
-        )
-        assert sql.has_table("test_frame4", self.conn)
+    assert sql.to_sql(test_frame1, "test_frame4", conn, if_exists="fail") == 4
 
-        num_entries = 2 * len(test_frame1)
-        num_rows = count_rows(self.conn, "test_frame4")
+    # Add to table again
+    assert sql.to_sql(test_frame1, "test_frame4", conn, if_exists="append") == 4
+    assert sql.has_table("test_frame4", conn)
 
-        assert num_rows == num_entries
+    num_entries = 2 * len(test_frame1)
+    num_rows = count_rows(conn, "test_frame4")
 
-    def test_to_sql_type_mapping(self, test_frame3):
-        sql.to_sql(test_frame3, "test_frame5", self.conn, index=False)
-        result = sql.read_sql("SELECT * FROM test_frame5", self.conn)
+    assert num_rows == num_entries
 
-        tm.assert_frame_equal(test_frame3, result)
 
-    def test_to_sql_series(self):
-        s = Series(np.arange(5, dtype="int64"), name="series")
-        sql.to_sql(s, "test_series", self.conn, index=False)
-        s2 = sql.read_sql_query("SELECT * FROM test_series", self.conn)
-        tm.assert_frame_equal(s.to_frame(), s2)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_type_mapping(conn, request, test_frame3):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame5", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame5")
+
+    sql.to_sql(test_frame3, "test_frame5", conn, index=False)
+    result = sql.read_sql("SELECT * FROM test_frame5", conn)
 
-    def test_roundtrip(self, test_frame1):
-        sql.to_sql(test_frame1, "test_frame_roundtrip", con=self.conn)
-        result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=self.conn)
+    tm.assert_frame_equal(test_frame3, result)
 
-        # HACK!
-        result.index = test_frame1.index
-        result.set_index("level_0", inplace=True)
-        result.index.astype(int)
-        result.index.name = None
-        tm.assert_frame_equal(result, test_frame1)
 
-    def test_roundtrip_chunksize(self, test_frame1):
-        sql.to_sql(
-            test_frame1,
-            "test_frame_roundtrip",
-            con=self.conn,
-            index=False,
-            chunksize=2,
-        )
-        result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=self.conn)
-        tm.assert_frame_equal(result, test_frame1)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_series(conn, request):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_series", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_series")
 
-    def test_execute_sql(self):
-        # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
-        with sql.pandasSQL_builder(self.conn) as pandas_sql:
-            iris_results = pandas_sql.execute("SELECT * FROM iris")
+    s = Series(np.arange(5, dtype="int64"), name="series")
+    sql.to_sql(s, "test_series", conn, index=False)
+    s2 = sql.read_sql_query("SELECT * FROM test_series", conn)
+    tm.assert_frame_equal(s.to_frame(), s2)
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_roundtrip(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame_roundtrip", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame_roundtrip")
+
+    sql.to_sql(test_frame1, "test_frame_roundtrip", con=conn)
+    result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=conn)
+
+    # HACK!
+    result.index = test_frame1.index
+    result.set_index("level_0", inplace=True)
+    result.index.astype(int)
+    result.index.name = None
+    tm.assert_frame_equal(result, test_frame1)
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_roundtrip_chunksize(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_frame_roundtrip", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_frame_roundtrip")
+
+    sql.to_sql(
+        test_frame1,
+        "test_frame_roundtrip",
+        con=conn,
+        index=False,
+        chunksize=2,
+    )
+    result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=conn)
+    tm.assert_frame_equal(result, test_frame1)
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable_iris)
+def test_api_execute_sql(conn, request):
+    # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
+    conn = request.getfixturevalue(conn)
+    with sql.pandasSQL_builder(conn) as pandas_sql:
+        iris_results = pandas_sql.execute("SELECT * FROM iris")
         row = iris_results.fetchone()
-        tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"])
+    tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"])
 
-    def test_date_parsing(self):
-        # Test date parsing in read_sql
-        # No Parsing
-        df = sql.read_sql_query("SELECT * FROM types", self.conn)
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_date_parsing(conn, request):
+    conn_name = conn
+    if conn_name in {"sqlite_buildin", "sqlite_str"}:
+        pytest.skip("types tables not created in sqlite_buildin or sqlite_str fixture")
+
+    conn = request.getfixturevalue(conn)
+    # Test date parsing in read_sql
+    # No Parsing
+    df = sql.read_sql_query("SELECT * FROM types", conn)
+    if not ("mysql" in conn_name or "postgres" in conn_name):
         assert not issubclass(df.DateCol.dtype.type, np.datetime64)
 
-        df = sql.read_sql_query(
-            "SELECT * FROM types", self.conn, parse_dates=["DateCol"]
-        )
-        assert issubclass(df.DateCol.dtype.type, np.datetime64)
-        assert df.DateCol.tolist() == [
-            Timestamp(2000, 1, 3, 0, 0, 0),
-            Timestamp(2000, 1, 4, 0, 0, 0),
-        ]
+    df = sql.read_sql_query("SELECT * FROM types", conn, parse_dates=["DateCol"])
+    assert issubclass(df.DateCol.dtype.type, np.datetime64)
+    assert df.DateCol.tolist() == [
+        Timestamp(2000, 1, 3, 0, 0, 0),
+        Timestamp(2000, 1, 4, 0, 0, 0),
+    ]
 
-        df = sql.read_sql_query(
-            "SELECT * FROM types",
-            self.conn,
-            parse_dates={"DateCol": "%Y-%m-%d %H:%M:%S"},
-        )
-        assert issubclass(df.DateCol.dtype.type, np.datetime64)
-        assert df.DateCol.tolist() == [
-            Timestamp(2000, 1, 3, 0, 0, 0),
-            Timestamp(2000, 1, 4, 0, 0, 0),
-        ]
+    df = sql.read_sql_query(
+        "SELECT * FROM types",
+        conn,
+        parse_dates={"DateCol": "%Y-%m-%d %H:%M:%S"},
+    )
+    assert issubclass(df.DateCol.dtype.type, np.datetime64)
+    assert df.DateCol.tolist() == [
+        Timestamp(2000, 1, 3, 0, 0, 0),
+        Timestamp(2000, 1, 4, 0, 0, 0),
+    ]
 
-        df = sql.read_sql_query(
-            "SELECT * FROM types", self.conn, parse_dates=["IntDateCol"]
-        )
-        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
-        assert df.IntDateCol.tolist() == [
-            Timestamp(1986, 12, 25, 0, 0, 0),
-            Timestamp(2013, 1, 1, 0, 0, 0),
-        ]
+    df = sql.read_sql_query("SELECT * FROM types", conn, parse_dates=["IntDateCol"])
+    assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
+    assert df.IntDateCol.tolist() == [
+        Timestamp(1986, 12, 25, 0, 0, 0),
+        Timestamp(2013, 1, 1, 0, 0, 0),
+    ]
+
+    df = sql.read_sql_query(
+        "SELECT * FROM types", conn, parse_dates={"IntDateCol": "s"}
+    )
+    assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
+    assert df.IntDateCol.tolist() == [
+        Timestamp(1986, 12, 25, 0, 0, 0),
+        Timestamp(2013, 1, 1, 0, 0, 0),
+    ]
+
+    df = sql.read_sql_query(
+        "SELECT * FROM types",
+        conn,
+        parse_dates={"IntDateOnlyCol": "%Y%m%d"},
+    )
+    assert issubclass(df.IntDateOnlyCol.dtype.type, np.datetime64)
+    assert df.IntDateOnlyCol.tolist() == [
+        Timestamp("2010-10-10"),
+        Timestamp("2010-12-12"),
+    ]
 
-        df = sql.read_sql_query(
-            "SELECT * FROM types", self.conn, parse_dates={"IntDateCol": "s"}
-        )
-        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
-        assert df.IntDateCol.tolist() == [
-            Timestamp(1986, 12, 25, 0, 0, 0),
-            Timestamp(2013, 1, 1, 0, 0, 0),
-        ]
 
-        df = sql.read_sql_query(
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+@pytest.mark.parametrize("error", ["ignore", "raise", "coerce"])
+@pytest.mark.parametrize(
+    "read_sql, text, mode",
+    [
+        (sql.read_sql, "SELECT * FROM types", ("sqlalchemy", "fallback")),
+        (sql.read_sql, "types", ("sqlalchemy",)),
+        (
+            sql.read_sql_query,
             "SELECT * FROM types",
-            self.conn,
-            parse_dates={"IntDateOnlyCol": "%Y%m%d"},
-        )
-        assert issubclass(df.IntDateOnlyCol.dtype.type, np.datetime64)
-        assert df.IntDateOnlyCol.tolist() == [
-            Timestamp("2010-10-10"),
-            Timestamp("2010-12-12"),
-        ]
+            ("sqlalchemy", "fallback"),
+        ),
+        (sql.read_sql_table, "types", ("sqlalchemy",)),
+    ],
+)
+def test_api_custom_dateparsing_error(
+    conn, request, read_sql, text, mode, error, types_data_frame
+):
+    conn_name = conn
+    if conn_name in {"sqlite_buildin", "sqlite_str"}:
+        pytest.skip("types tables not created in sqlite_buildin or sqlite_str fixture")
 
-    @pytest.mark.parametrize("error", ["ignore", "raise", "coerce"])
-    @pytest.mark.parametrize(
-        "read_sql, text, mode",
-        [
-            (sql.read_sql, "SELECT * FROM types", ("sqlalchemy", "fallback")),
-            (sql.read_sql, "types", ("sqlalchemy")),
-            (
-                sql.read_sql_query,
-                "SELECT * FROM types",
-                ("sqlalchemy", "fallback"),
-            ),
-            (sql.read_sql_table, "types", ("sqlalchemy")),
-        ],
+    conn = request.getfixturevalue(conn)
+
+    expected = types_data_frame.astype({"DateCol": "datetime64[ns]"})
+
+    result = read_sql(
+        text,
+        con=conn,
+        parse_dates={
+            "DateCol": {"errors": error},
+        },
     )
-    def test_custom_dateparsing_error(
-        self, read_sql, text, mode, error, types_data_frame
-    ):
-        if self.mode in mode:
-            expected = types_data_frame.astype({"DateCol": "datetime64[ns]"})
+    if "postgres" in conn_name:
+        # TODO: clean up types_data_frame fixture
+        result = result.drop(columns=["DateColWithTz"])
+        result["BoolCol"] = result["BoolCol"].astype(int)
+        result["BoolColWithNull"] = result["BoolColWithNull"].astype(float)
 
-            result = read_sql(
-                text,
-                con=self.conn,
-                parse_dates={
-                    "DateCol": {"errors": error},
-                },
-            )
+    tm.assert_frame_equal(result, expected)
 
-            tm.assert_frame_equal(result, expected)
 
-    def test_date_and_index(self):
-        # Test case where same column appears in parse_date and index_col
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_date_and_index(conn, request):
+    # Test case where same column appears in parse_dates and index_col
+    conn_name = conn
+    if conn_name in {"sqlite_buildin", "sqlite_str"}:
+        pytest.skip("types tables not created in sqlite_buildin or sqlite_str fixture")
 
-        df = sql.read_sql_query(
-            "SELECT * FROM types",
-            self.conn,
-            index_col="DateCol",
-            parse_dates=["DateCol", "IntDateCol"],
-        )
+    conn = request.getfixturevalue(conn)
+    df = sql.read_sql_query(
+        "SELECT * FROM types",
+        conn,
+        index_col="DateCol",
+        parse_dates=["DateCol", "IntDateCol"],
+    )
 
-        assert issubclass(df.index.dtype.type, np.datetime64)
-        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
+    assert issubclass(df.index.dtype.type, np.datetime64)
+    assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
 
-    def test_timedelta(self):
-        # see #6921
-        df = to_timedelta(Series(["00:00:01", "00:00:03"], name="foo")).to_frame()
-        with tm.assert_produces_warning(UserWarning):
-            result_count = df.to_sql(name="test_timedelta", con=self.conn)
-        assert result_count == 2
-        result = sql.read_sql_query("SELECT * FROM test_timedelta", self.conn)
-        tm.assert_series_equal(result["foo"], df["foo"].view("int64"))
-
-    def test_complex_raises(self):
-        df = DataFrame({"a": [1 + 1j, 2j]})
-        msg = "Complex datatypes not supported"
-        with pytest.raises(ValueError, match=msg):
-            assert df.to_sql("test_complex", con=self.conn) is None
 
-    @pytest.mark.parametrize(
-        "index_name,index_label,expected",
-        [
-            # no index name, defaults to 'index'
-            (None, None, "index"),
-            # specifying index_label
-            (None, "other_label", "other_label"),
-            # using the index name
-            ("index_name", None, "index_name"),
-            # has index name, but specifying index_label
-            ("index_name", "other_label", "other_label"),
-            # index name is integer
-            (0, None, "0"),
-            # index name is None but index label is integer
-            (None, 0, "0"),
-        ],
-    )
-    def test_to_sql_index_label(self, index_name, index_label, expected):
-        temp_frame = DataFrame({"col1": range(4)})
-        temp_frame.index.name = index_name
-        query = "SELECT * FROM test_index_label"
-        sql.to_sql(temp_frame, "test_index_label", self.conn, index_label=index_label)
-        frame = sql.read_sql_query(query, self.conn)
-        assert frame.columns[0] == expected
-
-    def test_to_sql_index_label_multiindex(self):
-        expected_row_count = 4
-        temp_frame = DataFrame(
-            {"col1": range(4)},
-            index=MultiIndex.from_product([("A0", "A1"), ("B0", "B1")]),
-        )
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_timedelta(conn, request):
+    # see #6921
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_timedelta", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_timedelta")
 
-        # no index name, defaults to 'level_0' and 'level_1'
-        result = sql.to_sql(temp_frame, "test_index_label", self.conn)
-        assert result == expected_row_count
-        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
-        assert frame.columns[0] == "level_0"
-        assert frame.columns[1] == "level_1"
+    df = to_timedelta(Series(["00:00:01", "00:00:03"], name="foo")).to_frame()
+    with tm.assert_produces_warning(UserWarning):
+        result_count = df.to_sql(name="test_timedelta", con=conn)
+    assert result_count == 2
+    result = sql.read_sql_query("SELECT * FROM test_timedelta", conn)
+    tm.assert_series_equal(result["foo"], df["foo"].view("int64"))
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_complex_raises(conn, request):
+    conn = request.getfixturevalue(conn)
+    df = DataFrame({"a": [1 + 1j, 2j]})
+    msg = "Complex datatypes not supported"
+    with pytest.raises(ValueError, match=msg):
+        assert df.to_sql("test_complex", con=conn) is None
 
-        # specifying index_label
-        result = sql.to_sql(
-            temp_frame,
-            "test_index_label",
-            self.conn,
-            if_exists="replace",
-            index_label=["A", "B"],
-        )
-        assert result == expected_row_count
-        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
-        assert frame.columns[:2].tolist() == ["A", "B"]
 
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+@pytest.mark.parametrize(
+    "index_name,index_label,expected",
+    [
+        # no index name, defaults to 'index'
+        (None, None, "index"),
+        # specifying index_label
+        (None, "other_label", "other_label"),
         # using the index name
-        temp_frame.index.names = ["A", "B"]
-        result = sql.to_sql(
-            temp_frame, "test_index_label", self.conn, if_exists="replace"
+        ("index_name", None, "index_name"),
+        # has index name, but specifying index_label
+        ("index_name", "other_label", "other_label"),
+        # index name is integer
+        (0, None, "0"),
+        # index name is None but index label is integer
+        (None, 0, "0"),
+    ],
+)
+def test_api_to_sql_index_label(conn, request, index_name, index_label, expected):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_index_label", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_index_label")
+
+    temp_frame = DataFrame({"col1": range(4)})
+    temp_frame.index.name = index_name
+    query = "SELECT * FROM test_index_label"
+    sql.to_sql(temp_frame, "test_index_label", conn, index_label=index_label)
+    frame = sql.read_sql_query(query, conn)
+    assert frame.columns[0] == expected
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_to_sql_index_label_multiindex(conn, request):
+    conn_name = conn
+    if "mysql" in conn_name:
+        request.node.add_marker(
+            pytest.mark.xfail(reason="MySQL can fail using TEXT without length as key")
         )
-        assert result == expected_row_count
-        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
-        assert frame.columns[:2].tolist() == ["A", "B"]
 
-        # has index name, but specifying index_label
-        result = sql.to_sql(
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_index_label", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_index_label")
+
+    expected_row_count = 4
+    temp_frame = DataFrame(
+        {"col1": range(4)},
+        index=MultiIndex.from_product([("A0", "A1"), ("B0", "B1")]),
+    )
+
+    # no index name, defaults to 'level_0' and 'level_1'
+    result = sql.to_sql(temp_frame, "test_index_label", conn)
+    assert result == expected_row_count
+    frame = sql.read_sql_query("SELECT * FROM test_index_label", conn)
+    assert frame.columns[0] == "level_0"
+    assert frame.columns[1] == "level_1"
+
+    # specifying index_label
+    result = sql.to_sql(
+        temp_frame,
+        "test_index_label",
+        conn,
+        if_exists="replace",
+        index_label=["A", "B"],
+    )
+    assert result == expected_row_count
+    frame = sql.read_sql_query("SELECT * FROM test_index_label", conn)
+    assert frame.columns[:2].tolist() == ["A", "B"]
+
+    # using the index name
+    temp_frame.index.names = ["A", "B"]
+    result = sql.to_sql(temp_frame, "test_index_label", conn, if_exists="replace")
+    assert result == expected_row_count
+    frame = sql.read_sql_query("SELECT * FROM test_index_label", conn)
+    assert frame.columns[:2].tolist() == ["A", "B"]
+
+    # has index name, but specifying index_label
+    result = sql.to_sql(
+        temp_frame,
+        "test_index_label",
+        conn,
+        if_exists="replace",
+        index_label=["C", "D"],
+    )
+    assert result == expected_row_count
+    frame = sql.read_sql_query("SELECT * FROM test_index_label", conn)
+    assert frame.columns[:2].tolist() == ["C", "D"]
+
+    msg = "Length of 'index_label' should match number of levels, which is 2"
+    with pytest.raises(ValueError, match=msg):
+        sql.to_sql(
             temp_frame,
             "test_index_label",
-            self.conn,
+            conn,
             if_exists="replace",
-            index_label=["C", "D"],
+            index_label="C",
         )
-        assert result == expected_row_count
-        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
-        assert frame.columns[:2].tolist() == ["C", "D"]
-
-        msg = "Length of 'index_label' should match number of levels, which is 2"
-        with pytest.raises(ValueError, match=msg):
-            sql.to_sql(
-                temp_frame,
-                "test_index_label",
-                self.conn,
-                if_exists="replace",
-                index_label="C",
-            )
 
-    def test_multiindex_roundtrip(self):
-        df = DataFrame.from_records(
-            [(1, 2.1, "line1"), (2, 1.5, "line2")],
-            columns=["A", "B", "C"],
-            index=["A", "B"],
-        )
 
-        df.to_sql(name="test_multiindex_roundtrip", con=self.conn)
-        result = sql.read_sql_query(
-            "SELECT * FROM test_multiindex_roundtrip", self.conn, index_col=["A", "B"]
-        )
-        tm.assert_frame_equal(df, result, check_index_type=True)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_multiindex_roundtrip(conn, request):
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_multiindex_roundtrip", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_multiindex_roundtrip")
+
+    df = DataFrame.from_records(
+        [(1, 2.1, "line1"), (2, 1.5, "line2")],
+        columns=["A", "B", "C"],
+        index=["A", "B"],
+    )
 
-    @pytest.mark.parametrize(
-        "dtype",
-        [
-            None,
-            int,
-            float,
-            {"A": int, "B": float},
-        ],
+    df.to_sql(name="test_multiindex_roundtrip", con=conn)
+    result = sql.read_sql_query(
+        "SELECT * FROM test_multiindex_roundtrip", conn, index_col=["A", "B"]
     )
-    def test_dtype_argument(self, dtype):
-        # GH10285 Add dtype argument to read_sql_query
-        df = DataFrame([[1.2, 3.4], [5.6, 7.8]], columns=["A", "B"])
-        assert df.to_sql(name="test_dtype_argument", con=self.conn) == 2
-
-        expected = df.astype(dtype)
-        result = sql.read_sql_query(
-            "SELECT A, B FROM test_dtype_argument", con=self.conn, dtype=dtype
-        )
+    tm.assert_frame_equal(df, result, check_index_type=True)
 
-        tm.assert_frame_equal(result, expected)
 
-    def test_integer_col_names(self):
-        df = DataFrame([[1, 2], [3, 4]], columns=[0, 1])
-        sql.to_sql(df, "test_frame_integer_col_names", self.conn, if_exists="replace")
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+@pytest.mark.parametrize(
+    "dtype",
+    [
+        None,
+        int,
+        float,
+        {"A": int, "B": float},
+    ],
+)
+def test_api_dtype_argument(conn, request, dtype):
+    # GH10285 Add dtype argument to read_sql_query
+    conn_name = conn
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_dtype_argument", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_dtype_argument")
 
-    def test_get_schema(self, test_frame1):
-        create_sql = sql.get_schema(test_frame1, "test", con=self.conn)
-        assert "CREATE" in create_sql
+    df = DataFrame([[1.2, 3.4], [5.6, 7.8]], columns=["A", "B"])
+    assert df.to_sql(name="test_dtype_argument", con=conn) == 2
 
-    def test_get_schema_with_schema(self, test_frame1):
-        # GH28486
-        create_sql = sql.get_schema(test_frame1, "test", con=self.conn, schema="pypi")
-        assert "CREATE TABLE pypi." in create_sql
+    expected = df.astype(dtype)
 
-    def test_get_schema_dtypes(self):
-        if self.mode == "sqlalchemy":
-            from sqlalchemy import Integer
+    if "postgres" in conn_name:
+        query = 'SELECT "A", "B" FROM test_dtype_argument'
+    else:
+        query = "SELECT A, B FROM test_dtype_argument"
+    result = sql.read_sql_query(query, con=conn, dtype=dtype)
 
-            dtype = Integer
-        else:
-            dtype = "INTEGER"
+    tm.assert_frame_equal(result, expected)
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_integer_col_names(conn, request):
+    conn = request.getfixturevalue(conn)
+    df = DataFrame([[1, 2], [3, 4]], columns=[0, 1])
+    sql.to_sql(df, "test_frame_integer_col_names", conn, if_exists="replace")
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_get_schema(conn, request, test_frame1):
+    conn = request.getfixturevalue(conn)
+    create_sql = sql.get_schema(test_frame1, "test", con=conn)
+    assert "CREATE" in create_sql
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_get_schema_with_schema(conn, request, test_frame1):
+    # GH28486
+    conn = request.getfixturevalue(conn)
+    create_sql = sql.get_schema(test_frame1, "test", con=conn, schema="pypi")
+    assert "CREATE TABLE pypi." in create_sql
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_get_schema_dtypes(conn, request):
+    conn_name = conn
+    conn = request.getfixturevalue(conn)
+    float_frame = DataFrame({"a": [1.1, 1.2], "b": [2.1, 2.2]})
+
+    if conn_name == "sqlite_buildin":
+        dtype = "INTEGER"
+    else:
+        from sqlalchemy import Integer
+
+        dtype = Integer
+    create_sql = sql.get_schema(float_frame, "test", con=conn, dtype={"b": dtype})
+    assert "CREATE" in create_sql
+    assert "INTEGER" in create_sql
 
-        float_frame = DataFrame({"a": [1.1, 1.2], "b": [2.1, 2.2]})
-        create_sql = sql.get_schema(
-            float_frame, "test", con=self.conn, dtype={"b": dtype}
-        )
-        assert "CREATE" in create_sql
-        assert "INTEGER" in create_sql
 
-    def test_get_schema_keys(self, test_frame1):
-        frame = DataFrame({"Col1": [1.1, 1.2], "Col2": [2.1, 2.2]})
-        create_sql = sql.get_schema(frame, "test", con=self.conn, keys="Col1")
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_get_schema_keys(conn, request, test_frame1):
+    conn_name = conn
+    conn = request.getfixturevalue(conn)
+    frame = DataFrame({"Col1": [1.1, 1.2], "Col2": [2.1, 2.2]})
+    create_sql = sql.get_schema(frame, "test", con=conn, keys="Col1")
+
+    if "mysql" in conn_name:
+        constraint_sentence = "CONSTRAINT test_pk PRIMARY KEY (`Col1`)"
+    else:
         constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("Col1")'
-        assert constraint_sentence in create_sql
+    assert constraint_sentence in create_sql
 
-        # multiple columns as key (GH10385)
-        create_sql = sql.get_schema(test_frame1, "test", con=self.conn, keys=["A", "B"])
+    # multiple columns as key (GH10385)
+    create_sql = sql.get_schema(test_frame1, "test", con=conn, keys=["A", "B"])
+    if "mysql" in conn_name:
+        constraint_sentence = "CONSTRAINT test_pk PRIMARY KEY (`A`, `B`)"
+    else:
         constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("A", "B")'
-        assert constraint_sentence in create_sql
+    assert constraint_sentence in create_sql
 
-    def test_chunksize_read(self):
-        df = DataFrame(
-            np.random.default_rng(2).standard_normal((22, 5)), columns=list("abcde")
-        )
-        df.to_sql(name="test_chunksize", con=self.conn, index=False)
 
-        # reading the query in one time
-        res1 = sql.read_sql_query("select * from test_chunksize", self.conn)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_chunksize_read(conn, request):
+    conn_name = conn
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_chunksize", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_chunksize")
+
+    df = DataFrame(
+        np.random.default_rng(2).standard_normal((22, 5)), columns=list("abcde")
+    )
+    df.to_sql(name="test_chunksize", con=conn, index=False)
+
+    # reading the query in one time
+    res1 = sql.read_sql_query("select * from test_chunksize", conn)
+
+    # reading the query in chunks with read_sql_query
+    res2 = DataFrame()
+    i = 0
+    sizes = [5, 5, 5, 5, 2]
+
+    for chunk in sql.read_sql_query("select * from test_chunksize", conn, chunksize=5):
+        res2 = concat([res2, chunk], ignore_index=True)
+        assert len(chunk) == sizes[i]
+        i += 1
 
-        # reading the query in chunks with read_sql_query
-        res2 = DataFrame()
+    tm.assert_frame_equal(res1, res2)
+
+    # reading the table in chunks with read_sql_table
+    if conn_name == "sqlite_buildin":
+        with pytest.raises(NotImplementedError, match=""):
+            sql.read_sql_table("test_chunksize", conn, chunksize=5)
+    else:
+        res3 = DataFrame()
         i = 0
         sizes = [5, 5, 5, 5, 2]
 
-        for chunk in sql.read_sql_query(
-            "select * from test_chunksize", self.conn, chunksize=5
-        ):
-            res2 = concat([res2, chunk], ignore_index=True)
+        for chunk in sql.read_sql_table("test_chunksize", conn, chunksize=5):
+            res3 = concat([res3, chunk], ignore_index=True)
             assert len(chunk) == sizes[i]
             i += 1
 
-        tm.assert_frame_equal(res1, res2)
+        tm.assert_frame_equal(res1, res3)
 
-        # reading the query in chunks with read_sql_query
-        if self.mode == "sqlalchemy":
-            res3 = DataFrame()
-            i = 0
-            sizes = [5, 5, 5, 5, 2]
 
-            for chunk in sql.read_sql_table("test_chunksize", self.conn, chunksize=5):
-                res3 = concat([res3, chunk], ignore_index=True)
-                assert len(chunk) == sizes[i]
-                i += 1
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_categorical(conn, request):
+    # GH8624
+    # test that categorical gets written correctly as dense column
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_categorical", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_categorical")
 
-            tm.assert_frame_equal(res1, res3)
+    df = DataFrame(
+        {
+            "person_id": [1, 2, 3],
+            "person_name": ["John P. Doe", "Jane Dove", "John P. Doe"],
+        }
+    )
+    df2 = df.copy()
+    df2["person_name"] = df2["person_name"].astype("category")
 
-    def test_categorical(self):
-        # GH8624
-        # test that categorical gets written correctly as dense column
-        df = DataFrame(
-            {
-                "person_id": [1, 2, 3],
-                "person_name": ["John P. Doe", "Jane Dove", "John P. Doe"],
-            }
-        )
-        df2 = df.copy()
-        df2["person_name"] = df2["person_name"].astype("category")
+    df2.to_sql(name="test_categorical", con=conn, index=False)
+    res = sql.read_sql_query("SELECT * FROM test_categorical", conn)
 
-        df2.to_sql(name="test_categorical", con=self.conn, index=False)
-        res = sql.read_sql_query("SELECT * FROM test_categorical", self.conn)
+    tm.assert_frame_equal(res, df)
 
-        tm.assert_frame_equal(res, df)
 
-    def test_unicode_column_name(self):
-        # GH 11431
-        df = DataFrame([[1, 2], [3, 4]], columns=["\xe9", "b"])
-        df.to_sql(name="test_unicode", con=self.conn, index=False)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_unicode_column_name(conn, request):
+    # GH 11431
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_unicode", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_unicode")
 
-    def test_escaped_table_name(self):
-        # GH 13206
-        df = DataFrame({"A": [0, 1, 2], "B": [0.2, np.nan, 5.6]})
-        df.to_sql(name="d1187b08-4943-4c8d-a7f6", con=self.conn, index=False)
+    df = DataFrame([[1, 2], [3, 4]], columns=["\xe9", "b"])
+    df.to_sql(name="test_unicode", con=conn, index=False)
 
-        res = sql.read_sql_query("SELECT * FROM `d1187b08-4943-4c8d-a7f6`", self.conn)
 
-        tm.assert_frame_equal(res, df)
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_escaped_table_name(conn, request):
+    # GH 13206
+    conn_name = conn
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("d1187b08-4943-4c8d-a7f6", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("d1187b08-4943-4c8d-a7f6")
 
-    def test_read_sql_duplicate_columns(self):
-        # GH#53117
-        df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3], "c": 1})
-        df.to_sql(name="test_table", con=self.conn, index=False)
+    df = DataFrame({"A": [0, 1, 2], "B": [0.2, np.nan, 5.6]})
+    df.to_sql(name="d1187b08-4943-4c8d-a7f6", con=conn, index=False)
 
-        result = pd.read_sql("SELECT a, b, a +1 as a, c FROM test_table;", self.conn)
-        expected = DataFrame(
-            [[1, 0.1, 2, 1], [2, 0.2, 3, 1], [3, 0.3, 4, 1]],
-            columns=["a", "b", "a", "c"],
-        )
-        tm.assert_frame_equal(result, expected)
+    if "postgres" in conn_name:
+        query = 'SELECT * FROM "d1187b08-4943-4c8d-a7f6"'
+    else:
+        query = "SELECT * FROM `d1187b08-4943-4c8d-a7f6`"
+    res = sql.read_sql_query(query, conn)
+
+    tm.assert_frame_equal(res, df)
+
+
+@pytest.mark.db
+@pytest.mark.parametrize("conn", all_connectable)
+def test_api_read_sql_duplicate_columns(conn, request):
+    # GH#53117
+    conn = request.getfixturevalue(conn)
+    if sql.has_table("test_table", conn):
+        with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL:
+            pandasSQL.drop_table("test_table")
+
+    df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3], "c": 1})
+    df.to_sql(name="test_table", con=conn, index=False)
+
+    result = pd.read_sql("SELECT a, b, a +1 as a, c FROM test_table;", conn)
+    expected = DataFrame(
+        [[1, 0.1, 2, 1], [2, 0.2, 3, 1], [3, 0.3, 4, 1]],
+        columns=["a", "b", "a", "c"],
+    )
+    tm.assert_frame_equal(result, expected)
+
+
+class _TestSQLApi(PandasSQLTest):
+    """
+    Base class to test the public API.
+
+    From this two classes are derived to run these tests for both the
+    sqlalchemy mode (`TestSQLApi`) and the fallback mode
+    (`TestSQLiteFallbackApi`). These tests are run with sqlite3. Specific
+    tests for the different sql flavours are included in `_TestSQLAlchemy`.
+
+    Notes:
+    flavor can always be passed even in SQLAlchemy mode,
+    should be correctly ignored.
+
+    we don't use drop_table because that isn't part of the public api
+
+    """
+
+    flavor = "sqlite"
+    mode: str
+
+    @pytest.fixture(autouse=True)
+    def setup_method(self, iris_path, types_data):
+        self.conn = self.connect()
+        self.load_iris_data(iris_path)
+        self.load_types_data(types_data)
+        self.load_test_data_and_sql()
+
+    def load_test_data_and_sql(self):
+        create_and_load_iris_view(self.conn)
 
 
 @pytest.mark.skipif(not SQLALCHEMY_INSTALLED, reason="SQLAlchemy not installed")
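Note on the sql.py hunk at the top: `_parse_date_columns` enters the date-handling
branch for any tz-aware column, even when that column is not named in `parse_dates`,
so the `parse_dates[col_name]` lookup can now fail two ways. A minimal sketch of the
two failure modes (the column and format values here are illustrative, not pandas code):

    parse_dates = {"OtherCol": "%Y-%m-%d"}    # dict that omits the tz-aware column
    try:
        fmt = parse_dates["DateColWithTz"]    # missing dict key -> KeyError
    except (KeyError, TypeError):             # list/None subscripted by a string -> TypeError
        fmt = None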
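Note on the test refactor: the new `test_api_*` functions parametrize over fixture
*names* and resolve them lazily with `request.getfixturevalue`, so only the
parameters that need an unavailable backend fail or skip. A minimal, self-contained
sketch of the pattern (the fixture names `db_a`/`db_b` and the `backends` list are
illustrative stand-ins for the `all_connectable*` lists used in this diff):

    import pytest

    @pytest.fixture
    def db_a():
        return "sqlite://"

    @pytest.fixture
    def db_b():
        return "sqlite://"

    # parametrize over fixture names, not fixture values
    backends = ["db_a", "db_b"]

    @pytest.mark.parametrize("conn", backends)
    def test_backend(conn, request):
        # swap the parametrized name for the live fixture at run time
        conn = request.getfixturevalue(conn)
        assert conn.startswith("sqlite")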