Skip to content

Commit feb5d4c

Browse files
ENH: add chunksize argument to read_sql (GH2908)
1 parent 7c15827 commit feb5d4c

File tree

4 files changed

+193
-51
lines changed

4 files changed

+193
-51
lines changed

doc/source/io.rst

+12
Original file line numberDiff line numberDiff line change
@@ -3411,6 +3411,18 @@ Of course, you can specify a more "complex" query.
34113411
34123412
pd.read_sql_query("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", engine)
34133413
3414+
The func:`~pandas.read_sql_query` function supports a ``chunksize`` argument.
3415+
Specifying this will return an iterator through chunks of the query result:
3416+
3417+
.. ipython:: python
3418+
3419+
df = pd.DataFrame(np.random.randn(20, 3), columns=list('abc'))
3420+
df.to_sql('data_chunks', engine, index=False)
3421+
3422+
.. ipython:: python
3423+
3424+
for chunk in pd.read_sql_query("SELECT * FROM data_chunks", engine, chunksize):
3425+
print(chunk)
34143426
34153427
You can also run a plain query without creating a dataframe with
34163428
:func:`~pandas.io.sql.execute`. This is useful for queries that don't return values,

doc/source/v0.15.0.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,8 @@ Deprecations
801801
Enhancements
802802
~~~~~~~~~~~~
803803

804-
- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`)
804+
- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`).
805+
- Added support for a ``chunksize`` parameter to ``read_sql`` function. Specifying this argument will return an iterator through chunks of the query result (:issue:`2908`).
805806
- Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`).
806807
- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`).
807808
For example:

pandas/io/sql.py

+142-50
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class DatabaseError(IOError):
3232

3333

3434
#------------------------------------------------------------------------------
35-
# Helper functions
35+
#--- Helper functions
3636

3737
_SQLALCHEMY_INSTALLED = None
3838

@@ -115,6 +115,21 @@ def _parse_date_columns(data_frame, parse_dates):
115115
return data_frame
116116

117117

118+
def _wrap_result(data, columns, index_col=None, coerce_float=True,
119+
parse_dates=None):
120+
"""Wrap result set of query in a DataFrame """
121+
122+
frame = DataFrame.from_records(data, columns=columns,
123+
coerce_float=coerce_float)
124+
125+
_parse_date_columns(frame, parse_dates)
126+
127+
if index_col is not None:
128+
frame.set_index(index_col, inplace=True)
129+
130+
return frame
131+
132+
118133
def execute(sql, con, cur=None, params=None):
119134
"""
120135
Execute the given SQL query using the provided connection object.
@@ -262,7 +277,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None):
262277
#--- Read and write to DataFrames
263278

264279
def read_sql_table(table_name, con, schema=None, index_col=None,
265-
coerce_float=True, parse_dates=None, columns=None):
280+
coerce_float=True, parse_dates=None, columns=None,
281+
chunksize=None):
266282
"""Read SQL database table into a DataFrame.
267283
268284
Given a table name and an SQLAlchemy engine, returns a DataFrame.
@@ -293,6 +309,9 @@ def read_sql_table(table_name, con, schema=None, index_col=None,
293309
such as SQLite
294310
columns : list
295311
List of column names to select from sql table
312+
chunksize : int, default None
313+
If specified, return an iterator where `chunksize` is the number of
314+
rows to include in each chunk.
296315
297316
Returns
298317
-------
@@ -318,7 +337,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None,
318337
pandas_sql = SQLDatabase(con, meta=meta)
319338
table = pandas_sql.read_table(
320339
table_name, index_col=index_col, coerce_float=coerce_float,
321-
parse_dates=parse_dates, columns=columns)
340+
parse_dates=parse_dates, columns=columns, chunksize=chunksize)
322341

323342
if table is not None:
324343
return table
@@ -327,7 +346,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None,
327346

328347

