From 5c9058b171f2f33c69e8d672b8f9ef49682f215b Mon Sep 17 00:00:00 2001 From: Tiago Antao Date: Sat, 29 Nov 2014 13:31:22 +0000 Subject: [PATCH] dtype customization on sql read_table (#8926) testing dtypes parameter dtypes defaults to None dtype type checking and docstrings dtype exception checking sphinx dtypes corrections if/else to or simplification informative exception of erroneous SQLAlchemy subclassing type checking basic documentation of the dtypes feature issue number correct test position issue correction SQLite dtype configuration Testing Legacy SQLite with dtype configuration changed the position of a dtype check assert_raise assert_raise return user specified dtype, not SQL_TYPE test cleanup better docstrings better docstrings docs and test refactoring Do not test on MySQL legacy dtypes->dtype dtypes->dtype assert->assertTrue Type test in mysql correct mysql test type reverting unintended change --- doc/source/io.rst | 8 ++++++ doc/source/whatsnew/v0.15.2.txt | 1 + pandas/core/generic.py | 7 +++-- pandas/io/sql.py | 44 +++++++++++++++++++++++------ pandas/io/tests/test_sql.py | 50 +++++++++++++++++++++++++++++++-- 5 files changed, 97 insertions(+), 13 deletions(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index bf8776d4bc396..e05840bfdfd5e 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -3413,6 +3413,14 @@ With some databases, writing large DataFrames can result in errors due to packet Because of this, reading the database table back in does **not** generate a categorical. +.. note:: + + You can specify the SQL type of any of the columns by using the dtype + parameter (a dictionary mapping column names to SQLAlchemy types). This + can be useful in cases where columns with NULL values are inferred by + pandas to an excessively general datatype (e.g. a boolean column is + inferred to be object because it has NULLs). 
+ Reading Tables ~~~~~~~~~~~~~~ diff --git a/doc/source/whatsnew/v0.15.2.txt b/doc/source/whatsnew/v0.15.2.txt index 3aa50ad609064..78e915ba83d10 100644 --- a/doc/source/whatsnew/v0.15.2.txt +++ b/doc/source/whatsnew/v0.15.2.txt @@ -61,6 +61,7 @@ API changes Enhancements ~~~~~~~~~~~~ +- Added the ability to specify the SQL type of columns when writing a DataFrame to a database (:issue:`8778`). - Added ability to export Categorical data to Stata (:issue:`8633`). See :ref:`here ` for limitations of categorical variables exported to Stata data files. - Added ability to export Categorical data to to/from HDF5 (:issue:`7621`). Queries work the same as if it was an object array. However, the ``category`` dtyped data is stored in a more efficient manner. See :ref:`here ` for an example and caveats w.r.t. prior versions of pandas. - Added support for ``utcfromtimestamp()``, ``fromtimestamp()``, and ``combine()`` on `Timestamp` class (:issue:`5351`). diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 7201428e6b935..52f37ee24f69a 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -922,7 +922,7 @@ def to_msgpack(self, path_or_buf=None, **kwargs): return packers.to_msgpack(path_or_buf, self, **kwargs) def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail', - index=True, index_label=None, chunksize=None): + index=True, index_label=None, chunksize=None, dtype=None): """ Write records stored in a DataFrame to a SQL database. @@ -954,12 +954,15 @@ def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail', chunksize : int, default None If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once. + dtype : Dictionary of column name to SQLAlchemy type, default None + Optional datatypes for SQL columns. 
""" from pandas.io import sql sql.to_sql( self, name, con, flavor=flavor, schema=schema, if_exists=if_exists, - index=index, index_label=index_label, chunksize=chunksize) + index=index, index_label=index_label, chunksize=chunksize, + dtype=dtype) def to_pickle(self, path): """ diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 9baae0330926d..bb810b8509ef3 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -484,7 +484,7 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', - index=True, index_label=None, chunksize=None): + index=True, index_label=None, chunksize=None, dtype=None): """ Write records stored in a DataFrame to a SQL database. @@ -517,6 +517,8 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', chunksize : int, default None If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once. + dtype : dictionary of column name to SQLAlchemy type, default None + optional datatypes for SQL columns. 
""" if if_exists not in ('fail', 'replace', 'append'): @@ -531,7 +533,7 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index, index_label=index_label, schema=schema, - chunksize=chunksize) + chunksize=chunksize, dtype=dtype) def has_table(table_name, con, flavor='sqlite', schema=None): @@ -596,7 +598,7 @@ class SQLTable(PandasObject): # TODO: support for multiIndex def __init__(self, name, pandas_sql_engine, frame=None, index=True, if_exists='fail', prefix='pandas', index_label=None, - schema=None, keys=None): + schema=None, keys=None, dtype=None): self.name = name self.pd_sql = pandas_sql_engine self.prefix = prefix @@ -605,6 +607,7 @@ def __init__(self, name, pandas_sql_engine, frame=None, index=True, self.schema = schema self.if_exists = if_exists self.keys = keys + self.dtype = dtype if frame is not None: # We want to initialize based on a dataframe @@ -885,6 +888,10 @@ def _sqlalchemy_type(self, col): from sqlalchemy.types import (BigInteger, Float, Text, Boolean, DateTime, Date, Time) + dtype = self.dtype or {} + if col.name in dtype: + return self.dtype[col.name] + if com.is_datetime64_dtype(col): try: tz = col.tzinfo @@ -1099,7 +1106,7 @@ def read_query(self, sql, index_col=None, coerce_float=True, read_sql = read_query def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, schema=None, chunksize=None): + index_label=None, schema=None, chunksize=None, dtype=None): """ Write records stored in a DataFrame to a SQL database. @@ -1125,11 +1132,20 @@ def to_sql(self, frame, name, if_exists='fail', index=True, chunksize : int, default None If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once. - + dtype : dictionary of column name to SQLAlchemy type, default None + Optional datatypes for SQL columns. 
+ """ + if dtype is not None: + import sqlalchemy.sql.type_api as type_api + for col, my_type in dtype.items(): + if not issubclass(my_type, type_api.TypeEngine): + raise ValueError('The type of %s is not a SQLAlchemy ' + 'type ' % col) + table = SQLTable(name, self, frame=frame, index=index, if_exists=if_exists, index_label=index_label, - schema=schema) + schema=schema, dtype=dtype) table.create() table.insert(chunksize) # check for potentially case sensitivity issues (GH7815) @@ -1297,6 +1313,9 @@ def _create_table_setup(self): return create_stmts def _sql_type_name(self, col): + dtype = self.dtype or {} + if col.name in dtype: + return dtype[col.name] pytype = col.dtype.type pytype_name = "text" if issubclass(pytype, np.floating): @@ -1424,7 +1443,7 @@ def _fetchall_as_list(self, cur): return result def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, schema=None, chunksize=None): + index_label=None, schema=None, chunksize=None, dtype=None): """ Write records stored in a DataFrame to a SQL database. @@ -1448,10 +1467,19 @@ def to_sql(self, frame, name, if_exists='fail', index=True, chunksize : int, default None If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once. + dtype : dictionary of column_name to SQLite string type, default None + optional datatypes for SQL columns. 
""" + if dtype is not None: + for col, my_type in dtype.items(): + if not isinstance(my_type, str): + raise ValueError('%s (%s) not a string' % ( + col, str(my_type))) + table = SQLiteTable(name, self, frame=frame, index=index, - if_exists=if_exists, index_label=index_label) + if_exists=if_exists, index_label=index_label, + dtype=dtype) table.create() table.insert(chunksize) diff --git a/pandas/io/tests/test_sql.py b/pandas/io/tests/test_sql.py index 2a7aec30e7c50..eb46df7686d18 100644 --- a/pandas/io/tests/test_sql.py +++ b/pandas/io/tests/test_sql.py @@ -41,6 +41,8 @@ try: import sqlalchemy + import sqlalchemy.schema + import sqlalchemy.sql.sqltypes as sqltypes SQLALCHEMY_INSTALLED = True except ImportError: SQLALCHEMY_INSTALLED = False @@ -339,7 +341,7 @@ def _transaction_test(self): self.pandasSQL.execute("CREATE TABLE test_trans (A INT, B TEXT)") ins_sql = "INSERT INTO test_trans (A,B) VALUES (1, 'blah')" - + # Make sure when transaction is rolled back, no rows get inserted try: with self.pandasSQL.run_transaction() as trans: @@ -350,7 +352,7 @@ def _transaction_test(self): pass res = self.pandasSQL.read_query('SELECT * FROM test_trans') self.assertEqual(len(res), 0) - + # Make sure when transaction is committed, rows do get inserted with self.pandasSQL.run_transaction() as trans: trans.execute(ins_sql) @@ -1167,6 +1169,26 @@ def test_get_schema_create_table(self): tm.assert_frame_equal(returned_df, blank_test_df) self.drop_table(tbl) + def test_dtype(self): + cols = ['A', 'B'] + data = [(0.8, True), + (0.9, None)] + df = DataFrame(data, columns=cols) + df.to_sql('dtype_test', self.conn) + df.to_sql('dtype_test2', self.conn, dtype={'B': sqlalchemy.Boolean}) + meta = sqlalchemy.schema.MetaData(bind=self.conn) + meta.reflect() + self.assertTrue(isinstance(meta.tables['dtype_test'].columns['B'].type, + sqltypes.TEXT)) + if self.flavor == 'mysql': + my_type = sqltypes.Integer + else: + my_type = sqltypes.Boolean + 
self.assertTrue(isinstance(meta.tables['dtype_test2'].columns['B'].type, + my_type)) + self.assertRaises(ValueError, df.to_sql, + 'error', self.conn, dtype={'B': bool}) + class TestSQLiteAlchemy(_TestSQLAlchemy): """ @@ -1467,7 +1489,7 @@ def test_datetime_time(self): if self.flavor == 'sqlite': self.assertRaises(sqlite3.InterfaceError, sql.to_sql, df, 'test_time', self.conn) - + def _get_index_columns(self, tbl_name): ixs = sql.read_sql_query( "SELECT * FROM sqlite_master WHERE type = 'index' " + @@ -1485,6 +1507,28 @@ def test_to_sql_save_index(self): def test_transactions(self): self._transaction_test() + def test_dtype(self): + if self.flavor == 'mysql': + raise nose.SkipTest('Not applicable to MySQL legacy') + cols = ['A', 'B'] + data = [(0.8, True), + (0.9, None)] + df = DataFrame(data, columns=cols) + df.to_sql('dtype_test', self.conn) + df.to_sql('dtype_test2', self.conn, dtype={'B': 'bool'}) + + def get_column_type(table, column): + recs = self.conn.execute('PRAGMA table_info(%s)' % table) + for cid, name, ctype, not_null, default, pk in recs: + if name == column: + return ctype + raise ValueError('Table %s, column %s not found' % (table, column)) + + self.assertEqual(get_column_type('dtype_test', 'B'), 'TEXT') + self.assertEqual(get_column_type('dtype_test2', 'B'), 'bool') + self.assertRaises(ValueError, df.to_sql, + 'error', self.conn, dtype={'B': bool}) + class TestMySQLLegacy(TestSQLiteFallback): """ Test the legacy mode against a MySQL database.