diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 184c1a0104703..29ff08391e0e4 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -1,13 +1,14 @@ +# -*- coding: utf-8 -*- """ Collection of query wrappers / abstractions to both facilitate data retrieval and to reduce dependency on DB-specific API. """ + from __future__ import print_function, division -from datetime import datetime, date, timedelta +from datetime import datetime, date import warnings import traceback -import itertools import re import numpy as np @@ -15,12 +16,13 @@ import pandas.core.common as com from pandas.compat import lzip, map, zip, raise_with_traceback, string_types from pandas.core.api import DataFrame, Series -from pandas.core.common import notnull, isnull +from pandas.core.common import isnull from pandas.core.base import PandasObject from pandas.tseries.tools import to_datetime from contextlib import contextmanager + class SQLAlchemyRequired(ImportError): pass @@ -34,6 +36,7 @@ class DatabaseError(IOError): _SQLALCHEMY_INSTALLED = None + def _is_sqlalchemy_engine(con): global _SQLALCHEMY_INSTALLED if _SQLALCHEMY_INSTALLED is None: @@ -80,7 +83,8 @@ def _handle_date_column(col, format=None): else: if format in ['D', 's', 'ms', 'us', 'ns']: return to_datetime(col, coerce=True, unit=format) - elif issubclass(col.dtype.type, np.floating) or issubclass(col.dtype.type, np.integer): + elif (issubclass(col.dtype.type, np.floating) + or issubclass(col.dtype.type, np.integer)): # parse dates as timestamp format = 's' if format is None else format return to_datetime(col, coerce=True, unit=format) @@ -89,8 +93,9 @@ def _handle_date_column(col, format=None): def _parse_date_columns(data_frame, parse_dates): - """ Force non-datetime columns to be read as such. - Supports both string formatted and integer timestamp columns + """ + Force non-datetime columns to be read as such. + Supports both string formatted and integer timestamp columns """ # handle non-list entries for parse_dates gracefully if parse_dates is True or parse_dates is None or parse_dates is False: @@ -152,6 +157,7 @@ def _safe_fetch(cur): if excName == 'OperationalError': return [] + def tquery(sql, con=None, cur=None, retry=True): """ DEPRECATED. Returns list of tuples corresponding to each row in given sql @@ -209,8 +215,8 @@ def tquery(sql, con=None, cur=None, retry=True): def uquery(sql, con=None, cur=None, retry=True, params=None): """ - DEPRECATED. Does the same thing as tquery, but instead of returning results, it - returns the number of rows affected. Good for update queries. + DEPRECATED. Does the same thing as tquery, but instead of returning + results, it returns the number of rows affected. Good for update queries. To obtain the same result in the future, you can use the following: @@ -269,8 +275,8 @@ def read_sql_table(table_name, con, schema=None, index_col=None, con : SQLAlchemy engine Sqlite DBAPI connection mode not supported schema : string, default None - Name of SQL schema in database to query (if database flavor supports this). - If None, use default schema (default). + Name of SQL schema in database to query (if database flavor + supports this). If None, use default schema (default). index_col : string, optional Column to set as index coerce_float : boolean, default True @@ -343,7 +349,7 @@ def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, decimal.Decimal) to floating point, useful for SQL result sets params : list, tuple or dict, optional List of parameters to pass to execute method. The syntax used - to pass parameters is database driver dependent. Check your + to pass parameters is database driver dependent. Check your database driver documentation for which of the five syntax styles, described in PEP 249's paramstyle, is supported. Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'} @@ -393,7 +399,7 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None, decimal.Decimal) to floating point, useful for SQL result sets params : list, tuple or dict, optional List of parameters to pass to execute method. The syntax used - to pass parameters is database driver dependent. Check your + to pass parameters is database driver dependent. Check your database driver documentation for which of the five syntax styles, described in PEP 249's paramstyle, is supported. Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'} @@ -469,8 +475,8 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', 'mysql' is deprecated and will be removed in future versions, but it will be further supported through SQLAlchemy engines. schema : string, default None - Name of SQL schema in database to write to (if database flavor supports - this). If None, use default schema (default). + Name of SQL schema in database to write to (if database flavor + supports this). If None, use default schema (default). if_exists : {'fail', 'replace', 'append'}, default 'fail' - fail: If table exists, do nothing. - replace: If table exists, drop it, recreate it, and insert data. @@ -482,7 +488,7 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', `index` is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex. chunksize : int, default None - If not None, then rows will be written in batches of this size at a + If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once. """ @@ -535,7 +541,9 @@ def has_table(table_name, con, flavor='sqlite', schema=None): "and will be removed in future versions. " "MySQL will be further supported with SQLAlchemy engines.") -def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False): + +def pandasSQL_builder(con, flavor=None, schema=None, meta=None, + is_cursor=False): """ Convenience function to return the correct PandasSQL subclass based on the provided parameters @@ -622,7 +630,7 @@ def insert_data(self): "duplicate name in index/columns: {0}".format(err)) else: temp = self.frame - + column_names = list(map(str, temp.columns)) ncols = len(column_names) data_list = [None] * ncols @@ -631,7 +639,8 @@ def insert_data(self): for i in range(len(blocks)): b = blocks[i] if b.is_datetime: - # convert to microsecond resolution so this yields datetime.datetime + # convert to microsecond resolution so this yields + # datetime.datetime d = b.values.astype('M8[us]').astype(object) else: d = np.array(b.values, dtype=object) @@ -647,7 +656,7 @@ def insert_data(self): return column_names, data_list def _execute_insert(self, conn, keys, data_iter): - data = [dict( (k, v) for k, v in zip(keys, row) ) for row in data_iter] + data = [dict((k, v) for k, v in zip(keys, row)) for row in data_iter] conn.execute(self.insert_statement(), data) def insert(self, chunksize=None): @@ -658,11 +667,11 @@ def insert(self, chunksize=None): if nrows == 0: return - if chunksize is None: + if chunksize is None: chunksize = nrows elif chunksize == 0: raise ValueError('chunksize argument should be non-zero') - + chunks = int(nrows / chunksize) + 1 with self.pd_sql.run_transaction() as conn: @@ -715,7 +724,8 @@ def _index_name(self, index, index_label): else: return index_label # return the used column labels for the index columns - if nlevels == 1 and 'index' not in self.frame.columns and self.frame.index.name is None: + if (nlevels == 1 and 'index' not in self.frame.columns + and self.frame.index.name is None): return ['index'] else: return [l if l is not None else "level_{0}".format(i) @@ -739,7 +749,7 @@ def _get_column_names_and_types(self, dtype_mapper): column_names_and_types += [ (str(self.frame.columns[i]), - dtype_mapper(self.frame.iloc[:,i]), + dtype_mapper(self.frame.iloc[:, i]), False) for i in range(len(self.frame.columns)) ] @@ -756,9 +766,8 @@ def _create_table_setup(self): for name, typ, is_index in column_names_and_types] if self.keys is not None: - columns.append(PrimaryKeyConstraint(self.keys, - name=self.name+'_pk')) - + pkc = PrimaryKeyConstraint(self.keys, name=self.name + '_pk') + columns.append(pkc) schema = self.schema or self.pd_sql.meta.schema @@ -770,17 +779,16 @@ def _create_table_setup(self): return Table(self.name, meta, *columns, schema=schema) def _harmonize_columns(self, parse_dates=None): - """ Make a data_frame's column type align with an sql_table - column types - Need to work around limited NA value support. - Floats are always fine, ints must always - be floats if there are Null values. - Booleans are hard because converting bool column with None replaces - all Nones with false. Therefore only convert bool if there are no - NA values. - Datetimes should already be converted - to np.datetime if supported, but here we also force conversion - if required + """ + Make the DataFrame's column types align with the SQL table + column types. + Need to work around limited NA value support. Floats are always + fine, ints must always be floats if there are Null values. + Booleans are hard because converting bool column with None replaces + all Nones with false. Therefore only convert bool if there are no + NA values. + Datetimes should already be converted to np.datetime64 if supported, + but here we also force conversion if required """ # handle non-list entries for parse_dates gracefully if parse_dates is True or parse_dates is None or parse_dates is False: @@ -823,7 +831,7 @@ def _harmonize_columns(self, parse_dates=None): def _sqlalchemy_type(self, col): from sqlalchemy.types import (BigInteger, Float, Text, Boolean, - DateTime, Date, Time, Interval) + DateTime, Date, Time) if com.is_datetime64_dtype(col): try: @@ -874,12 +882,12 @@ class PandasSQL(PandasObject): """ def read_sql(self, *args, **kwargs): - raise ValueError( - "PandasSQL must be created with an SQLAlchemy engine or connection+sql flavor") + raise ValueError("PandasSQL must be created with an SQLAlchemy engine" + " or connection+sql flavor") def to_sql(self, *args, **kwargs): - raise ValueError( - "PandasSQL must be created with an SQLAlchemy engine or connection+sql flavor") + raise ValueError("PandasSQL must be created with an SQLAlchemy engine" + " or connection+sql flavor") class PandasSQLAlchemy(PandasSQL): @@ -897,7 +905,7 @@ def __init__(self, engine, schema=None, meta=None): self.meta = meta def run_transaction(self): - return self.engine.begin() + return self.engine.begin() def execute(self, *args, **kwargs): """Simple passthrough to SQLAlchemy engine""" @@ -964,8 +972,8 @@ def drop_table(self, table_name, schema=None): self.meta.clear() def _create_sql_schema(self, frame, table_name, keys=None): - table = PandasSQLTable(table_name, self, frame=frame, index=False, - keys=keys) + table = PandasSQLTable(table_name, self, frame=frame, index=False, + keys=keys) return str(table.sql_schema()) @@ -1025,9 +1033,11 @@ def _create_sql_schema(self, frame, table_name, keys=None): class PandasSQLTableLegacy(PandasSQLTable): - """Patch the PandasSQLTable for legacy support. - Instead of a table variable just use the Create Table - statement""" + """ + Patch the PandasSQLTable for legacy support. + Instead of a table variable just use the Create Table statement. + """ + def sql_schema(self): return str(";\n".join(self.table)) @@ -1058,11 +1068,11 @@ def _execute_insert(self, conn, keys, data_iter): conn.executemany(self.insert_statement(), data_list) def _create_table_setup(self): - """Return a list of SQL statement that create a table reflecting the + """ + Return a list of SQL statement that create a table reflecting the structure of a DataFrame. The first entry will be a CREATE TABLE statement while the rest will be CREATE INDEX statements """ - column_names_and_types = \ self._get_column_names_and_types(self._sql_type_name) @@ -1159,15 +1169,15 @@ def execute(self, *args, **kwargs): else: cur.execute(*args) return cur - except Exception as e: + except Exception as exc: try: self.con.rollback() except Exception: # pragma: no cover - ex = DatabaseError( - "Execution failed on sql: %s\n%s\nunable to rollback" % (args[0], e)) + ex = DatabaseError("Execution failed on sql: %s\n%s\nunable" + " to rollback" % (args[0], exc)) raise_with_traceback(ex) - ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], e)) + ex = DatabaseError("Execution failed on sql '%s': %s" % (args[0], exc)) raise_with_traceback(ex) def read_sql(self, sql, index_col=None, coerce_float=True, params=None, @@ -1213,11 +1223,11 @@ def to_sql(self, frame, name, if_exists='fail', index=True, `index` is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex. schema : string, default None - Ignored parameter included for compatability with SQLAlchemy version - of `to_sql`. + Ignored parameter included for compatability with SQLAlchemy + version of ``to_sql``. chunksize : int, default None - If not None, then rows will be written in batches of this size at a - time. If None, all rows will be written at once. + If not None, then rows will be written in batches of this + size at a time. If None, all rows will be written at once. """ table = PandasSQLTableLegacy( @@ -1243,8 +1253,8 @@ def drop_table(self, name, schema=None): self.execute(drop_sql) def _create_sql_schema(self, frame, table_name, keys=None): - table = PandasSQLTableLegacy(table_name, self, frame=frame, index=False, - keys=keys) + table = PandasSQLTableLegacy(table_name, self, frame=frame, + index=False, keys=keys) return str(table.sql_schema())