329348
def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
330-
parse_dates=None):
349+
parse_dates=None, chunksize=None):
331350
"""Read SQL query into a DataFrame.
332351
333352
Returns a DataFrame corresponding to the result set of the query
@@ -362,6 +381,9 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
362381
to the keyword arguments of :func:`pandas.to_datetime`
363382
Especially useful with databases without native Datetime support,
364383
such as SQLite
384+
chunksize : int, default None
385+
If specified, return an iterator where `chunksize` is the number of
386+
rows to include in each chunk.
365387
366388
Returns
367389
-------
@@ -376,11 +398,11 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
376398
pandas_sql = pandasSQL_builder(con)
377399
return pandas_sql.read_query(
378400
sql, index_col=index_col, params=params, coerce_float=coerce_float,
379-
parse_dates=parse_dates)
401+
parse_dates=parse_dates, chunksize=chunksize)
380402

381403

382404
def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
383-
parse_dates=None, columns=None):
405+
parse_dates=None, columns=None, chunksize=None):
384406
"""
385407
Read SQL query or database table into a DataFrame.
386408
@@ -415,6 +437,9 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
415437
columns : list
416438
List of column names to select from sql table (only used when reading
417439
a table).
440+
chunksize : int, default None
441+
If specified, return an iterator where `chunksize` is the
442+
number of rows to include in each chunk.
418443
419444
Returns
420445
-------
@@ -438,7 +463,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
438463
if isinstance(pandas_sql, SQLiteDatabase):
439464
return pandas_sql.read_query(
440465
sql, index_col=index_col, params=params,
441-
coerce_float=coerce_float, parse_dates=parse_dates)
466+
coerce_float=coerce_float, parse_dates=parse_dates,
467+
chunksize=chunksize)
442468

443469
try:
444470
_is_table_name = pandas_sql.has_table(sql)
@@ -449,11 +475,12 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
449475
pandas_sql.meta.reflect(only=[sql])
450476
return pandas_sql.read_table(
451477
sql, index_col=index_col, coerce_float=coerce_float,
452-
parse_dates=parse_dates, columns=columns)
478+
parse_dates=parse_dates, columns=columns, chunksize=chunksize)
453479
else:
454480
return pandas_sql.read_query(
455481
sql, index_col=index_col, params=params,
456-
coerce_float=coerce_float, parse_dates=parse_dates)
482+
coerce_float=coerce_float, parse_dates=parse_dates,
483+
chunksize=chunksize)
457484

458485

459486
def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
@@ -684,7 +711,27 @@ def insert(self, chunksize=None):
684711
chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
685712
self._execute_insert(conn, keys, chunk_iter)
686713

687-
def read(self, coerce_float=True, parse_dates=None, columns=None):
714+
def _query_iterator(self, result, chunksize, columns, coerce_float=True,
715+
parse_dates=None):
716+
"""Return generator through chunked result set"""
717+
718+
while True:
719+
data = result.fetchmany(chunksize)
720+
if not data:
721+
break
722+
else:
723+
self.frame = DataFrame.from_records(
724+
data, columns=columns, coerce_float=coerce_float)
725+
726+
self._harmonize_columns(parse_dates=parse_dates)
727+
728+
if self.index is not None:
729+
self.frame.set_index(self.index, inplace=True)
730+
731+
yield self.frame
732+
733+
def read(self, coerce_float=True, parse_dates=None, columns=None,
734+
chunksize=None):
688735

689736
if columns is not None and len(columns) > 0:
690737
from sqlalchemy import select
@@ -696,18 +743,23 @@ def read(self, coerce_float=True, parse_dates=None, columns=None):
696743
sql_select = self.table.select()
697744

698745
result = self.pd_sql.execute(sql_select)
699-
data = result.fetchall()
700746
column_names = result.keys()
701747

702-
self.frame = DataFrame.from_records(
703-
data, columns=column_names, coerce_float=coerce_float)
748+
if chunksize is not None:
749+
return self._query_iterator(result, chunksize, column_names,
750+
coerce_float=coerce_float,
751+
parse_dates=parse_dates)
752+
else:
753+
data = result.fetchall()
754+
self.frame = DataFrame.from_records(
755+
data, columns=column_names, coerce_float=coerce_float)
704756

