diff --git a/doc/source/io.rst b/doc/source/io.rst index 1cd1f9f5bf10f..5490e666904f9 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -3411,6 +3411,18 @@ Of course, you can specify a more "complex" query. pd.read_sql_query("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", engine) +The :func:`~pandas.read_sql_query` function supports a ``chunksize`` argument. +Specifying this will return an iterator through chunks of the query result: + +.. ipython:: python + + df = pd.DataFrame(np.random.randn(20, 3), columns=list('abc')) + df.to_sql('data_chunks', engine, index=False) + +.. ipython:: python + + for chunk in pd.read_sql_query("SELECT * FROM data_chunks", engine, chunksize=5): + print(chunk) You can also run a plain query without creating a dataframe with :func:`~pandas.io.sql.execute`. This is useful for queries that don't return values, diff --git a/doc/source/v0.15.0.txt b/doc/source/v0.15.0.txt index cec3148a1f9fa..3cb7b7d5e8b69 100644 --- a/doc/source/v0.15.0.txt +++ b/doc/source/v0.15.0.txt @@ -801,7 +801,8 @@ Deprecations Enhancements ~~~~~~~~~~~~ -- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`) +- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`). +- Added support for a ``chunksize`` parameter to ``read_sql`` function. Specifying this argument will return an iterator through chunks of the query result (:issue:`2908`). - Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`). - Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`). 
For example: diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 53b664458527a..09acfcaee976b 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -32,7 +32,7 @@ class DatabaseError(IOError): #------------------------------------------------------------------------------ -# Helper functions +#--- Helper functions _SQLALCHEMY_INSTALLED = None @@ -115,6 +115,21 @@ def _parse_date_columns(data_frame, parse_dates): return data_frame +def _wrap_result(data, columns, index_col=None, coerce_float=True, + parse_dates=None): + """Wrap result set of query in a DataFrame """ + + frame = DataFrame.from_records(data, columns=columns, + coerce_float=coerce_float) + + _parse_date_columns(frame, parse_dates) + + if index_col is not None: + frame.set_index(index_col, inplace=True) + + return frame + + def execute(sql, con, cur=None, params=None): """ Execute the given SQL query using the provided connection object. @@ -262,7 +277,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None): #--- Read and write to DataFrames def read_sql_table(table_name, con, schema=None, index_col=None, - coerce_float=True, parse_dates=None, columns=None): + coerce_float=True, parse_dates=None, columns=None, + chunksize=None): """Read SQL database table into a DataFrame. Given a table name and an SQLAlchemy engine, returns a DataFrame. @@ -293,6 +309,9 @@ def read_sql_table(table_name, con, schema=None, index_col=None, such as SQLite columns : list List of column names to select from sql table + chunksize : int, default None + If specified, return an iterator where `chunksize` is the number of + rows to include in each chunk. 
Returns ------- @@ -318,7 +337,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None, pandas_sql = SQLDatabase(con, meta=meta) table = pandas_sql.read_table( table_name, index_col=index_col, coerce_float=coerce_float, - parse_dates=parse_dates, columns=columns) + parse_dates=parse_dates, columns=columns, chunksize=chunksize) if table is not None: return table @@ -327,7 +346,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None, def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, - parse_dates=None): + parse_dates=None, chunksize=None): """Read SQL query into a DataFrame. Returns a DataFrame corresponding to the result set of the query @@ -362,6 +381,9 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, to the keyword arguments of :func:`pandas.to_datetime` Especially useful with databases without native Datetime support, such as SQLite + chunksize : int, default None + If specified, return an iterator where `chunksize` is the number of + rows to include in each chunk. Returns ------- @@ -376,11 +398,11 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, pandas_sql = pandasSQL_builder(con) return pandas_sql.read_query( sql, index_col=index_col, params=params, coerce_float=coerce_float, - parse_dates=parse_dates) + parse_dates=parse_dates, chunksize=chunksize) def read_sql(sql, con, index_col=None, coerce_float=True, params=None, - parse_dates=None, columns=None): + parse_dates=None, columns=None, chunksize=None): """ Read SQL query or database table into a DataFrame. @@ -415,6 +437,9 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, columns : list List of column names to select from sql table (only used when reading a table). + chunksize : int, default None + If specified, return an iterator where `chunksize` is the + number of rows to include in each chunk. 
Returns ------- @@ -438,7 +463,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, if isinstance(pandas_sql, SQLiteDatabase): return pandas_sql.read_query( sql, index_col=index_col, params=params, - coerce_float=coerce_float, parse_dates=parse_dates) + coerce_float=coerce_float, parse_dates=parse_dates, + chunksize=chunksize) try: _is_table_name = pandas_sql.has_table(sql) @@ -449,11 +475,12 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, pandas_sql.meta.reflect(only=[sql]) return pandas_sql.read_table( sql, index_col=index_col, coerce_float=coerce_float, - parse_dates=parse_dates, columns=columns) + parse_dates=parse_dates, columns=columns, chunksize=chunksize) else: return pandas_sql.read_query( sql, index_col=index_col, params=params, - coerce_float=coerce_float, parse_dates=parse_dates) + coerce_float=coerce_float, parse_dates=parse_dates, + chunksize=chunksize) def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', @@ -684,7 +711,27 @@ def insert(self, chunksize=None): chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list]) self._execute_insert(conn, keys, chunk_iter) - def read(self, coerce_float=True, parse_dates=None, columns=None): + def _query_iterator(self, result, chunksize, columns, coerce_float=True, + parse_dates=None): + """Return generator through chunked result set""" + + while True: + data = result.fetchmany(chunksize) + if not data: + break + else: + self.frame = DataFrame.from_records( + data, columns=columns, coerce_float=coerce_float) + + self._harmonize_columns(parse_dates=parse_dates) + + if self.index is not None: + self.frame.set_index(self.index, inplace=True) + + yield self.frame + + def read(self, coerce_float=True, parse_dates=None, columns=None, + chunksize=None): if columns is not None and len(columns) > 0: from sqlalchemy import select @@ -696,18 +743,23 @@ def read(self, coerce_float=True, parse_dates=None, columns=None): sql_select = self.table.select() 
result = self.pd_sql.execute(sql_select) - data = result.fetchall() column_names = result.keys() - self.frame = DataFrame.from_records( - data, columns=column_names, coerce_float=coerce_float) + if chunksize is not None: + return self._query_iterator(result, chunksize, column_names, + coerce_float=coerce_float, + parse_dates=parse_dates) + else: + data = result.fetchall() + self.frame = DataFrame.from_records( + data, columns=column_names, coerce_float=coerce_float) - self._harmonize_columns(parse_dates=parse_dates) + self._harmonize_columns(parse_dates=parse_dates) - if self.index is not None: - self.frame.set_index(self.index, inplace=True) + if self.index is not None: + self.frame.set_index(self.index, inplace=True) - return self.frame + return self.frame def _index_name(self, index, index_label): # for writing: index=True to include index in sql table @@ -898,8 +950,8 @@ class SQLDatabase(PandasSQL): Parameters ---------- engine : SQLAlchemy engine - Engine to connect with the database. Using SQLAlchemy makes it possible to use any DB supported by that - library. + Engine to connect with the database. Using SQLAlchemy makes it + possible to use any DB supported by that library. schema : string, default None Name of SQL schema in database to write to (if database flavor supports this). If None, use default schema (default). @@ -926,9 +978,10 @@ def execute(self, *args, **kwargs): return self.engine.execute(*args, **kwargs) def read_table(self, table_name, index_col=None, coerce_float=True, - parse_dates=None, columns=None, schema=None): + parse_dates=None, columns=None, schema=None, + chunksize=None): """Read SQL database table into a DataFrame. 
- + Parameters ---------- table_name : string @@ -936,15 +989,16 @@ def read_table(self, table_name, index_col=None, coerce_float=True, index_col : string, optional Column to set as index coerce_float : boolean, default True - Attempt to convert values to non-string, non-numeric objects (like - decimal.Decimal) to floating point. Can result in loss of Precision. + Attempt to convert values to non-string, non-numeric objects + (like decimal.Decimal) to floating point. This can result in + loss of precision. parse_dates : list or dict - List of column names to parse as dates - Dict of ``{column_name: format string}`` where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps - - Dict of ``{column_name: arg dict}``, where the arg dict corresponds - to the keyword arguments of :func:`pandas.to_datetime` + - Dict of ``{column_name: arg}``, where the arg corresponds + to the keyword arguments of :func:`pandas.to_datetime`. Especially useful with databases without native Datetime support, such as SQLite columns : list @@ -953,6 +1007,9 @@ def read_table(self, table_name, index_col=None, coerce_float=True, Name of SQL schema in database to query (if database flavor supports this). If specified, this overwrites the default schema of the SQLDatabase object. + chunksize : int, default None + If specified, return an iterator where `chunksize` is the number + of rows to include in each chunk. 
Returns ------- @@ -966,10 +1023,25 @@ def read_table(self, table_name, index_col=None, coerce_float=True, """ table = SQLTable(table_name, self, index=index_col, schema=schema) return table.read(coerce_float=coerce_float, - parse_dates=parse_dates, columns=columns) - + parse_dates=parse_dates, columns=columns, + chunksize=chunksize) + + @staticmethod + def _query_iterator(result, chunksize, columns, index_col=None, + coerce_float=True, parse_dates=None): + """Return generator through chunked result set""" + + while True: + data = result.fetchmany(chunksize) + if not data: + break + else: + yield _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + def read_query(self, sql, index_col=None, coerce_float=True, - parse_dates=None, params=None): + parse_dates=None, params=None, chunksize=None): """Read SQL query into a DataFrame. Parameters @@ -1006,30 +1078,31 @@ def read_query(self, sql, index_col=None, coerce_float=True, read_sql_table : Read SQL database table into a DataFrame read_sql - """ + """ args = _convert_params(sql, params) result = self.execute(*args) - data = result.fetchall() columns = result.keys() - data_frame = DataFrame.from_records( - data, columns=columns, coerce_float=coerce_float) - - _parse_date_columns(data_frame, parse_dates) - - if index_col is not None: - data_frame.set_index(index_col, inplace=True) + if chunksize is not None: + return self._query_iterator(result, chunksize, columns, + index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + else: + data = result.fetchall() + frame = _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + return frame - return data_frame - read_sql = read_query def to_sql(self, frame, name, if_exists='fail', index=True, index_label=None, schema=None, chunksize=None): """ Write records stored in a DataFrame to a SQL database. 
- + Parameters ---------- frame : DataFrame @@ -1308,23 +1381,42 @@ def execute(self, *args, **kwargs): ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], exc)) raise_with_traceback(ex) + @staticmethod + def _query_iterator(cursor, chunksize, columns, index_col=None, + coerce_float=True, parse_dates=None): + """Return generator through chunked result set""" + + while True: + data = cursor.fetchmany(chunksize) + if not data: + cursor.close() + break + else: + yield _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + def read_query(self, sql, index_col=None, coerce_float=True, params=None, - parse_dates=None): + parse_dates=None, chunksize=None): + args = _convert_params(sql, params) cursor = self.execute(*args) columns = [col_desc[0] for col_desc in cursor.description] - data = self._fetchall_as_list(cursor) - cursor.close() - data_frame = DataFrame.from_records( - data, columns=columns, coerce_float=coerce_float) + if chunksize is not None: + return self._query_iterator(cursor, chunksize, columns, + index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + else: + data = self._fetchall_as_list(cursor) + cursor.close() - _parse_date_columns(data_frame, parse_dates) + frame = _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + return frame - if index_col is not None: - data_frame.set_index(index_col, inplace=True) - return data_frame - def _fetchall_as_list(self, cur): result = cur.fetchall() if not isinstance(result, list): diff --git a/pandas/io/tests/test_sql.py b/pandas/io/tests/test_sql.py index e116a14fa9625..2099a8d0de82e 100644 --- a/pandas/io/tests/test_sql.py +++ b/pandas/io/tests/test_sql.py @@ -379,6 +379,7 @@ class _TestSQLApi(PandasSQLTest): """ flavor = 'sqlite' + mode = None def setUp(self): self.conn = self.connect() @@ -643,6 +644,40 @@ def test_get_schema(self): con=self.conn) self.assertTrue('CREATE' in 
create_sql) + def test_chunksize_read(self): + df = DataFrame(np.random.randn(22, 5), columns=list('abcde')) + df.to_sql('test_chunksize', self.conn, index=False) + + # reading the query all at once + res1 = sql.read_sql_query("select * from test_chunksize", self.conn) + + # reading the query in chunks with read_sql_query + res2 = DataFrame() + i = 0 + sizes = [5, 5, 5, 5, 2] + + for chunk in sql.read_sql_query("select * from test_chunksize", + self.conn, chunksize=5): + res2 = concat([res2, chunk], ignore_index=True) + self.assertEqual(len(chunk), sizes[i]) + i += 1 + + tm.assert_frame_equal(res1, res2) + + # reading the table in chunks with read_sql_table + if self.mode == 'sqlalchemy': + res3 = DataFrame() + i = 0 + sizes = [5, 5, 5, 5, 2] + + for chunk in sql.read_sql_table("test_chunksize", self.conn, + chunksize=5): + res3 = concat([res3, chunk], ignore_index=True) + self.assertEqual(len(chunk), sizes[i]) + i += 1 + + tm.assert_frame_equal(res1, res3) + class TestSQLApi(_TestSQLApi): """ @@ -653,6 +688,7 @@ class TestSQLApi(_TestSQLApi): """ flavor = 'sqlite' + mode = 'sqlalchemy' def connect(self): if SQLALCHEMY_INSTALLED: @@ -742,6 +778,7 @@ class TestSQLiteFallbackApi(_TestSQLApi): """ flavor = 'sqlite' + mode = 'fallback' def connect(self, database=":memory:"): return sqlite3.connect(database)