diff --git a/README.md b/README.md
index 3b5f69912823b..ec7b8b07f3e89 100644
--- a/README.md
+++ b/README.md
@@ -106,6 +106,7 @@ pip install pandas
- [Cython](http://www.cython.org): Only necessary to build development version. Version 0.17.1 or higher.
- [SciPy](http://www.scipy.org): miscellaneous statistical functions
- [PyTables](http://www.pytables.org): necessary for HDF5-based storage
+- [SQLAlchemy](http://www.sqlalchemy.org): for SQL database support. Version 0.8.1 or higher recommended.
- [matplotlib](http://matplotlib.sourceforge.net/): for plotting
- [statsmodels](http://statsmodels.sourceforge.net/)
  - Needed for parts of `pandas.stats`
diff --git a/ci/requirements-2.6.txt b/ci/requirements-2.6.txt
index 8199fdd9b9648..183d0a2d888f0 100644
--- a/ci/requirements-2.6.txt
+++ b/ci/requirements-2.6.txt
@@ -5,4 +5,5 @@ pytz==2013b
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
html5lib==1.0b2
bigquery==2.0.17
+sqlalchemy==0.8.1
numexpr==1.4.2
diff --git a/ci/requirements-2.7.txt b/ci/requirements-2.7.txt
index c7cf69bc92927..f9ccc54fbbcb6 100644
--- a/ci/requirements-2.7.txt
+++ b/ci/requirements-2.7.txt
@@ -19,3 +19,4 @@ scipy==0.10.0
beautifulsoup4==4.2.1
statsmodels==0.5.0
bigquery==2.0.17
+sqlalchemy==0.8.1
diff --git a/ci/requirements-2.7_LOCALE.txt b/ci/requirements-2.7_LOCALE.txt
index 06574cdd6b299..e45c27141c907 100644
--- a/ci/requirements-2.7_LOCALE.txt
+++ b/ci/requirements-2.7_LOCALE.txt
@@ -7,8 +7,6 @@ xlrd==0.9.2
numpy==1.6.1
cython==0.19.1
bottleneck==0.6.0
-numexpr==2.1
-tables==2.3.1
matplotlib==1.3.0
patsy==0.1.0
html5lib==1.0b2
@@ -17,3 +15,4 @@ scipy==0.10.0
beautifulsoup4==4.2.1
statsmodels==0.5.0
bigquery==2.0.17
+sqlalchemy==0.8.1
diff --git a/ci/requirements-3.3.txt b/ci/requirements-3.3.txt
index 480fde477d88b..73009b572c4c2 100644
--- a/ci/requirements-3.3.txt
+++ b/ci/requirements-3.3.txt
@@ -14,3 +14,4 @@ lxml==3.2.1
scipy==0.12.0
beautifulsoup4==4.2.1
statsmodels==0.4.3
+sqlalchemy==0.9.1
diff --git a/doc/source/install.rst b/doc/source/install.rst
index 631973934cc3b..f67bdc10a457f 100644
--- a/doc/source/install.rst
+++ b/doc/source/install.rst
@@ -95,6 +95,7 @@ Optional Dependencies
   version. Version 0.17.1 or higher.
 * `SciPy <http://www.scipy.org>`__: miscellaneous statistical functions
 * `PyTables <http://www.pytables.org>`__: necessary for HDF5-based storage
+ * `SQLAlchemy <http://www.sqlalchemy.org>`__: for SQL database support. Version 0.8.1 or higher recommended.
 * `matplotlib <http://matplotlib.sourceforge.net/>`__: for plotting
 * `statsmodels <http://statsmodels.sourceforge.net/>`__
   * Needed for parts of :mod:`pandas.stats`
diff --git a/doc/source/io.rst b/doc/source/io.rst
index 34af31747ca4a..cc354b6d134d8 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -3068,13 +3068,48 @@ SQL Queries
-----------

The :mod:`pandas.io.sql` module provides a collection of query wrappers to both
-facilitate data retrieval and to reduce dependency on DB-specific API. These
-wrappers only support the Python database adapters which respect the `Python
-DB-API `__. See some
-:ref:`cookbook examples ` for some advanced strategies
+facilitate data retrieval and to reduce dependency on DB-specific API. Database abstraction
+is provided by SQLAlchemy if installed. In addition you will need a driver library for
+your database.

-For example, suppose you want to query some data with different types from a
-table such as:
+.. versionadded:: 0.14.0
+
+
+If SQLAlchemy is not installed, a legacy fallback is provided for sqlite and mysql.
+These legacy modes require Python database adapters which respect the `Python
+DB-API <http://www.python.org/dev/peps/pep-0249/>`__.
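+
+For example, to connect to PostgreSQL through SQLAlchemy you would also install
+a driver library such as ``psycopg2`` and pass a driver-qualified URI. A
+minimal sketch; the credentials below are placeholders:
+
+.. code-block:: python
+
+   from sqlalchemy import create_engine
+   # the "postgresql+psycopg2" prefix selects the driver library
+   engine = create_engine('postgresql+psycopg2://user:secret@localhost:5432/mydb')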
+
+See also some :ref:`cookbook examples <cookbook.sql>` for some advanced strategies.
+
+The key functions are:
+
+- :func:`~pandas.io.sql.to_sql`
+- :func:`~pandas.io.sql.read_sql`
+- :func:`~pandas.io.sql.read_table`
+
+
+In the following example, we use the `SQLite <http://www.sqlite.org/>`__ SQL database
+engine. You can use a temporary SQLite database where data are stored in
+"memory".
+
+To connect with SQLAlchemy you use the :func:`create_engine` function to create an engine
+object from a database URI. You only need to create the engine once per database you are
+connecting to.
+
+For more information on :func:`create_engine` and the URI formatting, see the examples
+below and the SQLAlchemy `documentation <http://docs.sqlalchemy.org>`__.
+
+.. code-block:: python
+
+   from sqlalchemy import create_engine
+   from pandas.io import sql
+   # Create your connection.
+   engine = create_engine('sqlite:///:memory:')
+
+Writing DataFrames
+~~~~~~~~~~~~~~~~~~
+
+Assuming the following data is in a DataFrame ``data``, we can insert it into
+the database using :func:`~pandas.io.sql.to_sql`.

+-----+------------+-------+-------+-------+
@@ -3088,81 +3123,144 @@ table such as:
+-----+------------+-------+-------+-------+

-Functions from :mod:`pandas.io.sql` can extract some data into a DataFrame. In
-the following example, we use the `SQlite `__ SQL database
-engine. You can use a temporary SQLite database where data are stored in
-"memory". Just do:
-
-.. code-block:: python
+.. ipython:: python
+   :suppress:

-   import sqlite3
+   from sqlalchemy import create_engine
   from pandas.io import sql
-   # Create your connection.
-   cnx = sqlite3.connect(':memory:')
+   engine = create_engine('sqlite:///:memory:')

.. ipython:: python
   :suppress:
+
+   c = ['id', 'Date', 'Col_1', 'Col_2', 'Col_3']
+   d = [(26, datetime.datetime(2010,10,18), 'X', 27.5, True),
+        (42, datetime.datetime(2010,10,19), 'Y', -12.5, False),
+        (63, datetime.datetime(2010,10,20), 'Z', 5.73, True)]

-   import sqlite3
-   from pandas.io import sql
-   cnx = sqlite3.connect(':memory:')
+   data = DataFrame(d, columns=c)

.. ipython:: python
-   :suppress:
+
+   sql.to_sql(data, 'data', engine)
+
+Reading Tables
+~~~~~~~~~~~~~~
+
+:func:`~pandas.io.sql.read_table` will read a database table given the
+table name and optionally a subset of columns to read.

-   cu = cnx.cursor()
-   # Create a table named 'data'.
-   cu.execute("""CREATE TABLE data(id integer,
-                 date date,
-                 Col_1 string,
-                 Col_2 float,
-                 Col_3 bool);""")
-   cu.executemany('INSERT INTO data VALUES (?,?,?,?,?)',
-                  [(26, datetime.datetime(2010,10,18), 'X', 27.5, True),
-                   (42, datetime.datetime(2010,10,19), 'Y', -12.5, False),
-                   (63, datetime.datetime(2010,10,20), 'Z', 5.73, True)])
+.. note::
+
+   In order to use :func:`~pandas.io.sql.read_table`, you **must** have the
+   SQLAlchemy optional dependency installed.
+
+.. ipython:: python
+
+   sql.read_table('data', engine)

-Let ``data`` be the name of your SQL table. With a query and your database
-connection, just use the :func:`~pandas.io.sql.read_sql` function to get the
-query results into a DataFrame:
+You can also specify the name of the column as the DataFrame index,
+and specify a subset of columns to be read.

.. ipython:: python

-   sql.read_sql("SELECT * FROM data;", cnx)
+   sql.read_table('data', engine, index_col='id')
+   sql.read_table('data', engine, columns=['Col_1', 'Col_2'])

-You can also specify the name of the column as the DataFrame index:
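+Note that ``to_sql`` stored the (unnamed) DataFrame index in an extra column.
+With the defaults used in this example that column is called ``pandas_index``
+(the default ``pandas`` prefix plus ``_index``), so the original index can be
+restored explicitly. A minimal sketch relying on that default naming:
+
+.. code-block:: python
+
+   # restore the automatically written index column
+   sql.read_table('data', engine, index_col='pandas_index')
+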
+And you can explicitly force columns to be parsed as dates:
+
.. ipython:: python

-   sql.read_sql("SELECT * FROM data;", cnx, index_col='id')
-   sql.read_sql("SELECT * FROM data;", cnx, index_col='date')
+
+   sql.read_table('data', engine, parse_dates=['Date'])

-Of course, you can specify a more "complex" query.
+If needed you can explicitly specify a format string, or a dict of arguments
+to pass to :func:`pandas.tseries.tools.to_datetime`.
+
+.. code-block:: python
+
+   sql.read_table('data', engine, parse_dates={'Date': '%Y-%m-%d'})
+   sql.read_table('data', engine, parse_dates={'Date': {'format': '%Y-%m-%d %H:%M:%S'}})
+
+
+You can check if a table exists using :func:`~pandas.io.sql.has_table`.
+
+In addition, the class :class:`~pandas.io.sql.PandasSQLAlchemy` can be
+instantiated directly for more manual control over the SQL interaction.
+
+Querying
+~~~~~~~~
+
+You can query using raw SQL in the :func:`~pandas.io.sql.read_sql` function.
+In this case you must use the SQL variant appropriate for your database.
+When using SQLAlchemy, you can also pass SQLAlchemy Expression language constructs,
+which are database-agnostic.

.. ipython:: python
+
+   sql.read_sql('SELECT * FROM data', engine)

-   sql.read_frame("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", cnx)

+Of course, you can specify a more "complex" query.

.. ipython:: python
-   :suppress:

-   cu.close()
-   cnx.close()
+   sql.read_sql("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", engine)

-There are a few other available functions:
+You can also run a plain query without creating a DataFrame with
+:func:`~pandas.io.sql.execute`. This is useful for queries that don't return values,
+such as INSERT. This is functionally equivalent to calling ``execute`` on the
+SQLAlchemy engine or db connection object. Again, you must use the SQL syntax
+variant appropriate for your database.

-  - ``tquery`` returns a list of tuples corresponding to each row.
-  - ``uquery`` does the same thing as tquery, but instead of returning results
-    it returns the number of related rows.
-  - ``write_frame`` writes records stored in a DataFrame into the SQL table.
-  - ``has_table`` checks if a given SQLite table exists.

+.. code-block:: python

-.. note::
+   sql.execute('SELECT * FROM table_name', engine)
+
+   sql.execute('INSERT INTO table_name VALUES(?, ?, ?, ?)', engine, params=[('id', 1, 12.2, True)])
+
+
+Engine connection examples
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+   from sqlalchemy import create_engine
+
+   engine = create_engine('postgresql://scott:tiger@localhost:5432/mydatabase')
+
+   engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
+
+   engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
+
+   engine = create_engine('mssql+pyodbc://mydsn')
+
+   # sqlite://<nohostname>/<path>
+   # where <path> is relative:
+   engine = create_engine('sqlite:///foo.db')
+
+   # or absolute, starting with a slash:
+   engine = create_engine('sqlite:////absolute/path/to/foo.db')
+
+
+Legacy
+~~~~~~
+To use the sqlite support without SQLAlchemy, you can create connections like so:
+
+.. code-block:: python
+
+   import sqlite3
+   from pandas.io import sql
+   cnx = sqlite3.connect(':memory:')
+
+And then issue the following queries, remembering to also specify the flavor of SQL
+you are using.
+
+.. code-block:: python
+
+   sql.to_sql(data, 'data', cnx, flavor='sqlite')
+
+   sql.read_sql("SELECT * FROM data", cnx, flavor='sqlite')

-   For now, writing your DataFrame into a database works only with
-   **SQLite**. Moreover, the **index** will currently be **dropped**.
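+
+The same legacy interface works for MySQL through a DBAPI2 connection. A
+minimal sketch, assuming a running server; the connection parameters are
+placeholders and ``MySQLdb`` is just one possible driver:
+
+.. code-block:: python
+
+   import MySQLdb
+   cnx = MySQLdb.connect(host='localhost', user='user', passwd='secret', db='mydb')
+
+   sql.to_sql(data, 'data', cnx, flavor='mysql')
+   sql.read_sql("SELECT * FROM data", cnx, flavor='mysql')
+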
.. _io.bigquery:

diff --git a/pandas/io/sql.py b/pandas/io/sql.py
index 5e83a0921189b..e705a3b20585a 100644
--- a/pandas/io/sql.py
+++ b/pandas/io/sql.py
@@ -4,20 +4,77 @@
"""
from __future__ import print_function
from datetime import datetime, date
-
-from pandas.compat import range, lzip, map, zip
-import pandas.compat as compat
+import warnings
+from pandas.compat import lzip, map, zip, raise_with_traceback, string_types
import numpy as np
-import traceback
-from pandas.core.datetools import format as date_format
+
from pandas.core.api import DataFrame
+from pandas.core.base import PandasObject
+from pandas.tseries.tools import to_datetime
+
+
+class SQLAlchemyRequired(ImportError):
+    pass
+
+
+class DatabaseError(IOError):
+    pass
+

#------------------------------------------------------------------------------
-# Helper execution function
+# Helper functions
+
+def _convert_params(sql, params):
+    """convert sql and params args to DBAPI2.0 compliant format"""
+    args = [sql]
+    if params is not None:
+        args += list(params)
+    return args
+
+
+def _safe_col_name(col_name):
+    # TODO: probably want to forbid database reserved names, such as "database"
+    return col_name.strip().replace(' ', '_')

-def execute(sql, con, retry=True, cur=None, params=None):
+
+def _handle_date_column(col, format=None):
+    if isinstance(format, dict):
+        return to_datetime(col, **format)
+    else:
+        if format in ['D', 's', 'ms', 'us', 'ns']:
+            return to_datetime(col, coerce=True, unit=format)
+        elif issubclass(col.dtype.type, np.floating) or issubclass(col.dtype.type, np.integer):
+            # parse dates as timestamp
+            format = 's' if format is None else format
+            return to_datetime(col, coerce=True, unit=format)
+        else:
+            return to_datetime(col, coerce=True, format=format)
+
+
+def _parse_date_columns(data_frame, parse_dates):
+    """ Force the given columns to be parsed as datetime.
+    Supports both string formatted and integer timestamp columns
+    """
+    # handle non-list entries for parse_dates gracefully
+    if parse_dates is True or parse_dates is None or parse_dates is False:
+        parse_dates = []
+
+    if not hasattr(parse_dates, '__iter__'):
+        parse_dates = [parse_dates]
+
+    for col_name in parse_dates:
+        df_col = data_frame[col_name]
+        try:
+            fmt = parse_dates[col_name]
+        except TypeError:
+            fmt = None
+        data_frame[col_name] = _handle_date_column(df_col, format=fmt)
+
+    return data_frame
+
+
+def execute(sql, con, cur=None, params=None, flavor='sqlite'):
    """
    Execute the given SQL query using the provided connection object.

@@ -25,52 +82,26 @@ def execute(sql, con, retry=True, cur=None, params=None):
    ----------
    sql: string
        Query to be executed
-    con: database connection instance
-        Database connection. Must implement PEP249 (Database API v2.0).
-    retry: bool
-        Not currently implemented
-    cur: database cursor, optional
-        Must implement PEP249 (Datbase API v2.0). If cursor is not provided,
-        one will be obtained from the database connection.
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object, a supported SQL flavor must also be provided
+    cur: deprecated, cursor is obtained from connection
    params: list or tuple, optional
        List of parameters to pass to execute method.
-
+    flavor : string "sqlite", "mysql"
+        Specifies the flavor of SQL to use.
+        Ignored when using SQLAlchemy engine. Required when using DBAPI2 connection.
    Returns
    -------
-    Cursor object
+    Results Iterable
    """
-    try:
-        if cur is None:
-            cur = con.cursor()
-
-        if params is None:
-            cur.execute(sql)
-        else:
-            cur.execute(sql, params)
-        return cur
-    except Exception:
-        try:
-            con.rollback()
-        except Exception:  # pragma: no cover
-            pass
-
-        print('Error on sql %s' % sql)
-        raise
-
-
-def _safe_fetch(cur):
-    try:
-        result = cur.fetchall()
-        if not isinstance(result, list):
-            result = list(result)
-        return result
-    except Exception as e:  # pragma: no cover
-        excName = e.__class__.__name__
-        if excName == 'OperationalError':
-            return []
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    args = _convert_params(sql, params)
+    return pandas_sql.execute(*args)


-def tquery(sql, con=None, cur=None, retry=True):
+def tquery(sql, con, cur=None, params=None, flavor='sqlite'):
    """
    Returns list of tuples corresponding to each row in given sql
    query.

@@ -81,102 +112,118 @@
    ----------
    sql: string
        SQL query to be executed
-    con: SQLConnection or DB API 2.0-compliant connection
-    cur: DB API 2.0 cursor
-
-    Provide a specific connection or a specific cursor if you are executing a
-    lot of sequential statements and want to commit outside.
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object is given, a supported SQL flavor must also be provided
+    cur: deprecated, cursor is obtained from connection
+    params: list or tuple, optional
+        List of parameters to pass to execute method.
+    flavor : string "sqlite", "mysql"
+        Specifies the flavor of SQL to use.
+        Ignored when using SQLAlchemy engine. Required when using DBAPI2
+        connection.
+    Returns
+    -------
+    Results Iterable
    """
-    cur = execute(sql, con, cur=cur)
-    result = _safe_fetch(cur)
-
-    if con is not None:
-        try:
-            cur.close()
-            con.commit()
-        except Exception as e:
-            excName = e.__class__.__name__
-            if excName == 'OperationalError':  # pragma: no cover
-                print('Failed to commit, may need to restart interpreter')
-            else:
-                raise
-
-            traceback.print_exc()
-            if retry:
-                return tquery(sql, con=con, retry=False)
-
-    if result and len(result[0]) == 1:
-        # python 3 compat
-        result = list(lzip(*result)[0])
-    elif result is None:  # pragma: no cover
-        result = []
+    warnings.warn(
+        "tquery is deprecated, and will be removed in future versions",
+        DeprecationWarning)

-    return result
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    args = _convert_params(sql, params)
+    return pandas_sql.tquery(*args)


-def uquery(sql, con=None, cur=None, retry=True, params=None):
+def uquery(sql, con, cur=None, params=None, flavor='sqlite'):
    """
    Does the same thing as tquery, but instead of returning results, it
    returns the number of rows affected.  Good for update queries.
+
+    Parameters
+    ----------
+    sql: string
+        SQL query to be executed
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object is given, a supported SQL flavor must also be provided
+    cur: deprecated, cursor is obtained from connection
+    params: list or tuple, optional
+        List of parameters to pass to execute method.
+    flavor : string "sqlite", "mysql"
+        Specifies the flavor of SQL to use.
+        Ignored when using SQLAlchemy engine. Required when using DBAPI2
+        connection.
+    Returns
+    -------
+    Number of affected rows
    """
-    cur = execute(sql, con, cur=cur, retry=retry, params=params)
+    warnings.warn(
+        "uquery is deprecated, and will be removed in future versions",
+        DeprecationWarning)
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    args = _convert_params(sql, params)
+    return pandas_sql.uquery(*args)

-    result = cur.rowcount
-    try:
-        con.commit()
-    except Exception as e:
-        excName = e.__class__.__name__
-        if excName != 'OperationalError':
-            raise

-        traceback.print_exc()
-        if retry:
-            print('Looks like your connection failed, reconnecting...')
-            return uquery(sql, con, retry=False)
-    return result
+#------------------------------------------------------------------------------
+# Read and write to DataFrames


-def read_frame(sql, con, index_col=None, coerce_float=True, params=None):
+def read_sql(sql, con, index_col=None, flavor='sqlite', coerce_float=True,
+             params=None, parse_dates=None):
    """
    Returns a DataFrame corresponding to the result set of the query
    string.

    Optionally provide an index_col parameter to use one of the
-    columns as the index. Otherwise will be 0 to len(results) - 1.
+    columns as the index, otherwise default integer index will be used

    Parameters
    ----------
    sql: string
        SQL query to be executed
-    con: DB connection object, optional
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object is given, a supported SQL flavor must also be provided
    index_col: string, optional
        column name to use for the returned DataFrame object.
+    flavor : string specifying the flavor of SQL to use. Ignored when using
+        SQLAlchemy engine. Required when using DBAPI2 connection.
    coerce_float : boolean, default True
        Attempt to convert values to non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets
    params: list or tuple, optional
        List of parameters to pass to execute method.
-    """
-    cur = execute(sql, con, params=params)
-    rows = _safe_fetch(cur)
-    columns = [col_desc[0] for col_desc in cur.description]
-
-    cur.close()
-    con.commit()
-
-    result = DataFrame.from_records(rows, columns=columns,
-                                    coerce_float=coerce_float)
+    parse_dates: list or dict
+        List of column names to parse as dates
+        Or
+        Dict of {column_name: format string} where format string is
+        strftime compatible in case of parsing string times or is one of
+        (D, s, ns, ms, us) in case of parsing integer timestamps
+        Or
+        Dict of {column_name: arg dict}, where the arg dict corresponds
+        to the keyword arguments of :func:`pandas.tseries.tools.to_datetime`
+        Especially useful with databases without native Datetime support,
+        such as SQLite

-    if index_col is not None:
-        result = result.set_index(index_col)
-
-    return result
-
-frame_query = read_frame
-read_sql = read_frame
+    Returns
+    -------
+    DataFrame
+    """
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    return pandas_sql.read_sql(sql,
+                               index_col=index_col,
+                               params=params,
+                               coerce_float=coerce_float,
+                               parse_dates=parse_dates)


-def write_frame(frame, name, con, flavor='sqlite', if_exists='fail', **kwargs):
+def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True):
    """
    Write records stored in a DataFrame to a SQL database.
@@ -184,162 +231,689 @@ def write_frame(frame, name, con, flavor='sqlite', if_exists='fail', **kwargs):
    ----------
    frame: DataFrame
    name: name of SQL table
-    con: an open SQL database connection object
-    flavor: {'sqlite', 'mysql', 'oracle'}, default 'sqlite'
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object is given, a supported SQL flavor must also be provided
+    flavor: {'sqlite', 'mysql'}, default 'sqlite'
+        ignored when using SQLAlchemy engine. Required when using DBAPI2 connection.
    if_exists: {'fail', 'replace', 'append'}, default 'fail'
        fail: If table exists, do nothing.
        replace: If table exists, drop it, recreate it, and insert data.
        append: If table exists, insert data. Create if does not exist.
+    index : boolean, default True
+        Write DataFrame index as a column
    """
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index)

-    if 'append' in kwargs:
-        import warnings
-        warnings.warn("append is deprecated, use if_exists instead",
-                      FutureWarning)
-        if kwargs['append']:
-            if_exists = 'append'
-        else:
-            if_exists = 'fail'
-
-    if if_exists not in ('fail', 'replace', 'append'):
-        raise ValueError("'%s' is not valid for if_exists" % if_exists)
-
-    exists = table_exists(name, con, flavor)
-    if if_exists == 'fail' and exists:
-        raise ValueError("Table '%s' already exists." % name)
-
-    # creation/replacement dependent on the table existing and if_exist criteria
-    create = None
-    if exists:
-        if if_exists == 'fail':
-            raise ValueError("Table '%s' already exists." % name)
-        elif if_exists == 'replace':
-            cur = con.cursor()
-            cur.execute("DROP TABLE %s;" % name)
-            cur.close()
-            create = get_schema(frame, name, flavor)
+
+def has_table(table_name, con, meta=None, flavor='sqlite'):
+    """
+    Check if DB has named table
+
+    Parameters
+    ----------
+    table_name: name of SQL table
+    con: SQLAlchemy engine or DBAPI2 connection (legacy mode)
+        Using SQLAlchemy makes it possible to use any DB supported by that
+        library.
+        If a DBAPI2 object is given, a supported SQL flavor name must also be provided
+    flavor: {'sqlite', 'mysql'}, default 'sqlite', ignored when using engine
+    Returns
+    -------
+    boolean
+    """
+    pandas_sql = pandasSQL_builder(con, flavor=flavor)
+    return pandas_sql.has_table(table_name)
+
+
+def read_table(table_name, con, meta=None, index_col=None, coerce_float=True,
+               parse_dates=None, columns=None):
+    """Given a table name and SQLAlchemy engine, return a DataFrame.
+    Type conversions will be done automatically
+
+    Parameters
+    ----------
+    table_name: name of SQL table in database
+    con: SQLAlchemy engine. Legacy mode not supported
+    meta: SQLAlchemy meta, optional. If omitted MetaData is reflected from engine
+    index_col: column to set as index, optional
+    coerce_float : boolean, default True
+        Attempt to convert values to non-string, non-numeric objects (like
+        decimal.Decimal) to floating point. Can result in loss of precision.
+    parse_dates: list or dict
+        List of column names to parse as dates
+        Or
+        Dict of {column_name: format string} where format string is
+        strftime compatible in case of parsing string times or is one of
+        (D, s, ns, ms, us) in case of parsing integer timestamps
+        Or
+        Dict of {column_name: arg dict}, where the arg dict corresponds
+        to the keyword arguments of :func:`pandas.tseries.tools.to_datetime`
+        Especially useful with databases without native Datetime support,
+        such as SQLite
+    columns: list
+        List of column names to select from sql table
+    Returns
+    -------
+    DataFrame
+    """
+    pandas_sql = PandasSQLAlchemy(con, meta=meta)
+    table = pandas_sql.read_table(table_name,
+                                  index_col=index_col,
+                                  coerce_float=coerce_float,
+                                  parse_dates=parse_dates)
+
+    if table is not None:
+        return table
    else:
-        create = get_schema(frame, name, flavor)
+        raise ValueError("Table %s not found" % table_name)
+

-    if create is not None:
-        cur = con.cursor()
-        cur.execute(create)
+def pandasSQL_builder(con, flavor=None, meta=None):
+    """
+    Convenience function to return the correct PandasSQL subclass based on the
+    provided parameters
+    """
+    try:
+        import sqlalchemy
+
+        if isinstance(con, sqlalchemy.engine.Engine):
+            return PandasSQLAlchemy(con, meta=meta)
+        else:
+            warnings.warn("Not an SQLAlchemy engine, "
+                          "attempting to use as legacy DBAPI connection")
+            if flavor is None:
+                raise ValueError("PandasSQL must be created with an SQLAlchemy "
+                                 "engine or a DBAPI2 connection and SQL flavor")
+            else:
+                return PandasSQLLegacy(con, flavor)
+
+    except ImportError:
+        warnings.warn("SQLAlchemy not installed, using legacy mode")
+        if flavor is None:
+            raise SQLAlchemyRequired
+        else:
+            return PandasSQLLegacy(con, flavor)
+
+
+class PandasSQLTable(PandasObject):
+    """ For mapping Pandas tables to SQL tables.
+    Uses the fact that the table is reflected by SQLAlchemy to
+    do better type conversions.
+    Also holds various flags needed to avoid having to
+    pass them between functions all the time.
+    """
+    # TODO: support for multiIndex
+    def __init__(self, name, pandas_sql_engine, frame=None, index=True,
+                 if_exists='fail', prefix='pandas'):
+        self.name = name
+        self.pd_sql = pandas_sql_engine
+        self.prefix = prefix
+        self.frame = frame
+        self.index = self._index_name(index)
+
+        if frame is not None:
+            # We want to write a frame
+            if self.pd_sql.has_table(self.name):
+                if if_exists == 'fail':
+                    raise ValueError("Table '%s' already exists."
% name) + elif if_exists == 'replace': + self.pd_sql.drop_table(self.name) + self.table = self._create_table_statement() + self.create() + elif if_exists == 'append': + self.table = self.pd_sql.get_table(self.name) + if self.table is None: + self.table = self._create_table_statement() + else: + self.table = self._create_table_statement() + self.create() + else: + # no data provided, read-only mode + self.table = self.pd_sql.get_table(self.name) + + if self.table is None: + raise ValueError("Could not init table '%s'" % name) + + def exists(self): + return self.pd_sql.has_table(self.name) + + def sql_schema(self): + return str(self.table.compile()) + + def create(self): + self.table.create() + + def insert_statement(self): + return self.table.insert() + + def maybe_asscalar(self, i): + try: + return np.asscalar(i) + except AttributeError: + return i + + def insert(self): + ins = self.insert_statement() + + for t in self.frame.iterrows(): + data = dict((k, self.maybe_asscalar(v)) + for k, v in t[1].iteritems()) + if self.index is not None: + data[self.index] = self.maybe_asscalar(t[0]) + self.pd_sql.execute(ins, **data) + + def read(self, coerce_float=True, parse_dates=None, columns=None): + + if columns is not None and len(columns) > 0: + from sqlalchemy import select + cols = [self.table.c[n] for n in columns] + if self.index is not None: + cols.insert(0, self.table.c[self.index]) + sql_select = select(cols) + else: + sql_select = self.table.select() + + result = self.pd_sql.execute(sql_select) + data = result.fetchall() + column_names = result.keys() + + self.frame = DataFrame.from_records( + data, columns=column_names, coerce_float=coerce_float) + + self._harmonize_columns(parse_dates=parse_dates) + + if self.index is not None: + self.frame.set_index(self.index, inplace=True) + + # Assume if the index in prefix_index format, we gave it a name + # and should return it nameless + if self.index == self.prefix + '_index': + self.frame.index.name = None + + return self.frame + + def _index_name(self, index): + if index is True: + if self.frame.index.name is not None: + return _safe_col_name(self.frame.index.name) + else: + return self.prefix + '_index' + elif isinstance(index, string_types): + return index + else: + return None + + def _create_table_statement(self): + from sqlalchemy import Table, Column + + safe_columns = map(_safe_col_name, self.frame.dtypes.index) + column_types = map(self._sqlalchemy_type, self.frame.dtypes) + + columns = [Column(name, typ) + for name, typ in zip(safe_columns, column_types)] + + if self.index is not None: + columns.insert(0, Column(self.index, + self._sqlalchemy_type( + self.frame.index.dtype), + index=True)) + + return Table(self.name, self.pd_sql.meta, *columns) + + def _harmonize_columns(self, parse_dates=None): + """ Make a data_frame's column type align with an sql_table + column types + Need to work around limited NA value support. + Floats are always fine, ints must always + be floats if there are Null values. + Booleans are hard because converting bool column with None replaces + all Nones with false. Therefore only convert bool if there are no + NA values. 
+        Datetimes should already be converted
+        to np.datetime64 if supported, but here we also force conversion
+        if required
+        """
+        # handle non-list entries for parse_dates gracefully
+        if parse_dates is True or parse_dates is None or parse_dates is False:
+            parse_dates = []
+
+        if not hasattr(parse_dates, '__iter__'):
+            parse_dates = [parse_dates]
+
+        for sql_col in self.table.columns:
+            col_name = sql_col.name
+            try:
+                df_col = self.frame[col_name]
+                # the type the dataframe column should have
+                col_type = self._numpy_type(sql_col.type)
+
+                if col_type is datetime or col_type is date:
+                    if not issubclass(df_col.dtype.type, np.datetime64):
+                        self.frame[col_name] = _handle_date_column(df_col)
+
+                elif col_type is float:
+                    # floats support NA, can always convert!
+                    self.frame[col_name] = df_col.astype(col_type, copy=False)
+
+                elif len(df_col) == df_col.count():
+                    # No NA values, can convert ints and bools
+                    if col_type is int or col_type is bool:
+                        self.frame[col_name] = df_col.astype(col_type, copy=False)
+
+                # Handle date parsing
+                if col_name in parse_dates:
+                    try:
+                        fmt = parse_dates[col_name]
+                    except TypeError:
+                        fmt = None
+                    self.frame[col_name] = _handle_date_column(
+                        df_col, format=fmt)
+
+            except KeyError:
+                pass  # this column not in results
+
+    def _sqlalchemy_type(self, dtype):
+        from sqlalchemy.types import Integer, Float, Text, Boolean, DateTime, Date
+
+        pytype = dtype.type
+
+        if pytype is date:
+            return Date
+        if issubclass(pytype, np.datetime64) or pytype is datetime:
+            # Caution: np.datetime64 is also a subclass of np.number.
+            return DateTime
+        if issubclass(pytype, np.floating):
+            return Float
+        if issubclass(pytype, np.integer):
+            # TODO: Refine integer size.
+            return Integer
+        if issubclass(pytype, np.bool_):
+            return Boolean
+        return Text
+
+    def _numpy_type(self, sqltype):
+        from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date
+
+        if isinstance(sqltype, Float):
+            return float
+        if isinstance(sqltype, Integer):
+            # TODO: Refine integer size.
+            return int
+        if isinstance(sqltype, DateTime):
+            return datetime
+        if isinstance(sqltype, Date):
+            return date
+        if isinstance(sqltype, Boolean):
+            return bool
+        return object
+
+
+class PandasSQL(PandasObject):
+
+    """
+    Subclasses should define read_sql and to_sql
+    """
+
+    def read_sql(self, *args, **kwargs):
+        raise ValueError(
+            "PandasSQL must be created with an SQLAlchemy engine or connection+sql flavor")
+
+    def to_sql(self, *args, **kwargs):
+        raise ValueError(
+            "PandasSQL must be created with an SQLAlchemy engine or connection+sql flavor")
+
+
+class PandasSQLAlchemy(PandasSQL):
+
+    """
+    This class enables conversion between DataFrame and SQL databases
+    using SQLAlchemy to handle database abstraction
+    """
+
+    def __init__(self, engine, meta=None):
+        self.engine = engine
+        if not meta:
+            from sqlalchemy.schema import MetaData
+            meta = MetaData(self.engine)
+            meta.reflect(self.engine)
+
+        self.meta = meta
+
+    def execute(self, *args, **kwargs):
+        """Simple passthrough to SQLAlchemy engine"""
+        return self.engine.execute(*args, **kwargs)
+
+    def tquery(self, *args, **kwargs):
+        result = self.execute(*args, **kwargs)
+        return result.fetchall()
+
+    def uquery(self, *args, **kwargs):
+        result = self.execute(*args, **kwargs)
+        return result.rowcount
+
+    def read_sql(self, sql, index_col=None, coerce_float=True,
+                 parse_dates=None, params=None):
+        args = _convert_params(sql, params)
+
+        result = self.execute(*args)
+        data = result.fetchall()
+        columns = result.keys()
+
+        data_frame = DataFrame.from_records(
+            data, columns=columns, coerce_float=coerce_float)
+
+        _parse_date_columns(data_frame, parse_dates)
+
+        if index_col is not None:
+            data_frame.set_index(index_col, inplace=True)
+
+        return data_frame
+
+    def to_sql(self, frame, name, if_exists='fail', index=True):
+        table = PandasSQLTable(
+            name, self, frame=frame, index=index, if_exists=if_exists)
+        table.insert()
+
+    def has_table(self, name):
+        return self.engine.has_table(name)
+
+    def get_table(self, table_name):
+        if self.engine.has_table(table_name):
+            return self.meta.tables[table_name]
+        else:
+            return None
+
+    def read_table(self, table_name, index_col=None, coerce_float=True,
+                   parse_dates=None, columns=None):
+
+        table = PandasSQLTable(table_name, self, index=index_col)
+        return table.read(coerce_float=coerce_float,
+                          parse_dates=parse_dates, columns=columns)
+
+    def drop_table(self, table_name):
+        if self.engine.has_table(table_name):
+            self.get_table(table_name).drop()
+            self.meta.clear()
+            self.meta.reflect()
+
+    def _create_sql_schema(self, frame, table_name):
+        table = PandasSQLTable(table_name, self, frame=frame)
+        return table.sql_schema()
+
+
+# ---- SQL without SQLAlchemy ---
+# Flavor specific sql strings and handler class for access to DBs without
+# SQLAlchemy installed
+# SQL type conversions for each DB
+_SQL_TYPES = {
+    'text': {
+        'mysql': 'VARCHAR (63)',
+        'sqlite': 'TEXT',
+    },
+    'float': {
+        'mysql': 'FLOAT',
+        'sqlite': 'REAL',
+    },
+    'int': {
+        'mysql': 'BIGINT',
+        'sqlite': 'INTEGER',
+    },
+    'datetime': {
+        'mysql': 'DATETIME',
+        'sqlite': 'TIMESTAMP',
+    },
+    'date': {
+        'mysql': 'DATE',
+        'sqlite': 'TIMESTAMP',
+    },
+    'bool': {
+        'mysql': 'BOOLEAN',
+        'sqlite': 'INTEGER',
+    }
+}
+
+# SQL quoting and wildcard symbols
+_SQL_SYMB = {
+    'mysql': {
+        'br_l': '`',
+        'br_r': '`',
+        'wld': '%s'
+    },
+    'sqlite': {
+        'br_l': '[',
+        'br_r': ']',
+        'wld': '?'
+    }
+}
+
+
+class PandasSQLTableLegacy(PandasSQLTable):
+    """Patch the PandasSQLTable for legacy support.
+    Instead of a table variable just use the Create Table
+    statement"""
+    def sql_schema(self):
+        return str(self.table)
+
+    def create(self):
+        self.pd_sql.execute(self.table)
+
+    def insert_statement(self):
+        # Replace spaces in DataFrame column names with _.
+        safe_names = [_safe_col_name(n) for n in self.frame.dtypes.index]
+        flv = self.pd_sql.flavor
+        br_l = _SQL_SYMB[flv]['br_l']  # left val quote char
+        br_r = _SQL_SYMB[flv]['br_r']  # right val quote char
+        wld = _SQL_SYMB[flv]['wld']  # wildcard char
+
+        if self.index is not None:
+            safe_names.insert(0, self.index)
+
+        bracketed_names = [br_l + column + br_r for column in safe_names]
+        col_names = ','.join(bracketed_names)
+        wildcards = ','.join([wld] * len(safe_names))
+        insert_statement = 'INSERT INTO %s (%s) VALUES (%s)' % (
+            self.name, col_names, wildcards)
+        return insert_statement
+
+    def insert(self):
+        ins = self.insert_statement()
+        cur = self.pd_sql.con.cursor()
+        for r in self.frame.iterrows():
+            data = [self.maybe_asscalar(v) for v in r[1].values]
+            if self.index is not None:
+                data.insert(0, self.maybe_asscalar(r[0]))
+            cur.execute(ins, tuple(data))
+        cur.close()
+        self.pd_sql.con.commit()

-        cur = con.cursor()
-        # Replace spaces in DataFrame column names with _.
-        safe_names = [s.replace(' ', '_').strip() for s in frame.columns]
-        flavor_picker = {'sqlite': _write_sqlite,
-                         'mysql': _write_mysql}
-
-        func = flavor_picker.get(flavor, None)
-        if func is None:
-            raise NotImplementedError
-        func(frame, name, safe_names, cur)
-        cur.close()
-        con.commit()
-
-
-def _write_sqlite(frame, table, names, cur):
-    bracketed_names = ['[' + column + ']' for column in names]
-    col_names = ','.join(bracketed_names)
-    wildcards = ','.join(['?'] * len(names))
-    insert_query = 'INSERT INTO %s (%s) VALUES (%s)' % (
-        table, col_names, wildcards)
-    # pandas types are badly handled if there is only 1 column ( Issue #3628 )
-    if not len(frame.columns) == 1:
-        data = [tuple(x) for x in frame.values]
-    else:
-        data = [tuple(x) for x in frame.values.tolist()]
-    cur.executemany(insert_query, data)
-
-
-def _write_mysql(frame, table, names, cur):
-    bracketed_names = ['`' + column + '`' for column in names]
-    col_names = ','.join(bracketed_names)
-    wildcards = ','.join([r'%s'] * len(names))
-    insert_query = "INSERT INTO %s (%s) VALUES (%s)" % (
-        table, col_names, wildcards)
-    data = [tuple(x) for x in frame.values]
-    cur.executemany(insert_query, data)
-
-
-def table_exists(name, con, flavor):
-    flavor_map = {
-        'sqlite': ("SELECT name FROM sqlite_master "
-                   "WHERE type='table' AND name='%s';") % name,
-        'mysql': "SHOW TABLES LIKE '%s'" % name}
-    query = flavor_map.get(flavor, None)
-    if query is None:
-        raise NotImplementedError
-    return len(tquery(query, con)) > 0
-
-
-def get_sqltype(pytype, flavor):
-    sqltype = {'mysql': 'VARCHAR (63)',
-               'sqlite': 'TEXT'}
-
-    if issubclass(pytype, np.floating):
-        sqltype['mysql'] = 'FLOAT'
-        sqltype['sqlite'] = 'REAL'
-
-    if issubclass(pytype, np.integer):
-        #TODO: Refine integer size.
-        sqltype['mysql'] = 'BIGINT'
-        sqltype['sqlite'] = 'INTEGER'
-
-    if issubclass(pytype, np.datetime64) or pytype is datetime:
-        # Caution: np.datetime64 is also a subclass of np.number.
- sqltype['mysql'] = 'DATETIME' - sqltype['sqlite'] = 'TIMESTAMP' - - if pytype is datetime.date: - sqltype['mysql'] = 'DATE' - sqltype['sqlite'] = 'TIMESTAMP' - - if issubclass(pytype, np.bool_): - sqltype['sqlite'] = 'INTEGER' - - return sqltype[flavor] - - -def get_schema(frame, name, flavor, keys=None): - "Return a CREATE TABLE statement to suit the contents of a DataFrame." - lookup_type = lambda dtype: get_sqltype(dtype.type, flavor) - # Replace spaces in DataFrame column names with _. - safe_columns = [s.replace(' ', '_').strip() for s in frame.dtypes.index] - column_types = lzip(safe_columns, map(lookup_type, frame.dtypes)) - if flavor == 'sqlite': - columns = ',\n '.join('[%s] %s' % x for x in column_types) - else: - columns = ',\n '.join('`%s` %s' % x for x in column_types) - - keystr = '' - if keys is not None: - if isinstance(keys, compat.string_types): - keys = (keys,) - keystr = ', PRIMARY KEY (%s)' % ','.join(keys) - template = """CREATE TABLE %(name)s ( - %(columns)s - %(keystr)s - );""" - create_statement = template % {'name': name, 'columns': columns, - 'keystr': keystr} - return create_statement - - -def sequence2dict(seq): - """Helper function for cx_Oracle. - - For each element in the sequence, creates a dictionary item equal - to the element and keyed by the position of the item in the list. - >>> sequence2dict(("Matt", 1)) - {'1': 'Matt', '2': 1} - - Source: - http://www.gingerandjohn.com/archives/2004/02/26/cx_oracle-executemany-example/ + def _create_table_statement(self): + "Return a CREATE TABLE statement to suit the contents of a DataFrame." + + # Replace spaces in DataFrame column names with _. + safe_columns = [_safe_col_name(n) for n in self.frame.dtypes.index] + column_types = [self._sql_type_name(typ) for typ in self.frame.dtypes] + + if self.index is not None: + safe_columns.insert(0, self.index) + column_types.insert(0, self._sql_type_name(self.frame.index.dtype)) + flv = self.pd_sql.flavor + + br_l = _SQL_SYMB[flv]['br_l'] # left val quote char + br_r = _SQL_SYMB[flv]['br_r'] # right val quote char + + col_template = br_l + '%s' + br_r + ' %s' + + columns = ',\n '.join(col_template % + x for x in zip(safe_columns, column_types)) + template = """CREATE TABLE %(name)s ( + %(columns)s + );""" + create_statement = template % {'name': self.name, 'columns': columns} + return create_statement + + def _sql_type_name(self, dtype): + pytype = dtype.type + pytype_name = "text" + if issubclass(pytype, np.floating): + pytype_name = "float" + elif issubclass(pytype, np.integer): + pytype_name = "int" + elif issubclass(pytype, np.datetime64) or pytype is datetime: + # Caution: np.datetime64 is also a subclass of np.number. 
+ pytype_name = "datetime" + elif pytype is datetime.date: + pytype_name = "date" + elif issubclass(pytype, np.bool_): + pytype_name = "bool" + + return _SQL_TYPES[pytype_name][self.pd_sql.flavor] + + +class PandasSQLLegacy(PandasSQL): + + def __init__(self, con, flavor): + self.con = con + if flavor not in ['sqlite', 'mysql']: + raise NotImplementedError + else: + self.flavor = flavor + + def execute(self, *args, **kwargs): + try: + cur = self.con.cursor() + if kwargs: + cur.execute(*args, **kwargs) + else: + cur.execute(*args) + return cur + except Exception as e: + try: + self.con.rollback() + except Exception: # pragma: no cover + ex = DatabaseError( + "Execution failed on sql: %s\n%s\nunable to rollback" % (args[0], e)) + raise_with_traceback(ex) + + ex = DatabaseError("Execution failed on sql: %s" % args[0]) + raise_with_traceback(ex) + + def tquery(self, *args): + cur = self.execute(*args) + result = self._fetchall_as_list(cur) + + # This makes into tuples + if result and len(result[0]) == 1: + # python 3 compat + result = list(lzip(*result)[0]) + elif result is None: # pragma: no cover + result = [] + return result + + def uquery(self, *args): + cur = self.execute(*args) + return cur.rowcount + + def read_sql(self, sql, index_col=None, coerce_float=True, params=None, + parse_dates=None): + args = _convert_params(sql, params) + cursor = self.execute(*args) + columns = [col_desc[0] for col_desc in cursor.description] + data = self._fetchall_as_list(cursor) + cursor.close() + + data_frame = DataFrame.from_records( + data, columns=columns, coerce_float=coerce_float) + + _parse_date_columns(data_frame, parse_dates) + + if index_col is not None: + data_frame.set_index(index_col, inplace=True) + return data_frame + + def _fetchall_as_list(self, cur): + result = cur.fetchall() + if not isinstance(result, list): + result = list(result) + return result + + def to_sql(self, frame, name, if_exists='fail', index=True): + """ + Write records stored in a DataFrame to a SQL database. + + Parameters + ---------- + frame: DataFrame + name: name of SQL table + flavor: {'sqlite', 'mysql', 'postgres'}, default 'sqlite' + if_exists: {'fail', 'replace', 'append'}, default 'fail' + fail: If table exists, do nothing. + replace: If table exists, drop it, recreate it, and insert data. + append: If table exists, insert data. Create if does not exist. 
+ """ + table = PandasSQLTableLegacy( + name, self, frame=frame, index=index, if_exists=if_exists) + table.insert() + + def has_table(self, name): + flavor_map = { + 'sqlite': ("SELECT name FROM sqlite_master " + "WHERE type='table' AND name='%s';") % name, + 'mysql': "SHOW TABLES LIKE '%s'" % name} + query = flavor_map.get(self.flavor) + + return len(self.tquery(query)) > 0 + + def get_table(self, table_name): + return None # not supported in Legacy mode + + def drop_table(self, name): + drop_sql = "DROP TABLE %s" % name + self.execute(drop_sql) + + +# legacy names, with depreciation warnings and copied docs +def get_schema(frame, name, con, flavor='sqlite'): + """ + Get the SQL db table schema for the given frame + + Parameters + ---------- + frame: DataFrame + name: name of SQL table + con: an open SQL database connection object + engine: an SQLAlchemy engine - replaces connection and flavor + flavor: {'sqlite', 'mysql', 'postgres'}, default 'sqlite' + + """ + warnings.warn( + "get_schema is depreciated", DeprecationWarning) + pandas_sql = pandasSQL_builder(con=con, flavor=flavor) + return pandas_sql._create_sql_schema(frame, name) + + +def read_frame(*args, **kwargs): + """DEPRECIATED - use read_sql """ - d = {} - for k, v in zip(range(1, 1 + len(seq)), seq): - d[str(k)] = v - return d + warnings.warn( + "read_frame is depreciated, use read_sql", DeprecationWarning) + return read_sql(*args, **kwargs) + + +def write_frame(*args, **kwargs): + """DEPRECIATED - use to_sql + """ + warnings.warn("write_frame is depreciated, use to_sql", DeprecationWarning) + return to_sql(*args, **kwargs) + + +# Append wrapped function docstrings +read_frame.__doc__ += read_sql.__doc__ +write_frame.__doc__ += to_sql.__doc__ diff --git a/pandas/io/tests/data/iris.csv b/pandas/io/tests/data/iris.csv new file mode 100644 index 0000000000000..c19b9c3688515 --- /dev/null +++ b/pandas/io/tests/data/iris.csv @@ -0,0 +1,151 @@ +SepalLength,SepalWidth,PetalLength,PetalWidth,Name +5.1,3.5,1.4,0.2,Iris-setosa +4.9,3.0,1.4,0.2,Iris-setosa +4.7,3.2,1.3,0.2,Iris-setosa +4.6,3.1,1.5,0.2,Iris-setosa +5.0,3.6,1.4,0.2,Iris-setosa +5.4,3.9,1.7,0.4,Iris-setosa +4.6,3.4,1.4,0.3,Iris-setosa +5.0,3.4,1.5,0.2,Iris-setosa +4.4,2.9,1.4,0.2,Iris-setosa +4.9,3.1,1.5,0.1,Iris-setosa +5.4,3.7,1.5,0.2,Iris-setosa +4.8,3.4,1.6,0.2,Iris-setosa +4.8,3.0,1.4,0.1,Iris-setosa +4.3,3.0,1.1,0.1,Iris-setosa +5.8,4.0,1.2,0.2,Iris-setosa +5.7,4.4,1.5,0.4,Iris-setosa +5.4,3.9,1.3,0.4,Iris-setosa +5.1,3.5,1.4,0.3,Iris-setosa +5.7,3.8,1.7,0.3,Iris-setosa +5.1,3.8,1.5,0.3,Iris-setosa +5.4,3.4,1.7,0.2,Iris-setosa +5.1,3.7,1.5,0.4,Iris-setosa +4.6,3.6,1.0,0.2,Iris-setosa +5.1,3.3,1.7,0.5,Iris-setosa +4.8,3.4,1.9,0.2,Iris-setosa +5.0,3.0,1.6,0.2,Iris-setosa +5.0,3.4,1.6,0.4,Iris-setosa +5.2,3.5,1.5,0.2,Iris-setosa +5.2,3.4,1.4,0.2,Iris-setosa +4.7,3.2,1.6,0.2,Iris-setosa +4.8,3.1,1.6,0.2,Iris-setosa +5.4,3.4,1.5,0.4,Iris-setosa +5.2,4.1,1.5,0.1,Iris-setosa +5.5,4.2,1.4,0.2,Iris-setosa +4.9,3.1,1.5,0.1,Iris-setosa +5.0,3.2,1.2,0.2,Iris-setosa +5.5,3.5,1.3,0.2,Iris-setosa +4.9,3.1,1.5,0.1,Iris-setosa +4.4,3.0,1.3,0.2,Iris-setosa +5.1,3.4,1.5,0.2,Iris-setosa +5.0,3.5,1.3,0.3,Iris-setosa +4.5,2.3,1.3,0.3,Iris-setosa +4.4,3.2,1.3,0.2,Iris-setosa +5.0,3.5,1.6,0.6,Iris-setosa +5.1,3.8,1.9,0.4,Iris-setosa +4.8,3.0,1.4,0.3,Iris-setosa +5.1,3.8,1.6,0.2,Iris-setosa +4.6,3.2,1.4,0.2,Iris-setosa +5.3,3.7,1.5,0.2,Iris-setosa +5.0,3.3,1.4,0.2,Iris-setosa +7.0,3.2,4.7,1.4,Iris-versicolor +6.4,3.2,4.5,1.5,Iris-versicolor +6.9,3.1,4.9,1.5,Iris-versicolor 
+5.5,2.3,4.0,1.3,Iris-versicolor +6.5,2.8,4.6,1.5,Iris-versicolor +5.7,2.8,4.5,1.3,Iris-versicolor +6.3,3.3,4.7,1.6,Iris-versicolor +4.9,2.4,3.3,1.0,Iris-versicolor +6.6,2.9,4.6,1.3,Iris-versicolor +5.2,2.7,3.9,1.4,Iris-versicolor +5.0,2.0,3.5,1.0,Iris-versicolor +5.9,3.0,4.2,1.5,Iris-versicolor +6.0,2.2,4.0,1.0,Iris-versicolor +6.1,2.9,4.7,1.4,Iris-versicolor +5.6,2.9,3.6,1.3,Iris-versicolor +6.7,3.1,4.4,1.4,Iris-versicolor +5.6,3.0,4.5,1.5,Iris-versicolor +5.8,2.7,4.1,1.0,Iris-versicolor +6.2,2.2,4.5,1.5,Iris-versicolor +5.6,2.5,3.9,1.1,Iris-versicolor +5.9,3.2,4.8,1.8,Iris-versicolor +6.1,2.8,4.0,1.3,Iris-versicolor +6.3,2.5,4.9,1.5,Iris-versicolor +6.1,2.8,4.7,1.2,Iris-versicolor +6.4,2.9,4.3,1.3,Iris-versicolor +6.6,3.0,4.4,1.4,Iris-versicolor +6.8,2.8,4.8,1.4,Iris-versicolor +6.7,3.0,5.0,1.7,Iris-versicolor +6.0,2.9,4.5,1.5,Iris-versicolor +5.7,2.6,3.5,1.0,Iris-versicolor +5.5,2.4,3.8,1.1,Iris-versicolor +5.5,2.4,3.7,1.0,Iris-versicolor +5.8,2.7,3.9,1.2,Iris-versicolor +6.0,2.7,5.1,1.6,Iris-versicolor +5.4,3.0,4.5,1.5,Iris-versicolor +6.0,3.4,4.5,1.6,Iris-versicolor +6.7,3.1,4.7,1.5,Iris-versicolor +6.3,2.3,4.4,1.3,Iris-versicolor +5.6,3.0,4.1,1.3,Iris-versicolor +5.5,2.5,4.0,1.3,Iris-versicolor +5.5,2.6,4.4,1.2,Iris-versicolor +6.1,3.0,4.6,1.4,Iris-versicolor +5.8,2.6,4.0,1.2,Iris-versicolor +5.0,2.3,3.3,1.0,Iris-versicolor +5.6,2.7,4.2,1.3,Iris-versicolor +5.7,3.0,4.2,1.2,Iris-versicolor +5.7,2.9,4.2,1.3,Iris-versicolor +6.2,2.9,4.3,1.3,Iris-versicolor +5.1,2.5,3.0,1.1,Iris-versicolor +5.7,2.8,4.1,1.3,Iris-versicolor +6.3,3.3,6.0,2.5,Iris-virginica +5.8,2.7,5.1,1.9,Iris-virginica +7.1,3.0,5.9,2.1,Iris-virginica +6.3,2.9,5.6,1.8,Iris-virginica +6.5,3.0,5.8,2.2,Iris-virginica +7.6,3.0,6.6,2.1,Iris-virginica +4.9,2.5,4.5,1.7,Iris-virginica +7.3,2.9,6.3,1.8,Iris-virginica +6.7,2.5,5.8,1.8,Iris-virginica +7.2,3.6,6.1,2.5,Iris-virginica +6.5,3.2,5.1,2.0,Iris-virginica +6.4,2.7,5.3,1.9,Iris-virginica +6.8,3.0,5.5,2.1,Iris-virginica +5.7,2.5,5.0,2.0,Iris-virginica +5.8,2.8,5.1,2.4,Iris-virginica +6.4,3.2,5.3,2.3,Iris-virginica +6.5,3.0,5.5,1.8,Iris-virginica +7.7,3.8,6.7,2.2,Iris-virginica +7.7,2.6,6.9,2.3,Iris-virginica +6.0,2.2,5.0,1.5,Iris-virginica +6.9,3.2,5.7,2.3,Iris-virginica +5.6,2.8,4.9,2.0,Iris-virginica +7.7,2.8,6.7,2.0,Iris-virginica +6.3,2.7,4.9,1.8,Iris-virginica +6.7,3.3,5.7,2.1,Iris-virginica +7.2,3.2,6.0,1.8,Iris-virginica +6.2,2.8,4.8,1.8,Iris-virginica +6.1,3.0,4.9,1.8,Iris-virginica +6.4,2.8,5.6,2.1,Iris-virginica +7.2,3.0,5.8,1.6,Iris-virginica +7.4,2.8,6.1,1.9,Iris-virginica +7.9,3.8,6.4,2.0,Iris-virginica +6.4,2.8,5.6,2.2,Iris-virginica +6.3,2.8,5.1,1.5,Iris-virginica +6.1,2.6,5.6,1.4,Iris-virginica +7.7,3.0,6.1,2.3,Iris-virginica +6.3,3.4,5.6,2.4,Iris-virginica +6.4,3.1,5.5,1.8,Iris-virginica +6.0,3.0,4.8,1.8,Iris-virginica +6.9,3.1,5.4,2.1,Iris-virginica +6.7,3.1,5.6,2.4,Iris-virginica +6.9,3.1,5.1,2.3,Iris-virginica +5.8,2.7,5.1,1.9,Iris-virginica +6.8,3.2,5.9,2.3,Iris-virginica +6.7,3.3,5.7,2.5,Iris-virginica +6.7,3.0,5.2,2.3,Iris-virginica +6.3,2.5,5.0,1.9,Iris-virginica +6.5,3.0,5.2,2.0,Iris-virginica +6.2,3.4,5.4,2.3,Iris-virginica +5.9,3.0,5.1,1.8,Iris-virginica \ No newline at end of file diff --git a/pandas/io/tests/test_sql.py b/pandas/io/tests/test_sql.py index ef9917c9a02f7..c11d64302d955 100644 --- a/pandas/io/tests/test_sql.py +++ b/pandas/io/tests/test_sql.py @@ -1,608 +1,701 @@ from __future__ import print_function +import unittest import sqlite3 -import sys - -import warnings +import csv +import os import nose - import numpy as np -from 
pandas.core.datetools import format as date_format -from pandas.core.api import DataFrame, isnull -from pandas.compat import StringIO, range, lrange -import pandas.compat as compat +from pandas import DataFrame +from pandas.compat import range, lrange, iteritems +#from pandas.core.datetools import format as date_format import pandas.io.sql as sql import pandas.util.testing as tm -from pandas import Series, Index, DataFrame -from datetime import datetime - -_formatters = { - datetime: lambda dt: "'%s'" % date_format(dt), - str: lambda x: "'%s'" % x, - np.str_: lambda x: "'%s'" % x, - compat.text_type: lambda x: "'%s'" % x, - compat.binary_type: lambda x: "'%s'" % x, - float: lambda x: "%.8f" % x, - int: lambda x: "%s" % x, - type(None): lambda x: "NULL", - np.float64: lambda x: "%.10f" % x, - bool: lambda x: "'%s'" % x, + + +try: + import sqlalchemy + SQLALCHEMY_INSTALLED = True +except ImportError: + SQLALCHEMY_INSTALLED = False + +SQL_STRINGS = { + 'create_iris': { + 'sqlite': """CREATE TABLE iris ( + `SepalLength` REAL, + `SepalWidth` REAL, + `PetalLength` REAL, + `PetalWidth` REAL, + `Name` TEXT + )""", + 'mysql': """CREATE TABLE iris ( + `SepalLength` DOUBLE, + `SepalWidth` DOUBLE, + `PetalLength` DOUBLE, + `PetalWidth` DOUBLE, + `Name` VARCHAR(200) + )""" + }, + 'insert_iris': { + 'sqlite': """INSERT INTO iris VALUES(?, ?, ?, ?, ?)""", + 'mysql': """INSERT INTO iris VALUES(%s, %s, %s, %s, "%s");""" + }, + 'create_test_types': { + 'sqlite': """CREATE TABLE types_test_data ( + `TextCol` TEXT, + `DateCol` TEXT, + `IntDateCol` INTEGER, + `FloatCol` REAL, + `IntCol` INTEGER, + `BoolCol` INTEGER, + `IntColWithNull` INTEGER, + `BoolColWithNull` INTEGER + )""", + 'mysql': """CREATE TABLE types_test_data ( + `TextCol` TEXT, + `DateCol` DATETIME, + `IntDateCol` INTEGER, + `FloatCol` DOUBLE, + `IntCol` INTEGER, + `BoolCol` BOOLEAN, + `IntColWithNull` INTEGER, + `BoolColWithNull` BOOLEAN + )""" + }, + 'insert_test_types': { + 'sqlite': """ + INSERT INTO types_test_data + VALUES(?, ?, ?, ?, ?, ?, ?, ?) + """, + 'mysql': """ + INSERT INTO types_test_data + VALUES("%s", %s, %s, %s, %s, %s, %s, %s) + """ + } } -def format_query(sql, *args): + +class PandasSQLTest(unittest.TestCase): + + """Base class with common private methods for + SQLAlchemy and fallback cases. 
""" + def drop_table(self, table_name): + self._get_exec().execute("DROP TABLE IF EXISTS %s" % table_name) + + def _get_exec(self): + if hasattr(self.conn, 'execute'): + return self.conn + else: + return self.conn.cursor() + + def _load_iris_data(self): + iris_csv_file = os.path.join(tm.get_data_path(), 'iris.csv') + + self.drop_table('iris') + self._get_exec().execute(SQL_STRINGS['create_iris'][self.flavor]) + + with open(iris_csv_file, 'rU') as iris_csv: + r = csv.reader(iris_csv) + next(r) # skip header row + ins = SQL_STRINGS['insert_iris'][self.flavor] + + for row in r: + self._get_exec().execute(ins, row) + + def _check_iris_loaded_frame(self, iris_frame): + pytype = iris_frame.dtypes[0].type + row = iris_frame.iloc[0] + + self.assertTrue( + issubclass(pytype, np.floating), 'Loaded frame has incorrect type') + tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) + + def _load_test1_data(self): + columns = ['index', 'A', 'B', 'C', 'D'] + data = [( + '2000-01-03 00:00:00', 0.980268513777, 3.68573087906, -0.364216805298, -1.15973806169), + ('2000-01-04 00:00:00', 1.04791624281, - + 0.0412318367011, -0.16181208307, 0.212549316967), + ('2000-01-05 00:00:00', 0.498580885705, + 0.731167677815, -0.537677223318, 1.34627041952), + ('2000-01-06 00:00:00', 1.12020151869, 1.56762092543, 0.00364077397681, 0.67525259227)] + + self.test_frame1 = DataFrame(data, columns=columns) + + def _load_raw_sql(self): + self.drop_table('types_test_data') + self._get_exec().execute(SQL_STRINGS['create_test_types'][self.flavor]) + ins = SQL_STRINGS['insert_test_types'][self.flavor] + + data = [( + 'first', '2000-01-03 00:00:00', 535852800, 10.10, 1, False, 1, False), + ('first', '2000-01-04 00:00:00', 1356998400, 10.10, 1, False, None, None)] + for d in data: + self._get_exec().execute(ins, d) + + def _count_rows(self, table_name): + result = self._get_exec().execute( + "SELECT count(*) AS count_1 FROM %s" % table_name).fetchone() + return result[0] + + def _read_sql_iris(self): + iris_frame = self.pandasSQL.read_sql("SELECT * FROM iris") + self._check_iris_loaded_frame(iris_frame) + + def _to_sql(self): + self.drop_table('test_frame1') + + self.pandasSQL.to_sql(self.test_frame1, 'test_frame1') + self.assertTrue(self.pandasSQL.has_table( + 'test_frame1'), 'Table not written to DB') + + # Nuke table + self.drop_table('test_frame1') + + def _to_sql_fail(self): + self.drop_table('test_frame1') + + self.pandasSQL.to_sql( + self.test_frame1, 'test_frame1', if_exists='fail') + self.assertTrue(self.pandasSQL.has_table( + 'test_frame1'), 'Table not written to DB') + + self.assertRaises(ValueError, self.pandasSQL.to_sql, + self.test_frame1, 'test_frame1', if_exists='fail') + + self.drop_table('test_frame1') + + def _to_sql_replace(self): + self.drop_table('test_frame1') + + self.pandasSQL.to_sql( + self.test_frame1, 'test_frame1', if_exists='fail') + # Add to table again + self.pandasSQL.to_sql( + self.test_frame1, 'test_frame1', if_exists='replace') + self.assertTrue(self.pandasSQL.has_table( + 'test_frame1'), 'Table not written to DB') + + num_entries = len(self.test_frame1) + num_rows = self._count_rows('test_frame1') + + self.assertEqual( + num_rows, num_entries, "not the same number of rows as entries") + + self.drop_table('test_frame1') + + def _to_sql_append(self): + # Nuke table just in case + self.drop_table('test_frame1') + + self.pandasSQL.to_sql( + self.test_frame1, 'test_frame1', if_exists='fail') + + # Add to table again + self.pandasSQL.to_sql( + self.test_frame1, 'test_frame1', 
if_exists='append') + self.assertTrue(self.pandasSQL.has_table( + 'test_frame1'), 'Table not written to DB') + + num_entries = 2 * len(self.test_frame1) + num_rows = self._count_rows('test_frame1') + + self.assertEqual( + num_rows, num_entries, "not the same number of rows as entries") + + self.drop_table('test_frame1') + + def _roundtrip(self): + self.drop_table('test_frame_roundtrip') + self.pandasSQL.to_sql(self.test_frame1, 'test_frame_roundtrip') + result = self.pandasSQL.read_sql('SELECT * FROM test_frame_roundtrip') + + result.set_index('pandas_index', inplace=True) + # result.index.astype(int) + + result.index.name = None + + tm.assert_frame_equal(result, self.test_frame1) + + def _execute_sql(self): + # drop_sql = "DROP TABLE IF EXISTS test" # should already be done + iris_results = self.pandasSQL.execute("SELECT * FROM iris") + row = iris_results.fetchone() + tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) + + def _tquery(self): + iris_results = self.pandasSQL.tquery("SELECT * FROM iris") + row = iris_results[0] + tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) + + +class TestSQLApi(PandasSQLTest): + + """Test the public API as it would be used + directly, including legacy names + + Notes: + flavor can always be passed even in SQLAlchemy mode, + should be correctly ignored. + + we don't use drop_table because that isn't part of the public api + """ - processed_args = [] - for arg in args: - if isinstance(arg, float) and isnull(arg): - arg = None + flavor = 'sqlite' - formatter = _formatters[type(arg)] - processed_args.append(formatter(arg)) + def connect(self): + if SQLALCHEMY_INSTALLED: + return sqlalchemy.create_engine('sqlite:///:memory:') + else: + return sqlite3.connect(':memory:') - return sql % tuple(processed_args) + def setUp(self): + self.conn = self.connect() + self._load_iris_data() + self._load_test1_data() + self._load_raw_sql() + + def test_read_sql_iris(self): + iris_frame = sql.read_sql( + "SELECT * FROM iris", self.conn, flavor='sqlite') + self._check_iris_loaded_frame(iris_frame) + + def test_legacy_read_frame(self): + """Test legacy name read_frame""" + iris_frame = sql.read_frame( + "SELECT * FROM iris", self.conn, flavor='sqlite') + self._check_iris_loaded_frame(iris_frame) + + def test_to_sql(self): + sql.to_sql(self.test_frame1, 'test_frame1', self.conn, flavor='sqlite') + self.assertTrue( + sql.has_table('test_frame1', self.conn, flavor='sqlite'), 'Table not written to DB') + + def test_to_sql_fail(self): + sql.to_sql(self.test_frame1, 'test_frame2', + self.conn, flavor='sqlite', if_exists='fail') + self.assertTrue( + sql.has_table('test_frame2', self.conn, flavor='sqlite'), 'Table not written to DB') + + self.assertRaises(ValueError, sql.to_sql, self.test_frame1, + 'test_frame2', self.conn, flavor='sqlite', if_exists='fail') + + def test_to_sql_replace(self): + sql.to_sql(self.test_frame1, 'test_frame3', + self.conn, flavor='sqlite', if_exists='fail') + # Add to table again + sql.to_sql(self.test_frame1, 'test_frame3', + self.conn, flavor='sqlite', if_exists='replace') + self.assertTrue( + sql.has_table('test_frame3', self.conn, flavor='sqlite'), 'Table not written to DB') + + num_entries = len(self.test_frame1) + num_rows = self._count_rows('test_frame3') + + self.assertEqual( + num_rows, num_entries, "not the same number of rows as entries") + + def test_to_sql_append(self): + sql.to_sql(self.test_frame1, 'test_frame4', + self.conn, flavor='sqlite', if_exists='fail') + + # Add to table again + sql.to_sql(self.test_frame1, 
+    def test_to_sql_append(self):
+        sql.to_sql(self.test_frame1, 'test_frame4',
+                   self.conn, flavor='sqlite', if_exists='fail')
+
+        # Add to table again
+        sql.to_sql(self.test_frame1, 'test_frame4',
+                   self.conn, flavor='sqlite', if_exists='append')
+        self.assertTrue(
+            sql.has_table('test_frame4', self.conn, flavor='sqlite'),
+            'Table not written to DB')
+
+        num_entries = 2 * len(self.test_frame1)
+        num_rows = self._count_rows('test_frame4')
+
+        self.assertEqual(
+            num_rows, num_entries, "not the same number of rows as entries")
+
+    def test_legacy_write_frame(self):
+        """Test legacy write_frame name.
+        Functionality is already tested above, so just do a quick check that
+        it basically works."""
+        sql.write_frame(
+            self.test_frame1, 'test_frame_legacy', self.conn, flavor='sqlite')
+        self.assertTrue(
+            sql.has_table('test_frame_legacy', self.conn, flavor='sqlite'),
+            'Table not written to DB')
+
+    def test_roundtrip(self):
+        sql.to_sql(self.test_frame1, 'test_frame_roundtrip',
+                   con=self.conn, flavor='sqlite')
+        result = sql.read_sql(
+            'SELECT * FROM test_frame_roundtrip',
+            con=self.conn,
+            flavor='sqlite')
+
+        # HACK!
+        result.index = self.test_frame1.index
+        result.set_index('pandas_index', inplace=True)
+        result.index = result.index.astype(int)
+        result.index.name = None
+        tm.assert_frame_equal(result, self.test_frame1)
+
+    def test_execute_sql(self):
+        # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
+        iris_results = sql.execute(
+            "SELECT * FROM iris", con=self.conn, flavor='sqlite')
+        row = iris_results.fetchone()
+        self.assertTrue(
+            tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']))
+
+    def test_tquery(self):
+        iris_results = sql.tquery(
+            "SELECT * FROM iris", con=self.conn, flavor='sqlite')
+        row = iris_results[0]
+        self.assertTrue(
+            tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']))
+
+    def test_date_parsing(self):
+        """Test date parsing in read_sql"""
+        # No Parsing
+        df = sql.read_sql(
+            "SELECT * FROM types_test_data", self.conn, flavor='sqlite')
+        self.assertFalse(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        df = sql.read_sql("SELECT * FROM types_test_data",
+                          self.conn, flavor='sqlite', parse_dates=['DateCol'])
+        self.assertTrue(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        df = sql.read_sql("SELECT * FROM types_test_data", self.conn,
+                          flavor='sqlite',
+                          parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'})
+        self.assertTrue(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        df = sql.read_sql("SELECT * FROM types_test_data",
+                          self.conn, flavor='sqlite',
+                          parse_dates=['IntDateCol'])
+
+        self.assertTrue(issubclass(df.IntDateCol.dtype.type, np.datetime64),
+                        "IntDateCol loaded with incorrect type")
+
+        df = sql.read_sql("SELECT * FROM types_test_data",
+                          self.conn, flavor='sqlite',
+                          parse_dates={'IntDateCol': 's'})
+
+        self.assertTrue(issubclass(df.IntDateCol.dtype.type, np.datetime64),
+                        "IntDateCol loaded with incorrect type")
+
+    def test_date_and_index(self):
+        """Test case where same column appears in parse_dates and index_col"""
+
+        df = sql.read_sql("SELECT * FROM types_test_data",
+                          self.conn, flavor='sqlite',
+                          parse_dates=['DateCol', 'IntDateCol'],
+                          index_col='DateCol')
+        self.assertTrue(
+            issubclass(df.index.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        self.assertTrue(
+            issubclass(df.IntDateCol.dtype.type, np.datetime64),
+            "IntDateCol loaded with incorrect type")
-def _skip_if_no_MySQLdb():
-    try:
-        import MySQLdb
-    except ImportError:
-        raise nose.SkipTest('MySQLdb not installed, skipping')
-
-
-class TestSQLite(tm.TestCase):
+class TestSQLAlchemy(PandasSQLTest):
+
+    '''
+    Test the sqlalchemy backend against an in-memory sqlite database.
+    Assume that sqlalchemy takes care of the DB specifics.
+    '''
+    flavor = 'sqlite'
+
+    def connect(self):
+        return sqlalchemy.create_engine('sqlite:///:memory:')

     def setUp(self):
-        self.db = sqlite3.connect(':memory:')
-
-    def test_basic(self):
-        frame = tm.makeTimeDataFrame()
-        self._check_roundtrip(frame)
-
-    def test_write_row_by_row(self):
-        frame = tm.makeTimeDataFrame()
-        frame.ix[0, 0] = np.nan
-        create_sql = sql.get_schema(frame, 'test', 'sqlite')
-        cur = self.db.cursor()
-        cur.execute(create_sql)
-
-        cur = self.db.cursor()
-
-        ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
-        for idx, row in frame.iterrows():
-            fmt_sql = format_query(ins, *row)
-            sql.tquery(fmt_sql, cur=cur)
-
-        self.db.commit()
-
-        result = sql.read_frame("select * from test", con=self.db)
-        result.index = frame.index
-        tm.assert_frame_equal(result, frame)
-
-    def test_execute(self):
-        frame = tm.makeTimeDataFrame()
-        create_sql = sql.get_schema(frame, 'test', 'sqlite')
-        cur = self.db.cursor()
-        cur.execute(create_sql)
-        ins = "INSERT INTO test VALUES (?, ?, ?, ?)"
-
-        row = frame.ix[0]
-        sql.execute(ins, self.db, params=tuple(row))
-        self.db.commit()
-
-        result = sql.read_frame("select * from test", self.db)
-        result.index = frame.index[:1]
-        tm.assert_frame_equal(result, frame[:1])
-
-    def test_schema(self):
-        frame = tm.makeTimeDataFrame()
-        create_sql = sql.get_schema(frame, 'test', 'sqlite')
-        lines = create_sql.splitlines()
-        for l in lines:
-            tokens = l.split(' ')
-            if len(tokens) == 2 and tokens[0] == 'A':
-                self.assert_(tokens[1] == 'DATETIME')
-
-        frame = tm.makeTimeDataFrame()
-        create_sql = sql.get_schema(frame, 'test', 'sqlite', keys=['A', 'B'],)
-        lines = create_sql.splitlines()
-        self.assert_('PRIMARY KEY (A,B)' in create_sql)
-        cur = self.db.cursor()
-        cur.execute(create_sql)
-
-    def test_execute_fail(self):
-        create_sql = """
-        CREATE TABLE test
-        (
-        a TEXT,
-        b TEXT,
-        c REAL,
-        PRIMARY KEY (a, b)
-        );
-        """
-        cur = self.db.cursor()
-        cur.execute(create_sql)
-
-        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.db)
-        sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.db)
+        # Skip this test if SQLAlchemy not available
+        if not SQLALCHEMY_INSTALLED:
+            raise nose.SkipTest('SQLAlchemy not installed')

-        try:
-            sys.stdout = StringIO()
-            self.assertRaises(Exception, sql.execute,
-                              'INSERT INTO test VALUES("foo", "bar", 7)',
-                              self.db)
-        finally:
-            sys.stdout = sys.__stdout__
-
-    def test_execute_closed_connection(self):
-        create_sql = """
-        CREATE TABLE test
-        (
-        a TEXT,
-        b TEXT,
-        c REAL,
-        PRIMARY KEY (a, b)
-        );
-        """
-        cur = self.db.cursor()
-        cur.execute(create_sql)
-
-        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.db)
-        self.db.close()
-        try:
-            sys.stdout = StringIO()
-            self.assertRaises(Exception, sql.tquery, "select * from test",
-                              con=self.db)
-        finally:
-            sys.stdout = sys.__stdout__
-
-    def test_na_roundtrip(self):
-        pass
-
-    def _check_roundtrip(self, frame):
-        sql.write_frame(frame, name='test_table', con=self.db)
-        result = sql.read_frame("select * from test_table", self.db)
-
-        # HACK! Change this once indexes are handled properly.
- result.index = frame.index - - expected = frame - tm.assert_frame_equal(result, expected) - - frame['txt'] = ['a'] * len(frame) - frame2 = frame.copy() - frame2['Idx'] = Index(lrange(len(frame2))) + 10 - sql.write_frame(frame2, name='test_table2', con=self.db) - result = sql.read_frame("select * from test_table2", self.db, - index_col='Idx') - expected = frame.copy() - expected.index = Index(lrange(len(frame2))) + 10 - expected.index.name = 'Idx' - print(expected.index.names) - print(result.index.names) - tm.assert_frame_equal(expected, result) + self.conn = self.connect() + self.pandasSQL = sql.PandasSQLAlchemy(self.conn) - def test_tquery(self): - frame = tm.makeTimeDataFrame() - sql.write_frame(frame, name='test_table', con=self.db) - result = sql.tquery("select A from test_table", self.db) - expected = frame.A - result = Series(result, frame.index) - tm.assert_series_equal(result, expected) + self._load_iris_data() + self._load_raw_sql() - try: - sys.stdout = StringIO() - self.assertRaises(sqlite3.OperationalError, sql.tquery, - 'select * from blah', con=self.db) + self._load_test1_data() - self.assertRaises(sqlite3.OperationalError, sql.tquery, - 'select * from blah', con=self.db, retry=True) - finally: - sys.stdout = sys.__stdout__ + def test_read_sql(self): + self._read_sql_iris() - def test_uquery(self): - frame = tm.makeTimeDataFrame() - sql.write_frame(frame, name='test_table', con=self.db) - stmt = 'INSERT INTO test_table VALUES(2.314, -123.1, 1.234, 2.3)' - self.assertEqual(sql.uquery(stmt, con=self.db), 1) + def test_to_sql(self): + self._to_sql() - try: - sys.stdout = StringIO() - - self.assertRaises(sqlite3.OperationalError, sql.tquery, - 'insert into blah values (1)', con=self.db) - - self.assertRaises(sqlite3.OperationalError, sql.tquery, - 'insert into blah values (1)', con=self.db, - retry=True) - finally: - sys.stdout = sys.__stdout__ - - def test_keyword_as_column_names(self): - ''' - ''' - df = DataFrame({'From':np.ones(5)}) - sql.write_frame(df, con = self.db, name = 'testkeywords') - - def test_onecolumn_of_integer(self): - # GH 3628 - # a column_of_integers dataframe should transfer well to sql - - mono_df=DataFrame([1 , 2], columns=['c0']) - sql.write_frame(mono_df, con = self.db, name = 'mono_df') - # computing the sum via sql - con_x=self.db - the_sum=sum([my_c0[0] for my_c0 in con_x.execute("select * from mono_df")]) - # it should not fail, and gives 3 ( Issue #3628 ) - self.assertEqual(the_sum , 3) - - result = sql.read_frame("select * from mono_df",con_x) - tm.assert_frame_equal(result,mono_df) - - def test_if_exists(self): - df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']}) - df_if_exists_2 = DataFrame({'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']}) - table_name = 'table_if_exists' - sql_select = "SELECT * FROM %s" % table_name - - def clean_up(test_table_to_drop): - """ - Drops tables created from individual tests - so no dependencies arise from sequential tests - """ - if sql.table_exists(test_table_to_drop, self.db, flavor='sqlite'): - cur = self.db.cursor() - cur.execute("DROP TABLE %s" % test_table_to_drop) - cur.close() - - # test if invalid value for if_exists raises appropriate error - self.assertRaises(ValueError, - sql.write_frame, - frame=df_if_exists_1, - con=self.db, - name=table_name, - flavor='sqlite', - if_exists='notvalidvalue') - clean_up(table_name) - - # test if_exists='fail' - sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name, - flavor='sqlite', if_exists='fail') - self.assertRaises(ValueError, - 
sql.write_frame,
-                          frame=df_if_exists_1,
-                          con=self.db,
-                          name=table_name,
-                          flavor='sqlite',
-                          if_exists='fail')
-
-        # test if_exists='replace'
-        sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name,
-                        flavor='sqlite', if_exists='replace')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B')])
-        sql.write_frame(frame=df_if_exists_2, con=self.db, name=table_name,
-                        flavor='sqlite', if_exists='replace')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(3, 'C'), (4, 'D'), (5, 'E')])
-        clean_up(table_name)
-
-        # test if_exists='append'
-        sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name,
-                        flavor='sqlite', if_exists='fail')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B')])
-        sql.write_frame(frame=df_if_exists_2, con=self.db, name=table_name,
-                        flavor='sqlite', if_exists='append')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')])
-        clean_up(table_name)
-
-
-class TestMySQL(tm.TestCase):
+    def test_to_sql_fail(self):
+        self._to_sql_fail()
+
+    def test_to_sql_replace(self):
+        self._to_sql_replace()
+
+    def test_to_sql_append(self):
+        self._to_sql_append()
+
+    def test_create_table(self):
+        temp_conn = self.connect()
+        temp_frame = DataFrame(
+            {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]})
+
+        pandasSQL = sql.PandasSQLAlchemy(temp_conn)
+        pandasSQL.to_sql(temp_frame, 'temp_frame')
+
+        self.assertTrue(
+            temp_conn.has_table('temp_frame'), 'Table not written to DB')
+
+    def test_drop_table(self):
+        temp_conn = self.connect()
+
+        temp_frame = DataFrame(
+            {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]})
+
+        pandasSQL = sql.PandasSQLAlchemy(temp_conn)
+        pandasSQL.to_sql(temp_frame, 'temp_frame')
+
+        self.assertTrue(
+            temp_conn.has_table('temp_frame'), 'Table not written to DB')
+
+        pandasSQL.drop_table('temp_frame')
+
+        self.assertFalse(
+            temp_conn.has_table('temp_frame'), 'Table not deleted from DB')
+
+    def test_roundtrip(self):
+        self._roundtrip()
+
+    def test_execute_sql(self):
+        self._execute_sql()
+
+    def test_read_table(self):
+        iris_frame = sql.read_table("iris", con=self.conn)
+        self._check_iris_loaded_frame(iris_frame)
+
+    def test_read_table_columns(self):
+        iris_frame = sql.read_table(
+            "iris", con=self.conn, columns=['SepalLength', 'SepalLength'])
+        self.assertTrue(tm.equalContents(
+            iris_frame.columns.values, ['SepalLength', 'SepalLength']))
+
+    def test_read_table_absent(self):
+        self.assertRaises(
+            ValueError, sql.read_table, "this_doesnt_exist", con=self.conn)
+
+    def test_default_type_conversion(self):
+        """Test default type conversion"""
+        df = sql.read_table("types_test_data", self.conn)
+        self.assertTrue(
+            issubclass(df.FloatCol.dtype.type, np.floating),
+            "FloatCol loaded with incorrect type")
+        self.assertTrue(
+            issubclass(df.IntCol.dtype.type, np.integer),
+            "IntCol loaded with incorrect type")
+        self.assertTrue(
+            issubclass(df.BoolCol.dtype.type, np.integer),
+            "BoolCol loaded with incorrect type")
+
+        # Int column with NA values stays as float
+        self.assertTrue(issubclass(df.IntColWithNull.dtype.type, np.floating),
+                        "IntColWithNull loaded with incorrect type")
+        # Non-native Bool column with NA values stays as float
+        self.assertTrue(
+            issubclass(df.BoolColWithNull.dtype.type, np.floating),
+            "BoolColWithNull loaded with incorrect type")
+
+    def test_default_date_load(self):
+        df = sql.read_table("types_test_data", self.conn)
+
+        # IMPORTANT - sqlite has no native date type, so shouldn't parse, but
+        # MySQL SHOULD be converted.
+        self.assertFalse(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
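+
+    # read_table's parse_dates argument accepts several forms, all exercised
+    # in test_date_parsing below: a list of column names, a dict mapping
+    # column -> strftime format string, a dict mapping column -> a unit for
+    # to_datetime (e.g. 's'), or a dict mapping column -> a kwargs dict for
+    # to_datetime. Illustrative calls mirroring the test bodies:
+    #
+    #     sql.read_table("types_test_data", self.conn, parse_dates=['DateCol'])
+    #     sql.read_table("types_test_data", self.conn,
+    #                    parse_dates={'IntDateCol': {'unit': 's'}})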
+
+    def test_date_parsing(self):
+        """ Test date parsing """
+        # No Parsing
+        df = sql.read_table("types_test_data", self.conn)
+
+        df = sql.read_table(
+            "types_test_data", self.conn, parse_dates=['DateCol'])
+        self.assertTrue(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        df = sql.read_table(
+            "types_test_data", self.conn,
+            parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'})
+        self.assertTrue(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
+
+        df = sql.read_table("types_test_data", self.conn, parse_dates={
+                            'DateCol': {'format': '%Y-%m-%d %H:%M:%S'}})
+        self.assertTrue(issubclass(df.DateCol.dtype.type, np.datetime64),
+                        "DateCol loaded with incorrect type")
+
+        df = sql.read_table(
+            "types_test_data", self.conn, parse_dates=['IntDateCol'])
+        self.assertTrue(issubclass(df.IntDateCol.dtype.type, np.datetime64),
+                        "IntDateCol loaded with incorrect type")
+
+        df = sql.read_table(
+            "types_test_data", self.conn, parse_dates={'IntDateCol': 's'})
+        self.assertTrue(issubclass(df.IntDateCol.dtype.type, np.datetime64),
+                        "IntDateCol loaded with incorrect type")
+
+        df = sql.read_table(
+            "types_test_data", self.conn,
+            parse_dates={'IntDateCol': {'unit': 's'}})
+        self.assertTrue(issubclass(df.IntDateCol.dtype.type, np.datetime64),
+                        "IntDateCol loaded with incorrect type")
+
+
+# --- Test SQLITE fallback
+class TestSQLite(PandasSQLTest):
+
+    '''
+    Test the legacy (non-SQLAlchemy) fallback mode against an in-memory
+    sqlite database.
+    '''
+    flavor = 'sqlite'
+
+    def connect(self):
+        return sqlite3.connect(':memory:')
+
+    def drop_table(self, table_name):
+        cur = self.conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS %s" % table_name)
+        self.conn.commit()

     def setUp(self):
-        _skip_if_no_MySQLdb()
-        import MySQLdb
-        try:
-            # Try Travis defaults.
-            # No real user should allow root access with a blank password.
-            self.db = MySQLdb.connect(host='localhost', user='root', passwd='',
-                                      db='pandas_nosetest')
-        except:
-            pass
-        else:
-            return
-        try:
-            self.db = MySQLdb.connect(read_default_group='pandas')
-        except MySQLdb.ProgrammingError as e:
-            raise nose.SkipTest(
-                "Create a group of connection parameters under the heading "
-                "[pandas] in your system's mysql default file, "
-                "typically located at ~/.my.cnf or /etc/.my.cnf. ")
-        except MySQLdb.Error as e:
-            raise nose.SkipTest(
-                "Cannot connect to database. "
-                "Create a group of connection parameters under the heading "
-                "[pandas] in your system's mysql default file, "
-                "typically located at ~/.my.cnf or /etc/.my.cnf. 
") - - def test_basic(self): - _skip_if_no_MySQLdb() - frame = tm.makeTimeDataFrame() - self._check_roundtrip(frame) - - def test_write_row_by_row(self): - _skip_if_no_MySQLdb() - frame = tm.makeTimeDataFrame() - frame.ix[0, 0] = np.nan - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, 'test', 'mysql') - cur = self.db.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" - for idx, row in frame.iterrows(): - fmt_sql = format_query(ins, *row) - sql.tquery(fmt_sql, cur=cur) - - self.db.commit() - - result = sql.read_frame("select * from test", con=self.db) - result.index = frame.index - tm.assert_frame_equal(result, frame) - - def test_execute(self): - _skip_if_no_MySQLdb() - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, 'test', 'mysql') - cur = self.db.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - cur.execute(create_sql) - ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" - - row = frame.ix[0] - sql.execute(ins, self.db, params=tuple(row)) - self.db.commit() - - result = sql.read_frame("select * from test", self.db) - result.index = frame.index[:1] - tm.assert_frame_equal(result, frame[:1]) - - def test_schema(self): - _skip_if_no_MySQLdb() - frame = tm.makeTimeDataFrame() - create_sql = sql.get_schema(frame, 'test', 'mysql') - lines = create_sql.splitlines() - for l in lines: - tokens = l.split(' ') - if len(tokens) == 2 and tokens[0] == 'A': - self.assert_(tokens[1] == 'DATETIME') - - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, 'test', 'mysql', keys=['A', 'B'],) - lines = create_sql.splitlines() - self.assert_('PRIMARY KEY (A,B)' in create_sql) - cur = self.db.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - def test_execute_fail(self): - _skip_if_no_MySQLdb() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = """ - CREATE TABLE test - ( - a TEXT, - b TEXT, - c REAL, - PRIMARY KEY (a(5), b(5)) - ); - """ - cur = self.db.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.db) - sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.db) + self.conn = self.connect() + self.pandasSQL = sql.PandasSQLLegacy(self.conn, 'sqlite') - try: - sys.stdout = StringIO() - self.assertRaises(Exception, sql.execute, - 'INSERT INTO test VALUES("foo", "bar", 7)', - self.db) - finally: - sys.stdout = sys.__stdout__ - - def test_execute_closed_connection(self): - _skip_if_no_MySQLdb() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = """ - CREATE TABLE test - ( - a TEXT, - b TEXT, - c REAL, - PRIMARY KEY (a(5), b(5)) - ); - """ - cur = self.db.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.db) - self.db.close() - try: - sys.stdout = StringIO() - self.assertRaises(Exception, sql.tquery, "select * from test", - con=self.db) - finally: - sys.stdout = sys.__stdout__ - - def test_na_roundtrip(self): - _skip_if_no_MySQLdb() - pass - - def _check_roundtrip(self, frame): - _skip_if_no_MySQLdb() - drop_sql = "DROP TABLE IF EXISTS test_table" - cur = self.db.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - sql.write_frame(frame, name='test_table', con=self.db, flavor='mysql') 
- result = sql.read_frame("select * from test_table", self.db) - - # HACK! Change this once indexes are handled properly. - result.index = frame.index - result.index.name = frame.index.name - - expected = frame - tm.assert_frame_equal(result, expected) - - frame['txt'] = ['a'] * len(frame) - frame2 = frame.copy() - index = Index(lrange(len(frame2))) + 10 - frame2['Idx'] = index - drop_sql = "DROP TABLE IF EXISTS test_table2" - cur = self.db.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - sql.write_frame(frame2, name='test_table2', con=self.db, flavor='mysql') - result = sql.read_frame("select * from test_table2", self.db, - index_col='Idx') - expected = frame.copy() - - # HACK! Change this once indexes are handled properly. - expected.index = index - expected.index.names = result.index.names - tm.assert_frame_equal(expected, result) + self._load_iris_data() + + self._load_test1_data() + + def test_invalid_flavor(self): + self.assertRaises( + NotImplementedError, sql.PandasSQLLegacy, self.conn, 'oracle') + + def test_read_sql(self): + self._read_sql_iris() + + def test_to_sql(self): + self._to_sql() + + def test_to_sql_fail(self): + self._to_sql_fail() + + def test_to_sql_replace(self): + self._to_sql_replace() + + def test_to_sql_append(self): + self._to_sql_append() + + def test_create_and_drop_table(self): + temp_frame = DataFrame( + {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]}) + + self.pandasSQL.to_sql(temp_frame, 'drop_test_frame') + + self.assertTrue(self.pandasSQL.has_table( + 'drop_test_frame'), 'Table not written to DB') + + self.pandasSQL.drop_table('drop_test_frame') + + self.assertFalse(self.pandasSQL.has_table( + 'drop_test_frame'), 'Table not deleted from DB') + + def test_roundtrip(self): + self._roundtrip() + + def test_execute_sql(self): + self._execute_sql() def test_tquery(self): - try: - import MySQLdb - except ImportError: - raise nose.SkipTest("no MySQLdb") - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test_table" - cur = self.db.cursor() - cur.execute(drop_sql) - sql.write_frame(frame, name='test_table', con=self.db, flavor='mysql') - result = sql.tquery("select A from test_table", self.db) - expected = frame.A - result = Series(result, frame.index) - tm.assert_series_equal(result, expected) + self._tquery() - try: - sys.stdout = StringIO() - self.assertRaises(MySQLdb.ProgrammingError, sql.tquery, - 'select * from blah', con=self.db) - self.assertRaises(MySQLdb.ProgrammingError, sql.tquery, - 'select * from blah', con=self.db, retry=True) - finally: - sys.stdout = sys.__stdout__ +class TestMySQL(TestSQLite): + flavor = 'mysql' - def test_uquery(self): + def drop_table(self, table_name): + cur = self.conn.cursor() + cur.execute("DROP TABLE IF EXISTS %s" % table_name) + self.conn.commit() + + def _count_rows(self, table_name): + cur = self._get_exec() + cur.execute( + "SELECT count(*) AS count_1 FROM %s" % table_name) + rows = cur.fetchall() + return rows[0][0] + + def connect(self): + return self.driver.connect(host='127.0.0.1', user='root', passwd='', db='pandas_nosetest') + + def setUp(self): try: - import MySQLdb + import pymysql + self.driver = pymysql + except ImportError: - raise nose.SkipTest("no MySQLdb") - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test_table" - cur = self.db.cursor() - cur.execute(drop_sql) - sql.write_frame(frame, name='test_table', con=self.db, flavor='mysql') - stmt = 'INSERT INTO test_table VALUES(2.314, -123.1, 
1.234, 2.3)'
-        self.assertEqual(sql.uquery(stmt, con=self.db), 1)
+            raise nose.SkipTest('pymysql not installed')
-        try:
-            sys.stdout = StringIO()
-
-            self.assertRaises(MySQLdb.ProgrammingError, sql.tquery,
-                              'insert into blah values (1)', con=self.db)
-
-            self.assertRaises(MySQLdb.ProgrammingError, sql.tquery,
-                              'insert into blah values (1)', con=self.db,
-                              retry=True)
-        finally:
-            sys.stdout = sys.__stdout__
-
-    def test_keyword_as_column_names(self):
-        '''
-        '''
-        _skip_if_no_MySQLdb()
-        df = DataFrame({'From':np.ones(5)})
-        sql.write_frame(df, con = self.db, name = 'testkeywords',
-                        if_exists='replace', flavor='mysql')
-
-    def test_if_exists(self):
-        _skip_if_no_MySQLdb()
-        df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']})
-        df_if_exists_2 = DataFrame({'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']})
-        table_name = 'table_if_exists'
-        sql_select = "SELECT * FROM %s" % table_name
-
-        def clean_up(test_table_to_drop):
-            """
-            Drops tables created from individual tests
-            so no dependencies arise from sequential tests
-            """
-            if sql.table_exists(test_table_to_drop, self.db, flavor='mysql'):
-                cur = self.db.cursor()
-                cur.execute("DROP TABLE %s" % test_table_to_drop)
-                cur.close()
-
-        # test if invalid value for if_exists raises appropriate error
-        self.assertRaises(ValueError,
-                          sql.write_frame,
-                          frame=df_if_exists_1,
-                          con=self.db,
-                          name=table_name,
-                          flavor='mysql',
-                          if_exists='notvalidvalue')
-        clean_up(table_name)
-
-        # test if_exists='fail'
-        sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name,
-                        flavor='mysql', if_exists='fail')
-        self.assertRaises(ValueError,
-                          sql.write_frame,
-                          frame=df_if_exists_1,
-                          con=self.db,
-                          name=table_name,
-                          flavor='mysql',
-                          if_exists='fail')
-
-        # test if_exists='replace'
-        sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name,
-                        flavor='mysql', if_exists='replace')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B')])
-        sql.write_frame(frame=df_if_exists_2, con=self.db, name=table_name,
-                        flavor='mysql', if_exists='replace')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(3, 'C'), (4, 'D'), (5, 'E')])
-        clean_up(table_name)
-
-        # test if_exists='append'
-        sql.write_frame(frame=df_if_exists_1, con=self.db, name=table_name,
-                        flavor='mysql', if_exists='fail')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B')])
-        sql.write_frame(frame=df_if_exists_2, con=self.db, name=table_name,
-                        flavor='mysql', if_exists='append')
-        self.assertEqual(sql.tquery(sql_select, con=self.db),
-                         [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')])
-        clean_up(table_name)
-
-
-if __name__ == '__main__':
-    nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],
-                   exit=False)
+        self.conn = self.connect()
+        self.pandasSQL = sql.PandasSQLLegacy(self.conn, 'mysql')
+
+        self._load_iris_data()
+
+        self._load_test1_data()
+
+    def tearDown(self):
+        c = self.conn.cursor()
+        c.execute('SHOW TABLES')
+        for table in c.fetchall():
+            c.execute('DROP TABLE %s' % table[0])
+        self.conn.commit()
+        self.conn.close()
+
+
+class TestMySQLAlchemy(TestSQLAlchemy):
+    flavor = 'mysql'
+
+    def connect(self):
+        return sqlalchemy.create_engine(
+            'mysql+{driver}://root@localhost/pandas_nosetest'.format(driver=self.driver))
+
+    def setUp(self):
+        if not SQLALCHEMY_INSTALLED:
+            raise nose.SkipTest('SQLAlchemy not installed')
+
+        try:
+            import pymysql
+            self.driver = 'pymysql'
+
+        except ImportError:
+            raise nose.SkipTest('pymysql not installed')
+
+        self.conn = self.connect()
+        self.pandasSQL = sql.PandasSQLAlchemy(self.conn)
+
+        self._load_iris_data()
+        self._load_raw_sql()
+
+        self._load_test1_data()
+
+    def tearDown(self):
+        c = self.conn.execute('SHOW TABLES')
+        for table in c.fetchall():
+            self.conn.execute('DROP TABLE %s' % table[0])
+
+    def test_default_date_load(self):
+        df = sql.read_table("types_test_data", self.conn)
+
+        # IMPORTANT - sqlite has no native date type, so shouldn't parse,
+        # but MySQL SHOULD be converted.
+        self.assertTrue(
+            issubclass(df.DateCol.dtype.type, np.datetime64),
+            "DateCol loaded with incorrect type")
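+
+
+# The SQLAlchemy-backed classes above add a new flavor by overriding little
+# more than `connect()`, as TestMySQLAlchemy demonstrates. As an illustrative
+# sketch only (not part of this change): a PostgreSQL variant would look
+# roughly like the following, assuming a psycopg2 driver, a local
+# `pandas_nosetest` database, and 'postgresql' entries added to SQL_STRINGS
+# for the raw-SQL fixtures:
+#
+#     class TestPostgreSQLAlchemy(TestSQLAlchemy):
+#         flavor = 'postgresql'
+#
+#         def connect(self):
+#             return sqlalchemy.create_engine(
+#                 'postgresql+psycopg2://postgres@localhost/pandas_nosetest')
+#
+#         def setUp(self):
+#             if not SQLALCHEMY_INSTALLED:
+#                 raise nose.SkipTest('SQLAlchemy not installed')
+#             try:
+#                 import psycopg2
+#             except ImportError:
+#                 raise nose.SkipTest('psycopg2 not installed')
+#             self.conn = self.connect()
+#             self.pandasSQL = sql.PandasSQLAlchemy(self.conn)
+#             self._load_iris_data()
+#             self._load_raw_sql()
+#             self._load_test1_data()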