@@ -820,6 +820,117 @@ def psql_insert_copy(table, conn, keys, data_iter):
820
820
tm .assert_frame_equal (result , expected )
821
821
822
822
823
+ @pytest .mark .db
824
+ @pytest .mark .parametrize ("conn" , postgresql_connectable )
825
+ def test_insertion_method_on_conflict_do_nothing (conn , request ):
826
+ # GH 15988: Example in to_sql docstring
827
+ conn = request .getfixturevalue (conn )
828
+
829
+ from sqlalchemy .dialects .postgresql import insert
830
+ from sqlalchemy .engine import Engine
831
+ from sqlalchemy .sql import text
832
+
833
+ def insert_on_conflict (table , conn , keys , data_iter ):
834
+ data = [dict (zip (keys , row )) for row in data_iter ]
835
+ stmt = (
836
+ insert (table .table )
837
+ .values (data )
838
+ .on_conflict_do_nothing (index_elements = ["a" ])
839
+ )
840
+ result = conn .execute (stmt )
841
+ return result .rowcount
842
+
843
+ create_sql = text (
844
+ """
845
+ CREATE TABLE test_insert_conflict (
846
+ a integer PRIMARY KEY,
847
+ b numeric,
848
+ c text
849
+ );
850
+ """
851
+ )
852
+ if isinstance (conn , Engine ):
853
+ with conn .connect () as con :
854
+ with con .begin ():
855
+ con .execute (create_sql )
856
+ else :
857
+ with conn .begin ():
858
+ conn .execute (create_sql )
859
+
860
+ expected = DataFrame ([[1 , 2.1 , "a" ]], columns = list ("abc" ))
861
+ expected .to_sql ("test_insert_conflict" , conn , if_exists = "append" , index = False )
862
+
863
+ df_insert = DataFrame ([[1 , 3.2 , "b" ]], columns = list ("abc" ))
864
+ inserted = df_insert .to_sql (
865
+ "test_insert_conflict" ,
866
+ conn ,
867
+ index = False ,
868
+ if_exists = "append" ,
869
+ method = insert_on_conflict ,
870
+ )
871
+ result = sql .read_sql_table ("test_insert_conflict" , conn )
872
+ tm .assert_frame_equal (result , expected )
873
+ assert inserted == 0
874
+
875
+ # Cleanup
876
+ with sql .SQLDatabase (conn , need_transaction = True ) as pandasSQL :
877
+ pandasSQL .drop_table ("test_insert_conflict" )
878
+
879
+
880
+ @pytest .mark .db
881
+ @pytest .mark .parametrize ("conn" , mysql_connectable )
882
+ def test_insertion_method_on_conflict_update (conn , request ):
883
+ # GH 14553: Example in to_sql docstring
884
+ conn = request .getfixturevalue (conn )
885
+
886
+ from sqlalchemy .dialects .mysql import insert
887
+ from sqlalchemy .engine import Engine
888
+ from sqlalchemy .sql import text
889
+
890
+ def insert_on_conflict (table , conn , keys , data_iter ):
891
+ data = [dict (zip (keys , row )) for row in data_iter ]
892
+ stmt = insert (table .table ).values (data )
893
+ stmt = stmt .on_duplicate_key_update (b = stmt .inserted .b , c = stmt .inserted .c )
894
+ result = conn .execute (stmt )
895
+ return result .rowcount
896
+
897
+ create_sql = text (
898
+ """
899
+ CREATE TABLE test_insert_conflict (
900
+ a INT PRIMARY KEY,
901
+ b FLOAT,
902
+ c VARCHAR(10)
903
+ );
904
+ """
905
+ )
906
+ if isinstance (conn , Engine ):
907
+ with conn .connect () as con :
908
+ with con .begin ():
909
+ con .execute (create_sql )
910
+ else :
911
+ with conn .begin ():
912
+ conn .execute (create_sql )
913
+
914
+ df = DataFrame ([[1 , 2.1 , "a" ]], columns = list ("abc" ))
915
+ df .to_sql ("test_insert_conflict" , conn , if_exists = "append" , index = False )
916
+
917
+ expected = DataFrame ([[1 , 3.2 , "b" ]], columns = list ("abc" ))
918
+ inserted = expected .to_sql (
919
+ "test_insert_conflict" ,
920
+ conn ,
921
+ index = False ,
922
+ if_exists = "append" ,
923
+ method = insert_on_conflict ,
924
+ )
925
+ result = sql .read_sql_table ("test_insert_conflict" , conn )
926
+ tm .assert_frame_equal (result , expected )
927
+ assert inserted == 2
928
+
929
+ # Cleanup
930
+ with sql .SQLDatabase (conn , need_transaction = True ) as pandasSQL :
931
+ pandasSQL .drop_table ("test_insert_conflict" )
932
+
933
+
823
934
def test_execute_typeerror (sqlite_iris_engine ):
824
935
with pytest .raises (TypeError , match = "pandas.io.sql.execute requires a connection" ):
825
936
with tm .assert_produces_warning (
0 commit comments