Skip to content

Commit b2b6417

Browse files
schettino72, Pingviinituutti
authored and committed
ENH: to_sql() add parameter "method" to control insertions method (pandas-dev#8… (pandas-dev#21401)
* ENH: to_sql() add parameter "method" to control insertions method (pandas-dev#8953) * ENH: to_sql() add parameter "method". Fix docstrings (pandas-dev#8953) * ENH: to_sql() add parameter "method". Improve docs based on reviews (pandas-dev#8953) * ENH: to_sql() add parameter "method". Fix unit-test (pandas-dev#8953) * doc clean-up * additional doc clean-up * use dict(zip()) directly * clean up merge * default --> None * Remove stray default * Remove method kwarg * change default to None * test copy insert snippit * print debug * index=False * Add reference to documentation
1 parent 83b902c commit b2b6417

File tree

5 files changed

+199
-18
lines changed

5 files changed

+199
-18
lines changed

doc/source/io.rst

+48
Original file line numberDiff line numberDiff line change
@@ -4989,6 +4989,54 @@ with respect to the timezone.
49894989
timezone aware or naive. When reading ``TIMESTAMP WITH TIME ZONE`` types, pandas
49904990
will convert the data to UTC.
49914991

4992+
.. _io.sql.method:
4993+
4994+
Insertion Method
4995+
++++++++++++++++
4996+
4997+
.. versionadded:: 0.24.0
4998+
4999+
The parameter ``method`` controls the SQL insertion clause used.
5000+
Possible values are:
5001+
5002+
- ``None``: Uses standard SQL ``INSERT`` clause (one per row).
5003+
- ``'multi'``: Pass multiple values in a single ``INSERT`` clause.
5004+
It uses a *special* SQL syntax not supported by all backends.
5005+
This usually provides better performance for analytic databases
5006+
like *Presto* and *Redshift*, but has worse performance for
5007+
traditional SQL backends if the table contains many columns.
5008+
For more information check the SQLAlchemy `documentation
5009+
<http://docs.sqlalchemy.org/en/latest/core/dml.html#sqlalchemy.sql.expression.Insert.values.params.*args>`__.
5010+
- callable with signature ``(pd_table, conn, keys, data_iter)``:
5011+
This can be used to implement a more performant insertion method based on
5012+
specific backend dialect features.
5013+
5014+
Example of a callable using PostgreSQL `COPY clause
5015+
<https://www.postgresql.org/docs/current/static/sql-copy.html>`__::
5016+
5017+
# Alternative to_sql() *method* for DBs that support COPY FROM
5018+
import csv
5019+
from io import StringIO
5020+
5021+
def psql_insert_copy(table, conn, keys, data_iter):
5022+
# gets a DBAPI connection that can provide a cursor
5023+
dbapi_conn = conn.connection
5024+
with dbapi_conn.cursor() as cur:
5025+
s_buf = StringIO()
5026+
writer = csv.writer(s_buf)
5027+
writer.writerows(data_iter)
5028+
s_buf.seek(0)
5029+
5030+
columns = ', '.join('"{}"'.format(k) for k in keys)
5031+
if table.schema:
5032+
table_name = '{}.{}'.format(table.schema, table.name)
5033+
else:
5034+
table_name = table.name
5035+
5036+
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
5037+
table_name, columns)
5038+
cur.copy_expert(sql=sql, file=s_buf)
5039+
49925040
Reading Tables
49935041
''''''''''''''
49945042

doc/source/whatsnew/v0.24.0.rst

+1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ Other Enhancements
377377
- :meth:`DataFrame.between_time` and :meth:`DataFrame.at_time` have gained the ``axis`` parameter (:issue:`8839`)
378378
- The ``scatter_matrix``, ``andrews_curves``, ``parallel_coordinates``, ``lag_plot``, ``autocorrelation_plot``, ``bootstrap_plot``, and ``radviz`` plots from the ``pandas.plotting`` module are now accessible from calling :meth:`DataFrame.plot` (:issue:`11978`)
379379
- :class:`IntervalIndex` has gained the :attr:`~IntervalIndex.is_overlapping` attribute to indicate if the ``IntervalIndex`` contains any overlapping intervals (:issue:`23309`)
380+
- :func:`pandas.DataFrame.to_sql` has gained the ``method`` argument to control the SQL insertion clause. See the :ref:`insertion method <io.sql.method>` section in the documentation. (:issue:`8953`)
380381

381382
.. _whatsnew_0240.api_breaking:
382383

pandas/core/generic.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -2386,7 +2386,7 @@ def to_msgpack(self, path_or_buf=None, encoding='utf-8', **kwargs):
23862386
**kwargs)
23872387

