BUG: Allow read_sql to work with chunksize. #49967
@@ -9,7 +9,10 @@
     ABC,
     abstractmethod,
 )
-from contextlib import contextmanager
+from contextlib import (
+    ExitStack,
+    contextmanager,
+)
 from datetime import (
     date,
     datetime,
@@ -69,6 +72,14 @@
 # -- Helper functions


+def _cleanup_after_generator(generator, exit_stack: ExitStack):
+    """Does the cleanup after iterating through the generator."""
+    try:
+        yield from generator
+    finally:
+        exit_stack.close()
+
+
 def _convert_params(sql, params):
     """Convert SQL and params args to DBAPI2.0 compliant format."""
     args = [sql]
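This helper is the core of the fix: it wraps a chunk-producing generator so that deferred resources (connection, transaction) are released only when iteration actually finishes or is abandoned, rather than when `read_sql` returns. A minimal sketch of the same pattern outside pandas; the `rows` generator and the `print` callback are made-up stand-ins for a cursor and a connection close:

```python
from contextlib import ExitStack


def cleanup_after_generator(generator, exit_stack: ExitStack):
    """Yield from ``generator``; close ``exit_stack`` once iteration ends."""
    try:
        yield from generator
    finally:
        # Runs on exhaustion, on an exception, and on early abandonment
        # (GeneratorExit), so the deferred cleanup can never be skipped.
        exit_stack.close()


def rows(n):
    # Stand-in for a database cursor producing chunks lazily.
    for i in range(n):
        yield {"id": i}


stack = ExitStack()
stack.callback(print, "connection closed")  # stand-in for con.close()
for row in cleanup_after_generator(rows(3), stack):
    print(row)
# "connection closed" is printed only after the loop above completes.
```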
@@ -772,12 +783,11 @@ def has_table(table_name: str, con, schema: str | None = None) -> bool:
 table_exists = has_table


-@contextmanager
 def pandasSQL_builder(
     con,
     schema: str | None = None,
     need_transaction: bool = False,
-) -> Iterator[PandasSQL]:
+) -> PandasSQL:
     """
     Convenience function to return the correct PandasSQL subclass based on the
     provided parameters. Also creates a sqlalchemy connection and transaction
@@ -786,45 +796,24 @@ def pandasSQL_builder(
     import sqlite3

     if isinstance(con, sqlite3.Connection) or con is None:
-        yield SQLiteDatabase(con)
-    else:
-        sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")
+        return SQLiteDatabase(con)

-        if sqlalchemy is not None and isinstance(
-            con, (str, sqlalchemy.engine.Connectable)
-        ):
-            with _sqlalchemy_con(con, need_transaction) as con:
-                yield SQLDatabase(con, schema=schema)
-        elif isinstance(con, str) and sqlalchemy is None:
-            raise ImportError("Using URI string without sqlalchemy installed.")
-        else:
-            warnings.warn(
-                "pandas only supports SQLAlchemy connectable (engine/connection) or "
-                "database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 "
-                "objects are not tested. Please consider using SQLAlchemy.",
-                UserWarning,
-                stacklevel=find_stack_level() + 2,
-            )
-            yield SQLiteDatabase(con)
+    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

+    if isinstance(con, str) and sqlalchemy is None:
+        raise ImportError("Using URI string without sqlalchemy installed.")

-@contextmanager
-def _sqlalchemy_con(connectable, need_transaction: bool):
-    """Create a sqlalchemy connection and a transaction if necessary."""
-    sqlalchemy = import_optional_dependency("sqlalchemy", errors="raise")
+    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
+        return SQLDatabase(con, schema, need_transaction)

-    if isinstance(connectable, str):
-        connectable = sqlalchemy.create_engine(connectable)
-    if isinstance(connectable, sqlalchemy.engine.Engine):
-        with connectable.connect() as con:
-            if need_transaction:
-                with con.begin():
-                    yield con
-            else:
-                yield con
-    else:
-        yield connectable
+    warnings.warn(
+        "pandas only supports SQLAlchemy connectable (engine/connection) or "
+        "database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 "
+        "objects are not tested. Please consider using SQLAlchemy.",
+        UserWarning,
+        stacklevel=find_stack_level(),
+    )
+    return SQLiteDatabase(con)


 class SQLTable(PandasObject):
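The reason `pandasSQL_builder` stops being a `@contextmanager` is behavioural: a generator-based context manager tears everything down as soon as the `with` block inside `read_sql` exits, which is exactly what broke `chunksize` (the connection was closed before the first chunk was fetched). Returning the object and parking the resources on an `ExitStack` defers that teardown. A contrived sketch of the difference, with `Resource` standing in for a connection:

```python
from contextlib import ExitStack, contextmanager


class Resource:
    """Stand-in for a DB connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def builder_as_context_manager():
    res = Resource()
    try:
        yield res
    finally:
        res.close()  # fires as soon as the with-block exits


def builder_returning_object():
    stack = ExitStack()
    res = Resource()
    stack.callback(res.close)  # deferred until stack.close()
    return res, stack


with builder_as_context_manager() as res:
    pass
print(res.closed)  # True -- too early if a chunked iterator still needs it

res, stack = builder_returning_object()
print(res.closed)  # False -- a lazily consumed generator can keep using it
stack.close()
print(res.closed)  # True
```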
@@ -1049,6 +1038,7 @@ def _query_iterator(

     def read(
         self,
+        exit_stack: ExitStack,
         coerce_float: bool = True,
         parse_dates=None,
         columns=None,
@@ -1069,13 +1059,16 @@ def read(
         column_names = result.keys()

         if chunksize is not None:
-            return self._query_iterator(
-                result,
-                chunksize,
-                column_names,
-                coerce_float=coerce_float,
-                parse_dates=parse_dates,
-                use_nullable_dtypes=use_nullable_dtypes,
+            return _cleanup_after_generator(
+                self._query_iterator(
+                    result,
+                    chunksize,
+                    column_names,
+                    coerce_float=coerce_float,
+                    parse_dates=parse_dates,
+                    use_nullable_dtypes=use_nullable_dtypes,
+                ),
+                exit_stack,
             )
         else:
             data = result.fetchall()
@@ -1327,6 +1320,12 @@ class PandasSQL(PandasObject, ABC):
     Subclasses Should define read_query and to_sql.
     """

+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args) -> None:
+        pass
+
     def read_table(
         self,
         table_name: str,
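Because `__enter__`/`__exit__` now live on the base class, any `PandasSQL` instance can be used in a `with` statement; the base implementation is a no-op and only `SQLDatabase` (below) overrides `__exit__` to close its `ExitStack`. A small sketch of how that reads from the caller's side, using the sqlite3 fallback so no SQLAlchemy is needed; note these are private pandas helpers, so the import path is shown only for illustration:

```python
import sqlite3

from pandas.io.sql import pandasSQL_builder

# pandasSQL_builder now returns the PandasSQL object directly instead of
# acting as a context manager itself; the with-statement works through the
# base-class __enter__/__exit__ added in this hunk.
with pandasSQL_builder(sqlite3.connect(":memory:")) as pandas_sql:
    frame = pandas_sql.read_query("SELECT 1 AS x")
print(frame)
```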
@@ -1482,20 +1481,38 @@ class SQLDatabase(PandasSQL):

     Parameters
     ----------
-    con : SQLAlchemy Connection
-        Connection to connect with the database. Using SQLAlchemy makes it
+    con : SQLAlchemy Connectable or URI string.
+        Connectable to connect with the database. Using SQLAlchemy makes it
         possible to use any DB supported by that library.
     schema : string, default None
         Name of SQL schema in database to write to (if database flavor
         supports this). If None, use default schema (default).
+    need_transaction : bool, default False
+        If True, SQLDatabase will create a transaction.

     """

-    def __init__(self, con, schema: str | None = None) -> None:
+    def __init__(
+        self, con, schema: str | None = None, need_transaction: bool = False
+    ) -> None:
+        from sqlalchemy import create_engine
+        from sqlalchemy.engine import Engine
         from sqlalchemy.schema import MetaData

+        self.exit_stack = ExitStack()
+        if isinstance(con, str):
+            con = create_engine(con)
+        if isinstance(con, Engine):
+            con = self.exit_stack.enter_context(con.connect())
+        if need_transaction:
+            self.exit_stack.enter_context(con.begin())
         self.con = con
         self.meta = MetaData(schema=schema)
+        self.returns_generator = False
+
+    def __exit__(self, *args) -> None:
+        if not self.returns_generator:
+            self.exit_stack.close()

     @contextmanager
     def run_transaction(self):

Review thread on the `if need_transaction:` line:

- Any chance that this is related to the autocommit warnings we are getting?
- Yeah, this makes sense. If we get a connection that was not begun yet, we see the warning, since the implicit autocommit handled this previously.
- Yes, and in case the connection is already in a transaction, my other PR modifies the line.
@@ -1566,7 +1583,10 @@ def read_table(
         """
         self.meta.reflect(bind=self.con, only=[table_name])
         table = SQLTable(table_name, self, index=index_col, schema=schema)
+        if chunksize is not None:
+            self.returns_generator = True
         return table.read(
+            self.exit_stack,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
@@ -1675,15 +1695,19 @@ def read_query(
             columns = result.keys()

         if chunksize is not None:
-            return self._query_iterator(
-                result,
-                chunksize,
-                columns,
-                index_col=index_col,
-                coerce_float=coerce_float,
-                parse_dates=parse_dates,
-                dtype=dtype,
-                use_nullable_dtypes=use_nullable_dtypes,
+            self.returns_generator = True
+            return _cleanup_after_generator(
+                self._query_iterator(
+                    result,
+                    chunksize,
+                    columns,
+                    index_col=index_col,
+                    coerce_float=coerce_float,
+                    parse_dates=parse_dates,
+                    dtype=dtype,
+                    use_nullable_dtypes=use_nullable_dtypes,
+                ),
+                self.exit_stack,
             )
         else:
             data = result.fetchall()
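End to end, this is the user-visible behaviour the PR title is about: `read_sql` with an engine (or URI string) plus `chunksize` now returns an iterator whose underlying connection stays open until the chunks are consumed, instead of being closed when `read_sql` returns. A quick sketch of that usage (table name and data are made up):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///:memory:")
pd.DataFrame({"a": range(10)}).to_sql("t", engine, index=False)

# The ExitStack created in SQLDatabase keeps the connection (and transaction,
# if any) alive until this iterator is exhausted.
for chunk in pd.read_sql("SELECT * FROM t", engine, chunksize=4):
    print(len(chunk))  # 4, 4, 2
```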
Review comment: Might have slightly preferred if `_query_iterator` used `self.exit_stack` directly in the functions, but that could be a follow-up, e.g.:
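For concreteness, a sketch of what that follow-up might look like: the iterator closes `self.exit_stack` itself in a `finally`, making the `_cleanup_after_generator` wrapper unnecessary at the call sites. This is only the reviewer's suggestion with simplified signatures, not what the PR implements:

```python
import pandas as pd


def query_iterator(self, result, chunksize, columns):
    """Reviewer's suggested shape: the iterator owns the cleanup itself."""
    try:
        while True:
            data = result.fetchmany(chunksize)
            if not data:
                break
            yield pd.DataFrame.from_records(data, columns=columns)
    finally:
        # Closing the stack here means callers no longer wrap the generator
        # with _cleanup_after_generator.
        self.exit_stack.close()
```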