705-
self._harmonize_columns(parse_dates=parse_dates)
757+
self._harmonize_columns(parse_dates=parse_dates)
706758

707-
if self.index is not None:
708-
self.frame.set_index(self.index, inplace=True)
759+
if self.index is not None:
760+
self.frame.set_index(self.index, inplace=True)
709761

710-
return self.frame
762+
return self.frame
711763

712764
def _index_name(self, index, index_label):
713765
# for writing: index=True to include index in sql table
@@ -898,8 +950,8 @@ class SQLDatabase(PandasSQL):
898950
Parameters
899951
----------
900952
engine : SQLAlchemy engine
901-
Engine to connect with the database. Using SQLAlchemy makes it possible to use any DB supported by that
902-
library.
953+
Engine to connect with the database. Using SQLAlchemy makes it
954+
possible to use any DB supported by that library.
903955
schema : string, default None
904956
Name of SQL schema in database to write to (if database flavor
905957
supports this). If None, use default schema (default).
@@ -926,25 +978,27 @@ def execute(self, *args, **kwargs):
926978
return self.engine.execute(*args, **kwargs)
927979

928980
def read_table(self, table_name, index_col=None, coerce_float=True,
929-
parse_dates=None, columns=None, schema=None):
981+
parse_dates=None, columns=None, schema=None,
982+
chunksize=None):
930983
"""Read SQL database table into a DataFrame.
931-
984+
932985
Parameters
933986
----------
934987
table_name : string
935988
Name of SQL table in database
936989
index_col : string, optional
937990
Column to set as index
938991
coerce_float : boolean, default True
939-
Attempt to convert values to non-string, non-numeric objects (like
940-
decimal.Decimal) to floating point. Can result in loss of Precision.
992+
Attempt to convert values to non-string, non-numeric objects
993+
(like decimal.Decimal) to floating point. This can result in
994+
loss of precision.
941995
parse_dates : list or dict
942996
- List of column names to parse as dates
943997
- Dict of ``{column_name: format string}`` where format string is
944998
strftime compatible in case of parsing string times or is one of
945999
(D, s, ns, ms, us) in case of parsing integer timestamps
946-
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds
947-
to the keyword arguments of :func:`pandas.to_datetime`
1000+
- Dict of ``{column_name: arg}``, where the arg corresponds
1001+
to the keyword arguments of :func:`pandas.to_datetime`.
9481002
Especially useful with databases without native Datetime support,
9491003
such as SQLite
9501004
columns : list
@@ -953,6 +1007,9 @@ def read_table(self, table_name, index_col=None, coerce_float=True,
9531007
Name of SQL schema in database to query (if database flavor
9541008
supports this). If specified, this overwrites the default
9551009
schema of the SQLDatabase object.
1010+
chunksize : int, default None
1011+
If specified, return an iterator where `chunksize` is the number
1012+
of rows to include in each chunk.
9561013
9571014
Returns
9581015
-------
@@ -966,10 +1023,25 @@ def read_table(self, table_name, index_col=None, coerce_float=True,
9661023
"""
9671024
table = SQLTable(table_name, self, index=index_col, schema=schema)
9681025
return table.read(coerce_float=coerce_float,
969-
parse_dates=parse_dates, columns=columns)
970-
1026+
parse_dates=parse_dates, columns=columns,
1027+
chunksize=chunksize)
1028+
1029+
@staticmethod
1030+
def _query_iterator(result, chunksize, columns, index_col=None,
1031+
coerce_float=True, parse_dates=None):
1032+
"""Return generator through chunked result set"""
1033+
1034+
while True:
1035+
data = result.fetchmany(chunksize)
1036+
if not data:
1037+
break
1038+
else:
1039+
yield _wrap_result(data, columns, index_col=index_col,
1040+
coerce_float=coerce_float,
1041+
parse_dates=parse_dates)
1042+
9711043
def read_query(self, sql, index_col=None, coerce_float=True,
972-
parse_dates=None, params=None):
1044+
parse_dates=None, params=None, chunksize=None):
9731045
"""Read SQL query into a DataFrame.
9741046
9751047
Parameters
@@ -1006,30 +1078,31 @@ def read_query(self, sql, index_col=None, coerce_float=True,
10061078
read_sql_table : Read SQL database table into a DataFrame
10071079
read_sql
10081080
1009-
"""
1081+
"""
10101082
args = _convert_params(sql, params)
10111083

10121084
result = self.execute(*args)
1013-
data = result.fetchall()
10141085
columns = result.keys()
10151086

1016-
data_frame = DataFrame.from_records(
1017-
data, columns=columns, coerce_float=coerce_float)
1018-
1019-
_parse_date_columns(data_frame, parse_dates)
1020-
1021-
if index_col is not None:
1022-
data_frame.set_index(index_col, inplace=True)
1087+
if chunksize is not None:
1088+
return self._query_iterator(result, chunksize, columns,
1089+
index_col=index_col,
1090+
coerce_float=coerce_float,
1091+
parse_dates=parse_dates)
1092+
else:
1093+
data = result.fetchall()
1094+
frame = _wrap_result(data, columns, index_col=index_col,
1095+
coerce_float=coerce_float,
1096+
parse_dates=parse_dates)
1097+
return frame
10231098

1024-
return data_frame
1025-
10261099
read_sql = read_query
10271100

10281101
def to_sql(self, frame, name, if_exists='fail', index=True,
10291102
index_label=None, schema=None, chunksize=None):
10301103
"""
10311104
Write records stored in a DataFrame to a SQL database.
1032-
1105+
10331106
Parameters
10341107
----------
10351108
frame : DataFrame
@@ -1308,23 +1381,42 @@ def execute(self, *args, **kwargs):
13081381
ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], exc))
13091382
raise_with_traceback(ex)
13101383