23882388
def to_sql(self, name, con, schema=None, if_exists='fail', index=True,
2389-
index_label=None, chunksize=None, dtype=None):
2389+
index_label=None, chunksize=None, dtype=None, method=None):
23902390
"""
23912391
Write records stored in a DataFrame to a SQL database.
23922392
@@ -2424,6 +2424,17 @@ def to_sql(self, name, con, schema=None, if_exists='fail', index=True,
24242424
Specifying the datatype for columns. The keys should be the column
24252425
names and the values should be the SQLAlchemy types or strings for
24262426
the sqlite3 legacy mode.
2427+
method : {None, 'multi', callable}, default None
2428+
Controls the SQL insertion clause used:
2429+
2430+
* None : Uses standard SQL ``INSERT`` clause (one per row).
2431+
* 'multi': Pass multiple values in a single ``INSERT`` clause.
2432+
* callable with signature ``(pd_table, conn, keys, data_iter)``.
2433+
2434+
Details and a sample callable implementation can be found in the
2435+
section :ref:`insert method <io.sql.method>`.
2436+
2437+
.. versionadded:: 0.24.0
24272438
24282439
Raises
24292440
------
@@ -2505,7 +2516,7 @@ def to_sql(self, name, con, schema=None, if_exists='fail', index=True,
25052516
from pandas.io import sql
25062517
sql.to_sql(self, name, con, schema=schema, if_exists=if_exists,
25072518
index=index, index_label=index_label, chunksize=chunksize,
2508-
dtype=dtype)
2519+
dtype=dtype, method=method)
25092520

25102521
def to_pickle(self, path, compression='infer',
25112522
protocol=pkl.HIGHEST_PROTOCOL):

pandas/io/sql.py

+75-13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from contextlib import contextmanager
1010
from datetime import date, datetime, time
11+
from functools import partial
1112
import re
1213
import warnings
1314

@@ -395,7 +396,7 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
395396

396397

397398
def to_sql(frame, name, con, schema=None, if_exists='fail', index=True,
398-
index_label=None, chunksize=None, dtype=None):
399+
index_label=None, chunksize=None, dtype=None, method=None):
399400
"""
400401
Write records stored in a DataFrame to a SQL database.
401402
@@ -429,6 +430,17 @@ def to_sql(frame, name, con, schema=None, if_exists='fail', index=True,
429430
Optional specifying the datatype for columns. The SQL type should
430431
be a SQLAlchemy type, or a string for sqlite3 fallback connection.
431432
If all columns are of the same type, one single value can be used.
433+
method : {None, 'multi', callable}, default None
434+
Controls the SQL insertion clause used:
435+
436+
- None : Uses standard SQL ``INSERT`` clause (one per row).
437+
- 'multi': Pass multiple values in a single ``INSERT`` clause.
438+
- callable with signature ``(pd_table, conn, keys, data_iter)``.
439+
440+
Details and a sample callable implementation can be found in the
441+
section :ref:`insert method <io.sql.method>`.
442+
443+
.. versionadded:: 0.24.0
432444
"""
433445
if if_exists not in ('fail', 'replace', 'append'):
434446
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
@@ -443,7 +455,7 @@ def to_sql(frame, name, con, schema=None, if_exists='fail', index=True,
443455

444456
pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
445457
index_label=index_label, schema=schema,
446-
chunksize=chunksize, dtype=dtype)
458+
chunksize=chunksize, dtype=dtype, method=method)
447459

448460

449461
def has_table(table_name, con, schema=None):
@@ -568,8 +580,29 @@ def create(self):
568580
else:
569581
self._execute_create()
570582

571-
def insert_statement(self):
572-
return self.table.insert()
583+
def _execute_insert(self, conn, keys, data_iter):
584+
"""Execute SQL statement inserting data
585+
586+
Parameters
587+
----------
588+
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
589+
keys : list of str
590+
Column names
591+
data_iter : generator of list
592+
Each item contains a list of values to be inserted
593+
"""
594+
data = [dict(zip(keys, row)) for row in data_iter]
595+
conn.execute(self.table.insert(), data)
596+
597+
def _execute_insert_multi(self, conn, keys, data_iter):
598+
"""Alternative to _execute_insert for DBs that support multi-value INSERT.
599+
600+
Note: multi-value insert is usually faster for analytics DBs
601+
and tables containing a few columns
602+
but performance degrades quickly with increase of columns.
603+
"""
604+
data = [dict(zip(keys, row)) for row in data_iter]
605+
conn.execute(self.table.insert(data))
573606

574607
def insert_data(self):
575608
if self.index is not None:
@@ -612,11 +645,18 @@ def insert_data(self):
612645

613646
return column_names, data_list
614647

615-
def _execute_insert(self, conn, keys, data_iter):
616-
data = [dict(zip(keys, row)) for row in data_iter]
617-
conn.execute(self.insert_statement(), data)
648+
def insert(self, chunksize=None, method=None):
649+
650+
# set insert method
651+
if method is None:
652+
exec_insert = self._execute_insert
653+
elif method == 'multi':
654+
exec_insert = self._execute_insert_multi
655+
elif callable(method):
656+
exec_insert = partial(method, self)
657+
else:
658+
raise ValueError('Invalid parameter `method`: {}'.format(method))
618659

619-
def insert(self, chunksize=None):
620660
keys, data_list = self.insert_data()
621661

622662
nrows = len(self.frame)
@@ -639,7 +679,7 @@ def insert(self, chunksize=None):
639679
break
640680

641681
chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
642-
self._execute_insert(conn, keys, chunk_iter)
682+
exec_insert(conn, keys, chunk_iter)
643683

644684
def _query_iterator(self, result, chunksize, columns, coerce_float=True,
645685
parse_dates=None):
@@ -1085,7 +1125,8 @@ def read_query(self, sql, index_col=None, coerce_float=True,
10851125
read_sql = read_query
10861126

10871127
def to_sql(self, frame, name, if_exists='fail', index=True,
1088-
index_label=None, schema=None, chunksize=None, dtype=None):
1128+
index_label=None, schema=None, chunksize=None, dtype=None,
1129+
method=None):
10891130
"""
10901131
Write records stored in a DataFrame to a SQL database.
10911132
@@ -1115,7 +1156,17 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
11151156
Optional specifying the datatype for columns. The SQL type should
11161157
be a SQLAlchemy type. If all columns are of the same type, one
11171158
single value can be used.
1159+
method : {None, 'multi', callable}, default None
1160+
Controls the SQL insertion clause used:
1161+
1162+
* None : Uses standard SQL ``INSERT`` clause (one per row).
1163+
* 'multi': Pass multiple values in a single ``INSERT`` clause.
1164+
* callable with signature ``(pd_table, conn, keys, data_iter)``.
1165+
1166+
Details and a sample callable implementation can be found in the
1167+
section :ref:`insert method <io.sql.method>`.
11181168
1169+
.. versionadded:: 0.24.0
11191170
"""
11201171
if dtype and not is_dict_like(dtype):
11211172
dtype = {col_name: dtype for col_name in frame}
@@ -1131,7 +1182,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
11311182
if_exists=if_exists, index_label=index_label,
11321183
schema=schema, dtype=dtype)
11331184
table.create()
1134-
table.insert(chunksize)
1185+
table.insert(chunksize, method=method)
11351186
if (not name.isdigit() and not name.islower()):
11361187
# check for potentially case sensitivity issues (GH7815)
11371188
# Only check when name is not a number and name is not lower case
@@ -1442,7 +1493,8 @@ def _fetchall_as_list(self, cur):
14421493
return result
14431494

14441495
def to_sql(self, frame, name, if_exists='fail', index=True,
1445-
index_label=None, schema=None, chunksize=None, dtype=None):
1496+
index_label=None, schema=None, chunksize=None, dtype=None,
1497+
method=None):
14461498
"""
14471499
Write records stored in a DataFrame to a SQL database.
14481500
@@ -1471,7 +1523,17 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
14711523
Optional specifying the datatype for columns. The SQL type should
14721524
be a string. If all columns are of the same type, one single value
14731525
can be used.
1526+
method : {None, 'multi', callable}, default None
1527+
Controls the SQL insertion clause used:
1528+
1529+
* None : Uses standard SQL ``INSERT`` clause (one per row).
1530+
* 'multi': Pass multiple values in a single ``INSERT`` clause.
1531+
* callable with signature ``(pd_table, conn, keys, data_iter)``.
1532+
1533+
Details and a sample callable implementation can be found in the
1534+
section :ref:`insert method <io.sql.method>`.
14741535
1536+
.. versionadded:: 0.24.0
14751537
"""
14761538
if dtype and not is_dict_like(dtype):
14771539
dtype = {col_name: dtype for col_name in frame}
@@ -1486,7 +1548,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
14861548
if_exists=if_exists, index_label=index_label,
14871549
dtype=dtype)
14881550
table.create()
1489-
table.insert(chunksize)
1551+
table.insert(chunksize, method)
14901552

14911553
def has_table(self, name, schema=None):
14921554
# TODO(wesm): unused?

pandas/tests/io/test_sql.py

+62-3
Original file line numberDiff line numberDiff line change
@@ -375,12 +375,16 @@ def _read_sql_iris_named_parameter(self):
375375
iris_frame = self.pandasSQL.read_query(query, params=params)
376376
self._check_iris_loaded_frame(iris_frame)
377377

378-
def _to_sql(self):
378+
def _to_sql(self, method=None):
379379
self.drop_table('test_frame1')
380380

381-
self.pandasSQL.to_sql(self.test_frame1, 'test_frame1')
381+
self.pandasSQL.to_sql(self.test_frame1, 'test_frame1', method=method)
382382
assert self.pandasSQL.has_table('test_frame1')
383383

384+
num_entries = len(self.test_frame1)
385+
num_rows = self._count_rows('test_frame1')
386+
assert num_rows == num_entries
387+
384388
# Nuke table
385389
self.drop_table('test_frame1')
386390

@@ -434,6 +438,25 @@ def _to_sql_append(self):
434438
assert num_rows == num_entries
435439
self.drop_table('test_frame1')
436440

441+
def _to_sql_method_callable(self):
442+
check = [] # used to double check function below is really being used
443+
444+
def sample(pd_table, conn, keys, data_iter):
445+
check.append(1)
446+
data = [dict(zip(keys, row)) for row in data_iter]
447+
conn.execute(pd_table.table.insert(), data)
448+
self.drop_table('test_frame1')
449+
450+
self.pandasSQL.to_sql(self.test_frame1, 'test_frame1', method=sample)
451+
assert self.pandasSQL.has_table('test_frame1')
452+
453+
assert check == [1]
454+
num_entries = len(self.test_frame1)
455+
num_rows = self._count_rows('test_frame1')
456+
assert num_rows == num_entries
457+
# Nuke table
458+
self.drop_table('test_frame1')
459+
437460
def _roundtrip(self):
438461
self.drop_table('test_frame_roundtrip')
439462
self.pandasSQL.to_sql(self.test_frame1, 'test_frame_roundtrip')
@@ -1193,7 +1216,7 @@ def setup_connect(self):
11931216
pytest.skip(
11941217
"Can't connect to {0} server".format(self.flavor))
11951218

1196-
def test_aread_sql(self):
1219+
def test_read_sql(self):
11971220
self._read_sql_iris()
11981221

11991222
def test_read_sql_parameter(self):
@@ -1217,6 +1240,12 @@ def test_to_sql_replace(self):
12171240
def test_to_sql_append(self):
12181241
self._to_sql_append()
12191242

1243+
def test_to_sql_method_multi(self):
1244+
self._to_sql(method='multi')
1245+
1246+
def test_to_sql_method_callable(self):
1247+
self._to_sql_method_callable()
1248+
12201249
def test_create_table(self):
12211250
temp_conn = self.connect()
12221251
temp_frame = DataFrame(
@@ -1930,6 +1959,36 @@ def test_schema_support(self):
19301959
res2 = pdsql.read_table('test_schema_other2')
19311960
tm.assert_frame_equal(res1, res2)
19321961

1962+
def test_copy_from_callable_insertion_method(self):
1963+
# GH 8953
1964+
# Example in io.rst found under _io.sql.method
1965+
# not available in sqlite, mysql
1966+
def psql_insert_copy(table, conn, keys, data_iter):
1967+
# gets a DBAPI connection that can provide a cursor
1968+
dbapi_conn = conn.connection
1969+
with dbapi_conn.cursor() as cur:
1970+
s_buf = compat.StringIO()
1971+
writer = csv.writer(s_buf)
1972+
writer.writerows(data_iter)
1973+
s_buf.seek(0)
1974+
1975+
columns = ', '.join('"{}"'.format(k) for k in keys)
1976+
if table.schema:
1977+
table_name = '{}.{}'.format(table.schema, table.name)
1978+
else:
1979+
table_name = table.name
1980+
1981+
sql_query = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
1982+
table_name, columns)
1983+
cur.copy_expert(sql=sql_query, file=s_buf)
1984+
1985+
expected = DataFrame({'col1': [1, 2], 'col2': [0.1, 0.2],
1986+
'col3': ['a', 'n']})
1987+
expected.to_sql('test_copy_insert', self.conn, index=False,
1988+
method=psql_insert_copy)
1989+
result = sql.read_sql_table('test_copy_insert', self.conn)
1990+
tm.assert_frame_equal(result, expected)
1991+
19331992

19341993
@pytest.mark.single
19351994
class TestMySQLAlchemy(_TestMySQLAlchemy, _TestSQLAlchemy):

0 commit comments

Comments
 (0)