ENH: add chunksize argument to read_sql (GH2908) #8330

Merged
12 changes: 12 additions & 0 deletions doc/source/io.rst
@@ -3411,6 +3411,18 @@ Of course, you can specify a more "complex" query.
pd.read_sql_query("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", engine)
The :func:`~pandas.read_sql_query` function supports a ``chunksize`` argument.
Specifying this will return an iterator through chunks of the query result:

.. ipython:: python

   df = pd.DataFrame(np.random.randn(20, 3), columns=list('abc'))
   df.to_sql('data_chunks', engine, index=False)

.. ipython:: python

   for chunk in pd.read_sql_query("SELECT * FROM data_chunks", engine, chunksize=5):
       print(chunk)
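Editorial aside: if the full result is ultimately needed as a single DataFrame, the chunks from the new iterator can simply be concatenated — a usage sketch building on the example above (``pd.concat`` is standard pandas; this snippet is not part of the diff):

# Reassemble the chunked query result into one DataFrame; assumes the
# engine and 'data_chunks' table created in the example above.
chunks = pd.read_sql_query("SELECT * FROM data_chunks", engine, chunksize=5)
full = pd.concat(chunks, ignore_index=True)
print(full.shape)  # (20, 3): the 20-row table written above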
You can also run a plain query without creating a dataframe with
:func:`~pandas.io.sql.execute`. This is useful for queries that don't return values,
3 changes: 2 additions & 1 deletion doc/source/v0.15.0.txt
@@ -801,7 +801,8 @@ Deprecations
Enhancements
~~~~~~~~~~~~

- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`)
- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`).
- Added support for a ``chunksize`` parameter to ``read_sql`` function. Specifying this argument will return an iterator through chunks of the query result (:issue:`2908`).
- Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`).
- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`).
For example:
192 changes: 142 additions & 50 deletions pandas/io/sql.py
@@ -32,7 +32,7 @@ class DatabaseError(IOError):


#------------------------------------------------------------------------------
# Helper functions
#--- Helper functions

_SQLALCHEMY_INSTALLED = None

@@ -115,6 +115,21 @@ def _parse_date_columns(data_frame, parse_dates):
return data_frame


def _wrap_result(data, columns, index_col=None, coerce_float=True,
parse_dates=None):
"""Wrap result set of query in a DataFrame """

frame = DataFrame.from_records(data, columns=columns,
coerce_float=coerce_float)

_parse_date_columns(frame, parse_dates)

if index_col is not None:
frame.set_index(index_col, inplace=True)

return frame
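Editorial aside: a quick illustration of what this helper produces — the rows and column names below are made up, and the snippet assumes the ``_wrap_result`` definition above with pandas installed:

# Sample records -> DataFrame with 'day' parsed to datetime and 'id'
# set as the index, mirroring the three steps of _wrap_result.
rows = [(1, '2014-09-01', 1.5), (2, '2014-09-02', 2.5)]
frame = _wrap_result(rows, columns=['id', 'day', 'val'],
                     index_col='id', parse_dates=['day'])
print(frame.dtypes)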


def execute(sql, con, cur=None, params=None):
"""
Execute the given SQL query using the provided connection object.
@@ -262,7 +277,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None):
#--- Read and write to DataFrames

def read_sql_table(table_name, con, schema=None, index_col=None,
coerce_float=True, parse_dates=None, columns=None):
coerce_float=True, parse_dates=None, columns=None,
chunksize=None):
"""Read SQL database table into a DataFrame.
Given a table name and an SQLAlchemy engine, returns a DataFrame.
@@ -293,6 +309,9 @@ def read_sql_table(table_name, con, schema=None, index_col=None,
such as SQLite
columns : list
List of column names to select from sql table
chunksize : int, default None
If specified, return an iterator where `chunksize` is the number of
rows to include in each chunk.
Returns
-------
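Editorial aside: a minimal usage sketch of the new argument (the in-memory SQLite engine and the 'data' table are illustrative, not from this PR):

from sqlalchemy import create_engine
import pandas as pd

engine = create_engine('sqlite:///:memory:')
pd.DataFrame({'a': range(100)}).to_sql('data', engine, index=False)

# Stream the table 30 rows at a time instead of loading it at once;
# aggregating per chunk keeps memory use bounded.
total = 0
for chunk in pd.read_sql_table('data', engine, chunksize=30):
    total += chunk['a'].sum()
print(total)  # 4950, same answer as reading the whole table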
@@ -318,7 +337,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None,
pandas_sql = SQLDatabase(con, meta=meta)
table = pandas_sql.read_table(
table_name, index_col=index_col, coerce_float=coerce_float,
parse_dates=parse_dates, columns=columns)
parse_dates=parse_dates, columns=columns, chunksize=chunksize)

if table is not None:
return table
@@ -327,7 +346,7 @@


def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
parse_dates=None):
parse_dates=None, chunksize=None):
"""Read SQL query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query
@@ -362,6 +381,9 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
to the keyword arguments of :func:`pandas.to_datetime`
Especially useful with databases without native Datetime support,
such as SQLite
chunksize : int, default None
If specified, return an iterator where `chunksize` is the number of
rows to include in each chunk.
Returns
-------
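Editorial aside: ``chunksize`` composes with ``params``; a sketch using a plain sqlite3 connection (the table and values are illustrative), which exercises the DB-API iterator added further below:

import sqlite3
import pandas as pd

con = sqlite3.connect(':memory:')
pd.DataFrame({'a': range(100)}).to_sql('data', con, index=False)

# Parameterized query streamed in chunks; '?' is the sqlite3 paramstyle.
for chunk in pd.read_sql_query("SELECT * FROM data WHERE a >= ?",
                               con, params=(50,), chunksize=25):
    print(len(chunk))  # 25 then 25: fifty matching rows in two chunks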
@@ -376,11 +398,11 @@
pandas_sql = pandasSQL_builder(con)
return pandas_sql.read_query(
sql, index_col=index_col, params=params, coerce_float=coerce_float,
parse_dates=parse_dates)
parse_dates=parse_dates, chunksize=chunksize)


def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
parse_dates=None, columns=None):
parse_dates=None, columns=None, chunksize=None):
"""
Read SQL query or database table into a DataFrame.
@@ -415,6 +437,9 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
columns : list
List of column names to select from sql table (only used when reading
a table).
chunksize : int, default None
If specified, return an iterator where `chunksize` is the
number of rows to include in each chunk.
Returns
-------
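Editorial aside: ``read_sql`` just dispatches on its first argument, so the new keyword behaves identically either way — a sketch assuming a SQLAlchemy engine with a 100-row 'data' table as in the earlier sketch:

# A table name delegates to read_sql_table, a SQL string to
# read_sql_query; chunksize has the same meaning for both.
for chunk in pd.read_sql('data', engine, chunksize=40):
    print(len(chunk))  # 40, 40, 20
for chunk in pd.read_sql('SELECT * FROM data', engine, chunksize=40):
    print(len(chunk))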
@@ -438,7 +463,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
if isinstance(pandas_sql, SQLiteDatabase):
return pandas_sql.read_query(
sql, index_col=index_col, params=params,
coerce_float=coerce_float, parse_dates=parse_dates)
coerce_float=coerce_float, parse_dates=parse_dates,
chunksize=chunksize)

try:
_is_table_name = pandas_sql.has_table(sql)
@@ -449,11 +475,12 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
pandas_sql.meta.reflect(only=[sql])
return pandas_sql.read_table(
sql, index_col=index_col, coerce_float=coerce_float,
parse_dates=parse_dates, columns=columns)
parse_dates=parse_dates, columns=columns, chunksize=chunksize)
else:
return pandas_sql.read_query(
sql, index_col=index_col, params=params,
coerce_float=coerce_float, parse_dates=parse_dates)
coerce_float=coerce_float, parse_dates=parse_dates,
chunksize=chunksize)


def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
@@ -684,7 +711,27 @@ def insert(self, chunksize=None):
chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
self._execute_insert(conn, keys, chunk_iter)

def read(self, coerce_float=True, parse_dates=None, columns=None):
def _query_iterator(self, result, chunksize, columns, coerce_float=True,
parse_dates=None):
"""Return generator through chunked result set"""

while True:
data = result.fetchmany(chunksize)
if not data:
break
else:
self.frame = DataFrame.from_records(
data, columns=columns, coerce_float=coerce_float)

self._harmonize_columns(parse_dates=parse_dates)

if self.index is not None:
self.frame.set_index(self.index, inplace=True)

yield self.frame

def read(self, coerce_float=True, parse_dates=None, columns=None,
chunksize=None):

if columns is not None and len(columns) > 0:
from sqlalchemy import select
@@ -696,18 +743,23 @@ def read(self, coerce_float=True, parse_dates=None, columns=None):
sql_select = self.table.select()

result = self.pd_sql.execute(sql_select)
data = result.fetchall()
column_names = result.keys()

self.frame = DataFrame.from_records(
data, columns=column_names, coerce_float=coerce_float)
if chunksize is not None:
return self._query_iterator(result, chunksize, column_names,
coerce_float=coerce_float,
parse_dates=parse_dates)
else:
data = result.fetchall()
self.frame = DataFrame.from_records(
data, columns=column_names, coerce_float=coerce_float)

self._harmonize_columns(parse_dates=parse_dates)
self._harmonize_columns(parse_dates=parse_dates)

if self.index is not None:
self.frame.set_index(self.index, inplace=True)
if self.index is not None:
self.frame.set_index(self.index, inplace=True)

return self.frame
return self.frame
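Editorial aside: the loop above is the standard DB-API ``fetchmany`` pattern; in isolation, a self-contained sketch with plain sqlite3 (table and data invented for illustration):

import sqlite3

def fetch_chunks(cursor, chunksize):
    # Same loop shape as _query_iterator: pull `chunksize` rows at a
    # time until the cursor is exhausted.
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows

con = sqlite3.connect(':memory:')
con.executescript("CREATE TABLE t (x); "
                  "INSERT INTO t VALUES (1), (2), (3), (4), (5);")
for rows in fetch_chunks(con.execute("SELECT x FROM t"), 2):
    print(rows)  # [(1,), (2,)], then [(3,), (4,)], then [(5,)]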

def _index_name(self, index, index_label):
# for writing: index=True to include index in sql table
@@ -898,8 +950,8 @@ class SQLDatabase(PandasSQL):
Parameters
----------
engine : SQLAlchemy engine
Engine to connect with the database. Using SQLAlchemy makes it possible to use any DB supported by that
library.
Engine to connect with the database. Using SQLAlchemy makes it
possible to use any DB supported by that library.
schema : string, default None
Name of SQL schema in database to write to (if database flavor
supports this). If None, use default schema (default).
@@ -926,25 +978,27 @@ def execute(self, *args, **kwargs):
return self.engine.execute(*args, **kwargs)

def read_table(self, table_name, index_col=None, coerce_float=True,
parse_dates=None, columns=None, schema=None):
parse_dates=None, columns=None, schema=None,
chunksize=None):
"""Read SQL database table into a DataFrame.
Parameters
----------
table_name : string
Name of SQL table in database
index_col : string, optional
Column to set as index
coerce_float : boolean, default True
Attempt to convert values to non-string, non-numeric objects (like
decimal.Decimal) to floating point. Can result in loss of Precision.
Attempt to convert values to non-string, non-numeric objects
(like decimal.Decimal) to floating point. This can result in
loss of precision.
parse_dates : list or dict
- List of column names to parse as dates
- Dict of ``{column_name: format string}`` where format string is
strftime compatible in case of parsing string times or is one of
(D, s, ns, ms, us) in case of parsing integer timestamps
- Dict of ``{column_name: arg dict}``, where the arg dict corresponds
to the keyword arguments of :func:`pandas.to_datetime`
- Dict of ``{column_name: arg}``, where the arg corresponds
to the keyword arguments of :func:`pandas.to_datetime`.
Especially useful with databases without native Datetime support,
such as SQLite
columns : list
@@ -953,6 +1007,9 @@ def read_table(self, table_name, index_col=None, coerce_float=True,
Name of SQL schema in database to query (if database flavor
supports this). If specified, this overwrites the default
schema of the SQLDatabase object.
chunksize : int, default None
If specified, return an iterator where `chunksize` is the number
of rows to include in each chunk.
Returns
-------
@@ -966,10 +1023,25 @@
"""
table = SQLTable(table_name, self, index=index_col, schema=schema)
return table.read(coerce_float=coerce_float,
parse_dates=parse_dates, columns=columns)

parse_dates=parse_dates, columns=columns,
chunksize=chunksize)

@staticmethod
def _query_iterator(result, chunksize, columns, index_col=None,
coerce_float=True, parse_dates=None):
"""Return generator through chunked result set"""

while True:
data = result.fetchmany(chunksize)
if not data:
break
else:
yield _wrap_result(data, columns, index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)

def read_query(self, sql, index_col=None, coerce_float=True,
parse_dates=None, params=None):
parse_dates=None, params=None, chunksize=None):
"""Read SQL query into a DataFrame.
Parameters
@@ -1006,30 +1078,31 @@ def read_query(self, sql, index_col=None, coerce_float=True,
read_sql_table : Read SQL database table into a DataFrame
read_sql
"""
"""
args = _convert_params(sql, params)

result = self.execute(*args)
data = result.fetchall()
columns = result.keys()

data_frame = DataFrame.from_records(
data, columns=columns, coerce_float=coerce_float)

_parse_date_columns(data_frame, parse_dates)

if index_col is not None:
data_frame.set_index(index_col, inplace=True)
if chunksize is not None:
return self._query_iterator(result, chunksize, columns,
index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)
else:
data = result.fetchall()
frame = _wrap_result(data, columns, index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)
return frame

return data_frame

read_sql = read_query

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None, schema=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.
Parameters
----------
frame : DataFrame
@@ -1308,23 +1381,42 @@ def execute(self, *args, **kwargs):
ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], exc))
raise_with_traceback(ex)

@staticmethod
def _query_iterator(cursor, chunksize, columns, index_col=None,
coerce_float=True, parse_dates=None):
Review thread:

Contributor: is there a reason this is a staticmethod? can u share this code with the other usage of this?

Member Author: the reason is that it didn't use self, so I made it explicitly static. Problem is that the version is different for the two classes, so that's why I put it in the class instead of just a general helper function like _wrap_result

Contributor: maybe make it a module level function then? (and _wrap_result as well); or use a TableIterator class and make these methods (see pytables.py; read_csv does this too)

Member Author: _wrap_result is already a module level function (if I understand correctly what you mean)

Problem with making it a module level function is that I would have to make two versions of the function (see the two _query_iterator methods now), one for each class that uses it (PandasSQLAlchemy and PandasSQLLegacy). So for that it seemed more logical to put it in the class itself.

On implementing an Iterator class, I had seen the implementation in pytables, but it seemed a lot more complex than what I needed here. The simple _query_iterator function seems to do all I need. Is there an advantage in making it a class?

Contributor: I think your class would be simpler (and u could inherit methods) and bundle _wrap_result - really just preference - up 2 u

Member Author: OK, I am going to leave it now as is, can always later look at implementing it as a class, but would like to get this in
"""Return generator through chunked result set"""

while True:
data = cursor.fetchmany(chunksize)
if not data:
cursor.close()
break
else:
yield _wrap_result(data, columns, index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)

def read_query(self, sql, index_col=None, coerce_float=True, params=None,
parse_dates=None):
parse_dates=None, chunksize=None):

args = _convert_params(sql, params)
cursor = self.execute(*args)
columns = [col_desc[0] for col_desc in cursor.description]
data = self._fetchall_as_list(cursor)
cursor.close()

data_frame = DataFrame.from_records(
data, columns=columns, coerce_float=coerce_float)
if chunksize is not None:
return self._query_iterator(cursor, chunksize, columns,
index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)
else:
data = self._fetchall_as_list(cursor)
cursor.close()

_parse_date_columns(data_frame, parse_dates)
frame = _wrap_result(data, columns, index_col=index_col,
coerce_float=coerce_float,
parse_dates=parse_dates)
return frame

if index_col is not None:
data_frame.set_index(index_col, inplace=True)
return data_frame

def _fetchall_as_list(self, cur):
result = cur.fetchall()
if not isinstance(result, list):
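Editorial aside: the review thread above floats a TableIterator class (as in pytables and read_csv) as an alternative to the two _query_iterator staticmethods. That design was not merged; a rough sketch of the idea might look like this (hypothetical, not pandas API):

from pandas.io.sql import _wrap_result  # private helper added in this PR

class TableIterator(object):
    """Hypothetical: wrap a result set and yield DataFrame chunks."""

    def __init__(self, result, columns, chunksize, index_col=None,
                 coerce_float=True, parse_dates=None):
        self.result = result  # any object exposing fetchmany()
        self.columns = columns
        self.chunksize = chunksize
        self.index_col = index_col
        self.coerce_float = coerce_float
        self.parse_dates = parse_dates

    def __iter__(self):
        # One class shared by the SQLAlchemy and DB-API backends; only
        # the `result` object they pass in differs.
        while True:
            data = self.result.fetchmany(self.chunksize)
            if not data:
                break
            yield _wrap_result(data, self.columns, index_col=self.index_col,
                               coerce_float=self.coerce_float,
                               parse_dates=self.parse_dates)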