1384+
@staticmethod
1385+
def _query_iterator(cursor, chunksize, columns, index_col=None,
1386+
coerce_float=True, parse_dates=None):
1387+
"""Return generator through chunked result set"""
1388+
1389+
while True:
1390+
data = cursor.fetchmany(chunksize)
1391+
if not data:
1392+
cursor.close()
1393+
break
1394+
else:
1395+
yield _wrap_result(data, columns, index_col=index_col,
1396+
coerce_float=coerce_float,
1397+
parse_dates=parse_dates)
1398+
13111399
def read_query(self, sql, index_col=None, coerce_float=True, params=None,
1312-
parse_dates=None):
1400+
parse_dates=None, chunksize=None):
1401+
13131402
args = _convert_params(sql, params)
13141403
cursor = self.execute(*args)
13151404
columns = [col_desc[0] for col_desc in cursor.description]
1316-
data = self._fetchall_as_list(cursor)
1317-
cursor.close()
13181405

1319-
data_frame = DataFrame.from_records(
1320-
data, columns=columns, coerce_float=coerce_float)
1406+
if chunksize is not None:
1407+
return self._query_iterator(cursor, chunksize, columns,
1408+
index_col=index_col,
1409+
coerce_float=coerce_float,
1410+
parse_dates=parse_dates)
1411+
else:
1412+
data = self._fetchall_as_list(cursor)
1413+
cursor.close()
13211414

1322-
_parse_date_columns(data_frame, parse_dates)
1415+
frame = _wrap_result(data, columns, index_col=index_col,
1416+
coerce_float=coerce_float,
1417+
parse_dates=parse_dates)
1418+
return frame
13231419

1324-
if index_col is not None:
1325-
data_frame.set_index(index_col, inplace=True)
1326-
return data_frame
1327-
13281420
def _fetchall_as_list(self, cur):
13291421
result = cur.fetchall()
13301422
if not isinstance(result, list):

0 commit comments

Comments
 (0)