diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 9baae0330926d..90af526acc17b 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -293,8 +293,8 @@ def read_sql_table(table_name, con, schema=None, index_col=None, schema : string, default None Name of SQL schema in database to query (if database flavor supports this). If None, use default schema (default). - index_col : string, optional - Column to set as index + index_col : string or list of strings, optional + Column(s) to set as index coerce_float : boolean, default True Attempt to convert values to non-string, non-numeric objects (like decimal.Decimal) to floating point. Can result in loss of Precision. @@ -323,6 +323,7 @@ def read_sql_table(table_name, con, schema=None, index_col=None, read_sql """ + if not _is_sqlalchemy_engine(con): raise NotImplementedError("read_sql_table only supported for " "SQLAlchemy engines.") @@ -361,8 +362,8 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. - index_col : string, optional - Column name to use as index for the returned DataFrame object. + index_col : string or list of strings, optional + Column name(s) to use as index for the returned DataFrame object. coerce_float : boolean, default True Attempt to convert values to non-string, non-numeric objects (like decimal.Decimal) to floating point, useful for SQL result sets @@ -414,8 +415,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. - index_col : string, optional - column name to use as index for the returned DataFrame object. + index_col : string or list of strings, optional + column name(s) to use as index for the returned DataFrame object. coerce_float : boolean, default True Attempt to convert values to non-string, non-numeric objects (like decimal.Decimal) to floating point, useful for SQL result sets @@ -572,7 +573,7 @@ def has_table(table_name, con, flavor='sqlite', schema=None): def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False): """ - Convenience function to return the correct PandasSQL subclass based on the + Convenience function to return the correct SQLBackend subclass based on the provided parameters """ # When support for DBAPI connections is removed, @@ -588,8 +589,6 @@ def pandasSQL_builder(con, flavor=None, schema=None, meta=None, class SQLTable(PandasObject): """ For mapping Pandas tables to SQL tables. - Uses fact that table is reflected by SQLAlchemy to - do better type convertions. Also holds various flags needed to avoid having to pass them between functions all the time.
""" @@ -601,52 +600,57 @@ def __init__(self, name, pandas_sql_engine, frame=None, index=True, self.pd_sql = pandas_sql_engine self.prefix = prefix self.frame = frame - self.index = self._index_name(index, index_label) self.schema = schema self.if_exists = if_exists self.keys = keys - if frame is not None: - # We want to initialize based on a dataframe - self.table = self._create_table_setup() - else: - # no data provided, read-only mode - self.table = self.pd_sql.get_table(self.name, self.schema) + self.index = None + + # We want to initialize based on a dataframe + if index is True: + # Use indexes from dataframe + nlevels = self.frame.index.nlevels + # if index_label is specified, set this as index name(s) + if index_label is not None: + if not isinstance(index_label, list): + index_label = [index_label] + if len(index_label) != nlevels: + raise ValueError( + "Length of 'index_label' should match number of " + "levels, which is {0}".format(nlevels)) + else: + self.index = index_label + else: + # return the used column labels for the index columns + if (nlevels == 1 and 'index' not in self.frame.columns + and self.frame.index.name is None): + self.index = ['index'] + else: + self.index = [l if l is not None else "level_{0}".format(i) + for i, l in enumerate(self.frame.index.names)] - if self.table is None: - raise ValueError("Could not init table '%s'" % name) + self.backend_table = self.pd_sql.get_backend_table_object(name, + frame, keys=keys, schema=schema, index=self.index) def exists(self): return self.pd_sql.has_table(self.name, self.schema) - def sql_schema(self): - from sqlalchemy.schema import CreateTable - return str(CreateTable(self.table).compile(self.pd_sql.engine)) - - def _execute_create(self): - # Inserting table into database, add to MetaData object - self.table = self.table.tometadata(self.pd_sql.meta) - self.table.create() - def create(self): if self.exists(): if self.if_exists == 'fail': raise ValueError("Table '%s' already exists." 
% self.name) elif self.if_exists == 'replace': self.pd_sql.drop_table(self.name, self.schema) - self._execute_create() + self.pd_sql.create_table(self.backend_table) elif self.if_exists == 'append': pass else: raise ValueError( "'{0}' is not valid for if_exists".format(self.if_exists)) else: - self._execute_create() + self.pd_sql.create_table(self.backend_table) - def insert_statement(self): - return self.table.insert() - - def insert_data(self): + def _data_for_insert(self): if self.index is not None: temp = self.frame.copy() temp.index.names = self.index @@ -682,12 +686,8 @@ def insert_data(self): return column_names, data_list - def _execute_insert(self, conn, keys, data_iter): - data = [dict((k, v) for k, v in zip(keys, row)) for row in data_iter] - conn.execute(self.insert_statement(), data) - def insert(self, chunksize=None): - keys, data_list = self.insert_data() + cols, data_list = self._data_for_insert() nrows = len(self.frame) @@ -701,7 +701,7 @@ def insert(self, chunksize=None): chunks = int(nrows / chunksize) + 1 - with self.pd_sql.run_transaction() as conn: + with self.pd_sql.run_transaction() as trans: for i in range(chunks): start_i = i * chunksize end_i = min((i + 1) * chunksize, nrows) @@ -709,240 +709,178 @@ def insert(self, chunksize=None): break chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list]) - self._execute_insert(conn, keys, chunk_iter) + self.pd_sql.insert_data(trans, self.backend_table, cols, chunk_iter) + + +class SQLBackend(PandasObject): + """Base class providing methods to read and write frames to/from a + database. These call methods that backend-specific subclasses + must implement. + """ + - def _query_iterator(self, result, chunksize, columns, coerce_float=True, - parse_dates=None): + def _query_iterator(self, result, chunksize, columns, index_col=None, + coerce_float=True, parse_dates=None): """Return generator through chunked result set""" while True: data = result.fetchmany(chunksize) if not data: + self._close_result(result) break else: - self.frame = DataFrame.from_records( - data, columns=columns, coerce_float=coerce_float) + yield _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) - self._harmonize_columns(parse_dates=parse_dates) + def read_query(self, sql, index_col=None, coerce_float=True, + parse_dates=None, params=None, chunksize=None): + """Read SQL query into a DataFrame. - if self.index is not None: - self.frame.set_index(self.index, inplace=True) + Parameters + ---------- + sql : string + SQL query to be executed + index_col : string or list of strings, optional + Column name(s) to use as index for the returned DataFrame object. + coerce_float : boolean, default True + Attempt to convert values to non-string, non-numeric objects (like + decimal.Decimal) to floating point, useful for SQL result sets + params : list, tuple or dict, optional + List of parameters to pass to execute method. The syntax used + to pass parameters is database driver dependent. Check your + database driver documentation for which of the five syntax styles, + described in PEP 249's paramstyle, is supported. + Eg.
for psycopg2, uses %(name)s so use params={'name' : 'value'} + parse_dates : list or dict + - List of column names to parse as dates + - Dict of ``{column_name: format string}`` where format string is + strftime compatible in case of parsing string times or is one of + (D, s, ns, ms, us) in case of parsing integer timestamps + - Dict of ``{column_name: arg dict}``, where the arg dict corresponds + to the keyword arguments of :func:`pandas.to_datetime` + Especially useful with databases without native Datetime support, + such as SQLite - yield self.frame + Returns + ------- + DataFrame - def read(self, coerce_float=True, parse_dates=None, columns=None, - chunksize=None): + See also + -------- + read_sql_table : Read SQL database table into a DataFrame + read_sql - if columns is not None and len(columns) > 0: - from sqlalchemy import select - cols = [self.table.c[n] for n in columns] - if self.index is not None: - [cols.insert(0, self.table.c[idx]) for idx in self.index[::-1]] - sql_select = select(cols) - else: - sql_select = self.table.select() + """ + args = _convert_params(sql, params) - result = self.pd_sql.execute(sql_select) - column_names = result.keys() + result = self.execute(*args) + columns = self._get_result_columns(result) if chunksize is not None: - return self._query_iterator(result, chunksize, column_names, + return self._query_iterator(result, chunksize, columns, + index_col=index_col, coerce_float=coerce_float, parse_dates=parse_dates) else: - data = result.fetchall() - self.frame = DataFrame.from_records( - data, columns=column_names, coerce_float=coerce_float) - - self._harmonize_columns(parse_dates=parse_dates) - - if self.index is not None: - self.frame.set_index(self.index, inplace=True) - - return self.frame - - def _index_name(self, index, index_label): - # for writing: index=True to include index in sql table - if index is True: - nlevels = self.frame.index.nlevels - # if index_label is specified, set this as index name(s) - if index_label is not None: - if not isinstance(index_label, list): - index_label = [index_label] - if len(index_label) != nlevels: - raise ValueError( - "Length of 'index_label' should match number of " - "levels, which is {0}".format(nlevels)) - else: - return index_label - # return the used column labels for the index columns - if (nlevels == 1 and 'index' not in self.frame.columns - and self.frame.index.name is None): - return ['index'] - else: - return [l if l is not None else "level_{0}".format(i) - for i, l in enumerate(self.frame.index.names)] - - # for reading: index=(list of) string to specify column to set as index - elif isinstance(index, string_types): - return [index] - elif isinstance(index, list): - return index - else: - return None - - def _get_column_names_and_types(self, dtype_mapper): - column_names_and_types = [] - if self.index is not None: - for i, idx_label in enumerate(self.index): - idx_type = dtype_mapper( - self.frame.index.get_level_values(i)) - column_names_and_types.append((idx_label, idx_type, True)) + data = self._fetchall_as_list(result) + self._close_result(result) - column_names_and_types += [ - (str(self.frame.columns[i]), - dtype_mapper(self.frame.iloc[:, i]), - False) - for i in range(len(self.frame.columns)) - ] - - return column_names_and_types - - def _create_table_setup(self): - from sqlalchemy import Table, Column, PrimaryKeyConstraint - - column_names_and_types = \ - self._get_column_names_and_types(self._sqlalchemy_type) - - columns = [Column(name, typ, index=is_index) - for name, typ, is_index in 
column_names_and_types] - - if self.keys is not None: - pkc = PrimaryKeyConstraint(self.keys, name=self.name + '_pk') - columns.append(pkc) - - schema = self.schema or self.pd_sql.meta.schema - - # At this point, attach to new metadata, only attach to self.meta - # once table is created. - from sqlalchemy.schema import MetaData - meta = MetaData(self.pd_sql, schema=schema) + frame = _wrap_result(data, columns, index_col=index_col, + coerce_float=coerce_float, + parse_dates=parse_dates) + return frame - return Table(self.name, meta, *columns, schema=schema) + read_sql = read_query - def _harmonize_columns(self, parse_dates=None): - """ - Make the DataFrame's column types align with the SQL table - column types. - Need to work around limited NA value support. Floats are always - fine, ints must always be floats if there are Null values. - Booleans are hard because converting bool column with None replaces - all Nones with false. Therefore only convert bool if there are no - NA values. - Datetimes should already be converted to np.datetime64 if supported, - but here we also force conversion if required + def to_sql(self, frame, name, if_exists='fail', index=True, + index_label=None, schema=None, chunksize=None): """ - # handle non-list entries for parse_dates gracefully - if parse_dates is True or parse_dates is None or parse_dates is False: - parse_dates = [] - - if not hasattr(parse_dates, '__iter__'): - parse_dates = [parse_dates] - - for sql_col in self.table.columns: - col_name = sql_col.name - try: - df_col = self.frame[col_name] - # the type the dataframe column should have - col_type = self._numpy_type(sql_col.type) - - if col_type is datetime or col_type is date: - if not issubclass(df_col.dtype.type, np.datetime64): - self.frame[col_name] = _handle_date_column(df_col) + Write records stored in a DataFrame to a SQL database. - elif col_type is float: - # floats support NA, can always convert! - self.frame[col_name] = df_col.astype(col_type, copy=False) + Parameters + ---------- + frame : DataFrame + name : string + Name of SQL table + if_exists : {'fail', 'replace', 'append'}, default 'fail' + - fail: If table exists, do nothing. + - replace: If table exists, drop it, recreate it, and insert data. + - append: If table exists, insert data. Create if does not exist. + index : boolean, default True + Write DataFrame index as a column + index_label : string or sequence, default None + Column label for index column(s). If None is given (default) and + `index` is True, then the index names are used. + A sequence should be given if the DataFrame uses MultiIndex. + schema : string, default None + Name of SQL schema in database to write to (if database flavor + supports this). If specified, this overwrites the default + schema of the SQLDatabase object. Not supported for fallback + database mode. + chunksize : int, default None + If not None, then rows will be written in batches of this size at a + time. If None, all rows will be written at once. + + """ + table = SQLTable(name, self, frame=frame, index=index, + if_exists=if_exists, index_label=index_label, + schema=schema) + table.create() + table.insert(chunksize) - elif len(df_col) == df_col.count(): - # No NA values, can convert ints and bools - if col_type is np.dtype('int64') or col_type is bool: - self.frame[col_name] = df_col.astype(col_type, copy=False) + def has_table(self, table_name, schema=None): + # Check whether database has a table. Subclasses must implement. 
+ raise NotImplementedError + + def drop_table(self, table_name, schema=None): + # Drop the table table_name. Subclasses must implement. + raise NotImplementedError - # Handle date parsing - if col_name in parse_dates: - try: - fmt = parse_dates[col_name] - except TypeError: - fmt = None - self.frame[col_name] = _handle_date_column( - df_col, format=fmt) + def execute(self, *args, **kwargs): + # Execute statement on the database. Subclasses must implement. + raise NotImplementedError - except KeyError: - pass # this column not in results + def run_transaction(self): + # Return a context manager for running a transaction on the database. + raise NotImplementedError - def _sqlalchemy_type(self, col): - from sqlalchemy.types import (BigInteger, Float, Text, Boolean, - DateTime, Date, Time) + def get_backend_table_object(self, name, frame, keys=None, schema=None, + index=None): + # Return a backend table object. Subclasses must implement. + raise NotImplementedError - if com.is_datetime64_dtype(col): - try: - tz = col.tzinfo - return DateTime(timezone=True) - except: - return DateTime - if com.is_timedelta64_dtype(col): - warnings.warn("the 'timedelta' type is not supported, and will be " - "written as integer values (ns frequency) to the " - "database.", UserWarning) - return BigInteger - elif com.is_float_dtype(col): - return Float - elif com.is_integer_dtype(col): - # TODO: Refine integer size. - return BigInteger - elif com.is_bool_dtype(col): - return Boolean - inferred = lib.infer_dtype(com._ensure_object(col)) - if inferred == 'date': - return Date - if inferred == 'time': - return Time - return Text + def create_statement(self, backend_table): + # Return a SQL statement that creates the table for backend_table. + raise NotImplementedError - def _numpy_type(self, sqltype): - from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date + def create_table(self, backend_table): + # Create the table corresponding to backend_table in the database. + raise NotImplementedError - if isinstance(sqltype, Float): - return float - if isinstance(sqltype, Integer): - # TODO: Refine integer size. - return np.dtype('int64') - if isinstance(sqltype, DateTime): - # Caution: np.datetime64 is also a subclass of np.number. - return datetime - if isinstance(sqltype, Date): - return date - if isinstance(sqltype, Boolean): - return bool - return object + def insert_data(self, trans, backend_table, cols, data_iter): + # Insert data from data_iter into backend_table within transaction trans.
+ raise NotImplementedError -class PandasSQL(PandasObject): - """ - Subclasses Should define read_sql and to_sql - """ +def _get_column_names_and_types(frame, dtype_mapper, index): + column_names_and_types = [] + if index is not None: + for i, idx_label in enumerate(index): + idx_type = dtype_mapper( + frame.index.get_level_values(i)) + column_names_and_types.append((idx_label, idx_type, True)) - def read_sql(self, *args, **kwargs): - raise ValueError("PandasSQL must be created with an SQLAlchemy engine" - " or connection+sql flavor") + column_names_and_types += [ + (str(frame.columns[i]), + dtype_mapper(frame.iloc[:, i]), + False) + for i in range(len(frame.columns)) + ] - def to_sql(self, *args, **kwargs): - raise ValueError("PandasSQL must be created with an SQLAlchemy engine" - " or connection+sql flavor") + return column_names_and_types -class SQLDatabase(PandasSQL): +class SQLDatabase(SQLBackend): """ This class enables convertion between DataFrame and SQL databases using SQLAlchemy to handle DataBase abstraction @@ -970,13 +908,6 @@ def __init__(self, engine, schema=None, meta=None): self.meta = meta - def run_transaction(self): - return self.engine.begin() - - def execute(self, *args, **kwargs): - """Simple passthrough to SQLAlchemy engine""" - return self.engine.execute(*args, **kwargs) - def read_table(self, table_name, index_col=None, coerce_float=True, parse_dates=None, columns=None, schema=None, chunksize=None): @@ -986,8 +917,8 @@ def read_table(self, table_name, index_col=None, coerce_float=True, ---------- table_name : string Name of SQL table in database - index_col : string, optional - Column to set as index + index_col : string or list of strings, optional + Column(s) to set as index coerce_float : boolean, default True Attempt to convert values to non-string, non-numeric objects (like decimal.Decimal) to floating point. This can result in @@ -1021,148 +952,224 @@ def read_table(self, table_name, index_col=None, coerce_float=True, SQLDatabase.read_query """ - table = SQLTable(table_name, self, index=index_col, schema=schema) - return table.read(coerce_float=coerce_float, - parse_dates=parse_dates, columns=columns, - chunksize=chunksize) - @staticmethod - def _query_iterator(result, chunksize, columns, index_col=None, - coerce_float=True, parse_dates=None): - """Return generator through chunked result set""" + table = self.get_table(table_name) - while True: - data = result.fetchmany(chunksize) - if not data: - break - else: - yield _wrap_result(data, columns, index_col=index_col, - coerce_float=coerce_float, - parse_dates=parse_dates) - - def read_query(self, sql, index_col=None, coerce_float=True, - parse_dates=None, params=None, chunksize=None): - """Read SQL query into a DataFrame. - - Parameters - ---------- - sql : string - SQL query to be executed - index_col : string, optional - Column name to use as index for the returned DataFrame object. - coerce_float : boolean, default True - Attempt to convert values to non-string, non-numeric objects (like - decimal.Decimal) to floating point, useful for SQL result sets - params : list, tuple or dict, optional - List of parameters to pass to execute method. The syntax used - to pass parameters is database driver dependent. Check your - database driver documentation for which of the five syntax styles, - described in PEP 249's paramstyle, is supported. - Eg. 
for psycopg2, uses %(name)s so use params={'name' : 'value'} - parse_dates : list or dict - - List of column names to parse as dates - - Dict of ``{column_name: format string}`` where format string is - strftime compatible in case of parsing string times or is one of - (D, s, ns, ms, us) in case of parsing integer timestamps - - Dict of ``{column_name: arg dict}``, where the arg dict corresponds - to the keyword arguments of :func:`pandas.to_datetime` - Especially useful with databases without native Datetime support, - such as SQLite - - Returns - ------- - DataFrame - - See also - -------- - read_sql_table : Read SQL database table into a DataFrame - read_sql + if isinstance(index_col, string_types): + index = [index_col] + elif isinstance(index_col, list): + index = index_col + else: + index = None - """ - args = _convert_params(sql, params) + if columns is not None and len(columns) > 0: + from sqlalchemy import select + cols = [table.c[n] for n in columns] + if index is not None: + [cols.insert(0, table.c[idx]) for idx in index[::-1]] + sql_select = select(cols) + else: + sql_select = table.select() - result = self.execute(*args) - columns = result.keys() + result = self.execute(sql_select) + column_names = result.keys() if chunksize is not None: - return self._query_iterator(result, chunksize, columns, - index_col=index_col, + return self._query_iterator(result, chunksize, column_names, + index_col=index, coerce_float=coerce_float, parse_dates=parse_dates) else: data = result.fetchall() - frame = _wrap_result(data, columns, index_col=index_col, - coerce_float=coerce_float, - parse_dates=parse_dates) + frame = DataFrame.from_records( + data, columns=column_names, coerce_float=coerce_float) + + frame = self._harmonize_columns(table, frame, parse_dates=parse_dates) + + if index is not None: + frame.set_index(index, inplace=True) + return frame - read_sql = read_query + def current_schema(self, schema=None): + return schema or self.meta.schema - def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, schema=None, chunksize=None): - """ - Write records stored in a DataFrame to a SQL database. + def _get_result_columns(self, result): + return result.keys() - Parameters - ---------- - frame : DataFrame - name : string - Name of SQL table - if_exists : {'fail', 'replace', 'append'}, default 'fail' - - fail: If table exists, do nothing. - - replace: If table exists, drop it, recreate it, and insert data. - - append: If table exists, insert data. Create if does not exist. - index : boolean, default True - Write DataFrame index as a column - index_label : string or sequence, default None - Column label for index column(s). If None is given (default) and - `index` is True, then the index names are used. - A sequence should be given if the DataFrame uses MultiIndex. - schema : string, default None - Name of SQL schema in database to write to (if database flavor - supports this). If specified, this overwrites the default - schema of the SQLDatabase object. - chunksize : int, default None - If not None, then rows will be written in batches of this size at a - time. If None, all rows will be written at once.
- - """ - table = SQLTable(name, self, frame=frame, index=index, - if_exists=if_exists, index_label=index_label, - schema=schema) - table.create() - table.insert(chunksize) - # check for potentially case sensitivity issues (GH7815) - if name not in self.engine.table_names(schema=schema or self.meta.schema): - warnings.warn("The provided table name '{0}' is not found exactly " - "as such in the database after writing the table, " - "possibly due to case sensitivity issues. Consider " - "using lower case table names.".format(name), UserWarning) + def _fetchall_as_list(self, result): + return result.fetchall() - @property - def tables(self): - return self.meta.tables + def _close_result(self, result): + pass - def has_table(self, name, schema=None): - return self.engine.has_table(name, schema or self.meta.schema) + def has_table(self, table_name, schema=None): + return self.engine.has_table(table_name, self.current_schema(schema)) def get_table(self, table_name, schema=None): - schema = schema or self.meta.schema + schema = self.current_schema(schema) if schema: return self.meta.tables.get('.'.join([schema, table_name])) else: return self.meta.tables.get(table_name) def drop_table(self, table_name, schema=None): - schema = schema or self.meta.schema + schema = self.current_schema(schema) if self.engine.has_table(table_name, schema): self.meta.reflect(only=[table_name], schema=schema) self.get_table(table_name, schema).drop() self.meta.clear() - def _create_sql_schema(self, frame, table_name, keys=None): - table = SQLTable(table_name, self, frame=frame, index=False, keys=keys) - return str(table.sql_schema()) + def execute(self, *args, **kwargs): + """Simple passthrough to SQLAlchemy engine""" + return self.engine.execute(*args, **kwargs) + + def run_transaction(self): + return self.engine.begin() + + def get_backend_table_object(self, name, frame, keys=None, schema=None, + index=None): + from sqlalchemy import Table, Column, PrimaryKeyConstraint + + column_names_and_types = \ + _get_column_names_and_types(frame, self._sqlalchemy_type, index) + + columns = [Column(colname, typ, index=is_index) + for colname, typ, is_index in column_names_and_types] + + if keys is not None: + pkc = PrimaryKeyConstraint(keys, name=self.name + '_pk') + columns.append(pkc) + + schema = self.current_schema(schema) + + # At this point, attach to new metadata, only attach to self.meta + # once table is created. + from sqlalchemy.schema import MetaData + meta = MetaData(self.engine, schema=schema) + + return Table(name, meta, *columns, schema=schema) + + def create_statement(self, backend_table): + from sqlalchemy.schema import CreateTable + return str(CreateTable(backend_table).compile(self.engine)) + + def insert_data(self, trans, backend_table, cols, data_iter): + data = [dict((k, v) for k, v in zip(cols, row)) for row in data_iter] + trans.execute(backend_table.insert(), data) + + def create_table(self, backend_table): + # Inserting table into database, add to MetaData object + backend_table = backend_table.tometadata(self.meta) + backend_table.create() + + # check for potentially case sensitivity issues (GH7815) + if not self.has_table(backend_table.name, + schema=self.current_schema(backend_table.schema)): + warnings.warn("The provided table name '{0}' is not found exactly " + "as such in the database after writing the table, " + "possibly due to case sensitivity issues. 
Consider " + "using lower case table names.".format(backend_table.name), + UserWarning) + + def _sqlalchemy_type(self, col): + from sqlalchemy.types import (BigInteger, Float, Text, Boolean, + DateTime, Date, Time) + + if com.is_datetime64_dtype(col): + try: + tz = col.tzinfo + return DateTime(timezone=True) + except: + return DateTime + if com.is_timedelta64_dtype(col): + warnings.warn("the 'timedelta' type is not supported, and will be " + "written as integer values (ns frequency) to the " + "database.", UserWarning) + return BigInteger + elif com.is_float_dtype(col): + return Float + elif com.is_integer_dtype(col): + # TODO: Refine integer size. + return BigInteger + elif com.is_bool_dtype(col): + return Boolean + inferred = lib.infer_dtype(com._ensure_object(col)) + if inferred == 'date': + return Date + if inferred == 'time': + return Time + return Text + + def _harmonize_columns(self, table, frame, parse_dates=None): + """ + Make the DataFrame's column types align with the SQL table + column types. + Need to work around limited NA value support. Floats are always + fine, ints must always be floats if there are Null values. + Booleans are hard because converting bool column with None replaces + all Nones with false. Therefore only convert bool if there are no + NA values. + Datetimes should already be converted to np.datetime64 if supported, + but here we also force conversion if required + """ + # handle non-list entries for parse_dates gracefully + if parse_dates is True or parse_dates is None or parse_dates is False: + parse_dates = [] + + if not hasattr(parse_dates, '__iter__'): + parse_dates = [parse_dates] + + for sql_col in table.columns: + col_name = sql_col.name + try: + df_col = frame[col_name] + # the type the dataframe column should have + col_type = self._numpy_type(sql_col.type) + + if col_type is datetime or col_type is date: + if not issubclass(df_col.dtype.type, np.datetime64): + frame[col_name] = _handle_date_column(df_col) + + elif col_type is float: + # floats support NA, can always convert! + frame[col_name] = df_col.astype(col_type, copy=False) + + elif len(df_col) == df_col.count(): + # No NA values, can convert ints and bools + if col_type is np.dtype('int64') or col_type is bool: + frame[col_name] = df_col.astype(col_type, + copy=False) + + # Handle date parsing + if col_name in parse_dates: + try: + fmt = parse_dates[col_name] + except TypeError: + fmt = None + frame[col_name] = _handle_date_column( + df_col, format=fmt) + + except KeyError: + pass # this column not in results + + return frame + + def _numpy_type(self, sqltype): + from sqlalchemy.types import Integer, Float, Boolean, DateTime, Date + + if isinstance(sqltype, Float): + return float + if isinstance(sqltype, Integer): + # TODO: Refine integer size. + return np.dtype('int64') + if isinstance(sqltype, DateTime): + # Caution: np.datetime64 is also a subclass of np.number. + return datetime + if isinstance(sqltype, Date): + return date + if isinstance(sqltype, Boolean): + return bool + return object # ---- SQL without SQLAlchemy --- @@ -1220,110 +1227,15 @@ def _create_sql_schema(self, frame, table_name, keys=None): "underscores.") -class SQLiteTable(SQLTable): - """ - Patch the SQLTable for fallback support. - Instead of a table variable just use the Create Table statement. 
- """ - - def sql_schema(self): - return str(";\n".join(self.table)) - - def _execute_create(self): - with self.pd_sql.run_transaction() as conn: - for stmt in self.table: - conn.execute(stmt) - - def insert_statement(self): - names = list(map(str, self.frame.columns)) - flv = self.pd_sql.flavor - br_l = _SQL_SYMB[flv]['br_l'] # left val quote char - br_r = _SQL_SYMB[flv]['br_r'] # right val quote char - wld = _SQL_SYMB[flv]['wld'] # wildcard char - - if self.index is not None: - [names.insert(0, idx) for idx in self.index[::-1]] - - bracketed_names = [br_l + column + br_r for column in names] - col_names = ','.join(bracketed_names) - wildcards = ','.join([wld] * len(names)) - insert_statement = 'INSERT INTO %s (%s) VALUES (%s)' % ( - self.name, col_names, wildcards) - return insert_statement - - def _execute_insert(self, conn, keys, data_iter): - data_list = list(data_iter) - conn.executemany(self.insert_statement(), data_list) - - def _create_table_setup(self): - """ - Return a list of SQL statement that create a table reflecting the - structure of a DataFrame. The first entry will be a CREATE TABLE - statement while the rest will be CREATE INDEX statements - """ - column_names_and_types = \ - self._get_column_names_and_types(self._sql_type_name) - - pat = re.compile('\s+') - column_names = [col_name for col_name, _, _ in column_names_and_types] - if any(map(pat.search, column_names)): - warnings.warn(_SAFE_NAMES_WARNING) - - flv = self.pd_sql.flavor - - br_l = _SQL_SYMB[flv]['br_l'] # left val quote char - br_r = _SQL_SYMB[flv]['br_r'] # right val quote char - - create_tbl_stmts = [(br_l + '%s' + br_r + ' %s') % (cname, ctype) - for cname, ctype, _ in column_names_and_types] - if self.keys is not None and len(self.keys): - cnames_br = ",".join([br_l + c + br_r for c in self.keys]) - create_tbl_stmts.append( - "CONSTRAINT {tbl}_pk PRIMARY KEY ({cnames_br})".format( - tbl=self.name, cnames_br=cnames_br)) - - create_stmts = ["CREATE TABLE " + self.name + " (\n" + - ',\n '.join(create_tbl_stmts) + "\n)"] - - ix_cols = [cname for cname, _, is_index in column_names_and_types - if is_index] - if len(ix_cols): - cnames = "_".join(ix_cols) - cnames_br = ",".join([br_l + c + br_r for c in ix_cols]) - create_stmts.append( - "CREATE INDEX ix_{tbl}_{cnames} ON {tbl} ({cnames_br})".format( - tbl=self.name, cnames=cnames, cnames_br=cnames_br)) - - return create_stmts - - def _sql_type_name(self, col): - pytype = col.dtype.type - pytype_name = "text" - if issubclass(pytype, np.floating): - pytype_name = "float" - elif com.is_timedelta64_dtype(pytype): - warnings.warn("the 'timedelta' type is not supported, and will be " - "written as integer values (ns frequency) to the " - "database.", UserWarning) - pytype_name = "int" - elif issubclass(pytype, np.integer): - pytype_name = "int" - elif issubclass(pytype, np.datetime64) or pytype is datetime: - # Caution: np.datetime64 is also a subclass of np.number. 
- pytype_name = "datetime" - elif issubclass(pytype, np.bool_): - pytype_name = "bool" - elif issubclass(pytype, np.object): - pytype = lib.infer_dtype(com._ensure_object(col)) - if pytype == "date": - pytype_name = "date" - elif pytype == "time": - pytype_name = "time" - - return _SQL_TYPES[pytype_name][self.pd_sql.flavor] +class SQLiteBackendTable(PandasObject): + def __init__(self, name, frame, keys, index): + self.name = name + self.frame = frame + self.keys = keys + self.index = index -class SQLiteDatabase(PandasSQL): +class SQLiteDatabase(SQLBackend): """ Version of SQLDatabase to support sqlite connections (fallback without sqlalchemy). This should only be used internally. @@ -1378,88 +1290,27 @@ def execute(self, *args, **kwargs): " to rollback" % (args[0], exc)) raise_with_traceback(ex) - ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], exc)) + ex = DatabaseError("Execution failed on sql '%s': %s" % + (args[0], exc)) raise_with_traceback(ex) - @staticmethod - def _query_iterator(cursor, chunksize, columns, index_col=None, - coerce_float=True, parse_dates=None): - """Return generator through chunked result set""" - - while True: - data = cursor.fetchmany(chunksize) - if not data: - cursor.close() - break - else: - yield _wrap_result(data, columns, index_col=index_col, - coerce_float=coerce_float, - parse_dates=parse_dates) - - def read_query(self, sql, index_col=None, coerce_float=True, params=None, - parse_dates=None, chunksize=None): - - args = _convert_params(sql, params) - cursor = self.execute(*args) - columns = [col_desc[0] for col_desc in cursor.description] - - if chunksize is not None: - return self._query_iterator(cursor, chunksize, columns, - index_col=index_col, - coerce_float=coerce_float, - parse_dates=parse_dates) - else: - data = self._fetchall_as_list(cursor) - cursor.close() - - frame = _wrap_result(data, columns, index_col=index_col, - coerce_float=coerce_float, - parse_dates=parse_dates) - return frame - + def _get_result_columns(self, result): + return [col_desc[0] for col_desc in result.description] + def _fetchall_as_list(self, cur): result = cur.fetchall() if not isinstance(result, list): result = list(result) return result - def to_sql(self, frame, name, if_exists='fail', index=True, - index_label=None, schema=None, chunksize=None): - """ - Write records stored in a DataFrame to a SQL database. - - Parameters - ---------- - frame: DataFrame - name: name of SQL table - if_exists: {'fail', 'replace', 'append'}, default 'fail' - fail: If table exists, do nothing. - replace: If table exists, drop it, recreate it, and insert data. - append: If table exists, insert data. Create if does not exist. - index : boolean, default True - Write DataFrame index as a column - index_label : string or sequence, default None - Column label for index column(s). If None is given (default) and - `index` is True, then the index names are used. - A sequence should be given if the DataFrame uses MultiIndex. - schema : string, default None - Ignored parameter included for compatability with SQLAlchemy - version of ``to_sql``. - chunksize : int, default None - If not None, then rows will be written in batches of this - size at a time. If None, all rows will be written at once. 
- - """ - table = SQLiteTable(name, self, frame=frame, index=index, - if_exists=if_exists, index_label=index_label) - table.create() - table.insert(chunksize) + def _close_result(self, cur): + cur.close() - def has_table(self, name, schema=None): + def has_table(self, table_name, schema=None): flavor_map = { 'sqlite': ("SELECT name FROM sqlite_master " - "WHERE type='table' AND name='%s';") % name, - 'mysql': "SHOW TABLES LIKE '%s'" % name} + "WHERE type='table' AND name='%s';") % table_name, + 'mysql': "SHOW TABLES LIKE '%s'" % table_name} query = flavor_map.get(self.flavor) return len(self.execute(query).fetchall()) > 0 @@ -1471,10 +1322,106 @@ def drop_table(self, name, schema=None): drop_sql = "DROP TABLE %s" % name self.execute(drop_sql) - def _create_sql_schema(self, frame, table_name, keys=None): - table = SQLiteTable(table_name, self, frame=frame, index=False, - keys=keys) - return str(table.sql_schema()) + def create_statement(self, backend_table): + return str(";\n".join(self._table_create_statements(backend_table))) + + def create_table(self, backend_table): + with self.run_transaction() as conn: + for stmt in self._table_create_statements(backend_table): + conn.execute(stmt) + + def insert_data(self, trans, backend_table, cols, data_iter): + data_list = list(data_iter) + + names = list(map(str, backend_table.frame.columns)) + flv = self.flavor + br_l = _SQL_SYMB[flv]['br_l'] # left val quote char + br_r = _SQL_SYMB[flv]['br_r'] # right val quote char + wld = _SQL_SYMB[flv]['wld'] # wildcard char + + if backend_table.index is not None: + [names.insert(0, idx) for idx in backend_table.index[::-1]] + + bracketed_names = [br_l + column + br_r for column in names] + col_names = ','.join(bracketed_names) + wildcards = ','.join([wld] * len(names)) + insert_statement = 'INSERT INTO %s (%s) VALUES (%s)' % ( + backend_table.name, col_names, wildcards) + + trans.executemany(insert_statement, data_list) + + + def _sql_type_name(self, col): + pytype = col.dtype.type + pytype_name = "text" + if issubclass(pytype, np.floating): + pytype_name = "float" + elif com.is_timedelta64_dtype(pytype): + warnings.warn("the 'timedelta' type is not supported, and will be " + "written as integer values (ns frequency) to the " + "database.", UserWarning) + pytype_name = "int" + elif issubclass(pytype, np.integer): + pytype_name = "int" + elif issubclass(pytype, np.datetime64) or pytype is datetime: + # Caution: np.datetime64 is also a subclass of np.number. + pytype_name = "datetime" + elif issubclass(pytype, np.bool_): + pytype_name = "bool" + elif issubclass(pytype, np.object): + pytype = lib.infer_dtype(com._ensure_object(col)) + if pytype == "date": + pytype_name = "date" + elif pytype == "time": + pytype_name = "time" + + return _SQL_TYPES[pytype_name][self.flavor] + + def _table_create_statements(self, backend_table): + """ + Return a list of SQL statement that create a table reflecting the + structure of a DataFrame. 
The first entry will be a CREATE TABLE + statement while the rest will be CREATE INDEX statements + """ + column_names_and_types = _get_column_names_and_types( + backend_table.frame, self._sql_type_name, backend_table.index) + + pat = re.compile('\s+') + column_names = [col_name for col_name, _, _ in column_names_and_types] + if any(map(pat.search, column_names)): + warnings.warn(_SAFE_NAMES_WARNING) + + flv = self.flavor + + br_l = _SQL_SYMB[flv]['br_l'] # left val quote char + br_r = _SQL_SYMB[flv]['br_r'] # right val quote char + + create_tbl_stmts = [(br_l + '%s' + br_r + ' %s') % (cname, ctype) + for cname, ctype, _ in column_names_and_types] + if backend_table.keys is not None and len(backend_table.keys): + cnames_br = ",".join([br_l + c + br_r for c in backend_table.keys]) + create_tbl_stmts.append( + "CONSTRAINT {tbl}_pk PRIMARY KEY ({cnames_br})".format( + tbl=backend_table.name, cnames_br=cnames_br)) + + create_stmts = ["CREATE TABLE " + backend_table.name + " (\n" + + ',\n '.join(create_tbl_stmts) + "\n)"] + + ix_cols = [cname for cname, _, is_index in column_names_and_types + if is_index] + if len(ix_cols): + cnames = "_".join(ix_cols) + cnames_br = ",".join([br_l + c + br_r for c in ix_cols]) + create_stmts.append( + "CREATE INDEX ix_{tbl}_{cnames} ON {tbl} ({cnames_br})".format( + tbl=backend_table.name, cnames=cnames, cnames_br=cnames_br)) + return create_stmts + + + def get_backend_table_object(self, name, frame, keys=None, schema=None, + index=None): + return SQLiteBackendTable(name, frame, keys, index) + def get_schema(frame, name, flavor='sqlite', keys=None, con=None): @@ -1500,7 +1447,8 @@ def get_schema(frame, name, flavor='sqlite', keys=None, con=None): """ pandas_sql = pandasSQL_builder(con=con, flavor=flavor) - return pandas_sql._create_sql_schema(frame, name, keys=keys) + table = pandas_sql.get_backend_table_object(name, frame, keys=keys) + return pandas_sql.create_statement(table) # legacy names, with depreciation warnings and copied docs
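Usage note for the `index_col` change in `read_sql_query` / `read_sql` above: passing a list of column names now yields a MultiIndex. A minimal sketch against an in-memory SQLite connection (the table and column names are illustrative, not part of the patch):

```python
import sqlite3

import pandas as pd

# Illustrative two-level keyed table; any supported connection works the same.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE measurements (site TEXT, day TEXT, value REAL)")
con.executemany("INSERT INTO measurements VALUES (?, ?, ?)",
                [("A", "2014-01-01", 1.5),
                 ("A", "2014-01-02", 2.0),
                 ("B", "2014-01-01", 0.7)])

# index_col accepts a list of strings and builds a MultiIndex.
df = pd.read_sql_query("SELECT * FROM measurements", con,
                       index_col=["site", "day"])
assert df.index.nlevels == 2
```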
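The reworked `SQLTable.__init__` above derives the index column labels inline: named levels keep their names, unnamed levels fall back to `'index'` or `'level_{i}'`, and an `index_label` of the wrong length raises `ValueError`. A sketch of that behavior, assuming SQLAlchemy is installed and using illustrative table names:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")  # in-memory database

idx = pd.MultiIndex.from_tuples([("A", 1), ("A", 2)], names=["grp", None])
frame = pd.DataFrame({"val": [1.0, 2.0]}, index=idx)

# The unnamed second level is written as 'level_1'; 'grp' keeps its name.
frame.to_sql("demo", engine, index=True, chunksize=1)

# A one-element index_label on a two-level index raises ValueError:
# "Length of 'index_label' should match number of levels, which is 2"
try:
    frame.to_sql("demo2", engine, index_label=["only_one"])
except ValueError as err:
    print(err)
```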
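The `SQLBackend` base class introduced above keeps `read_query`/`to_sql` generic and delegates the driver specifics to the hook methods. A rough skeleton of what a new backend would override, assuming this patch is applied so that `SQLBackend` is importable from `pandas.io.sql` (the class and its method bodies are placeholders, not part of the patch):

```python
from pandas.io.sql import SQLBackend  # location assumed from this patch


class ExampleBackend(SQLBackend):
    """Hypothetical DBAPI-style backend: only the hooks are overridden."""

    def __init__(self, con):
        self.con = con

    def execute(self, *args, **kwargs):
        cur = self.con.cursor()
        cur.execute(*args)
        return cur

    def _get_result_columns(self, result):
        # DBAPI cursors expose column names via .description
        return [col_desc[0] for col_desc in result.description]

    def _fetchall_as_list(self, result):
        return list(result.fetchall())

    def _close_result(self, result):
        result.close()

    # has_table, drop_table, run_transaction, get_backend_table_object,
    # create_statement, create_table and insert_data would be filled in
    # the same way, mirroring SQLiteDatabase above.
```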
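Finally, the rewritten `get_schema` composes two of those hooks: `get_backend_table_object` builds the backend table object and `create_statement` renders its DDL. A small sketch using the public helper (the frame and key names are illustrative):

```python
import pandas as pd
from pandas.io.sql import get_schema

frame = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# For the sqlite fallback this renders the CREATE TABLE (plus any
# CREATE INDEX) statements without touching a database.
print(get_schema(frame, "example", flavor="sqlite", keys="id"))
```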