Fix to merge code from _create_table_statement and get_schema #8232

Merged · 1 commit · Sep 15, 2014
220 changes: 95 additions & 125 deletions pandas/io/sql.py
@@ -552,33 +552,19 @@ class PandasSQLTable(PandasObject):
# TODO: support for multiIndex
def __init__(self, name, pandas_sql_engine, frame=None, index=True,
if_exists='fail', prefix='pandas', index_label=None,
schema=None):
schema=None, keys=None):
Member:

I know in the fallback version of get_schema you could specify keys, and I think this functionality is useful, but I don't yet know what would be the best interface for that

Member:

Can you maybe pass this keys kwarg directly to the _create_table_setup method? Then we can more easily change it later if we figured out how to handle this.

Contributor Author:

I can definitely do that but I'm not sure what the advantage would be. Since _create_table_setup is getting called by the constructor, that constructor will still have a keys kwarg, right?

Member:

Ah, no, I thought you were calling _create_table_setup directly, but this is used in the Table constructor. OK, then leave it.
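For readers following the `keys` discussion: downstream, the kwarg produces a named `PRIMARY KEY` constraint in the generated DDL. A minimal standalone sketch of the legacy string-building path (`build_create_table` is an illustrative name, not a helper in the PR):

```python
def build_create_table(name, columns, keys=None):
    """Build a CREATE TABLE statement in the sqlite flavor.

    columns : list of (column_name, sql_type) pairs
    keys : optional list of column names to use as the primary key
    """
    col_defs = ["[%s] %s" % (cname, ctype) for cname, ctype in columns]
    if keys:
        # mirror the PR's constraint naming scheme: <table>_pk
        quoted = ",".join("[%s]" % k for k in keys)
        col_defs.append("CONSTRAINT %s_pk PRIMARY KEY (%s)" % (name, quoted))
    return "CREATE TABLE %s (\n  %s\n)" % (name, ",\n  ".join(col_defs))

sql = build_create_table("test", [("A", "REAL"), ("B", "TEXT")], keys=["A", "B"])
```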

self.name = name
self.pd_sql = pandas_sql_engine
self.prefix = prefix
self.frame = frame
self.index = self._index_name(index, index_label)
self.schema = schema
self.if_exists = if_exists
self.keys = keys

if frame is not None:
# We want to write a frame
if self.pd_sql.has_table(self.name, self.schema):
if if_exists == 'fail':
raise ValueError("Table '%s' already exists." % name)
elif if_exists == 'replace':
self.pd_sql.drop_table(self.name, self.schema)
self.table = self._create_table_setup()
self.create()
elif if_exists == 'append':
self.table = self.pd_sql.get_table(self.name, self.schema)
if self.table is None:
self.table = self._create_table_setup()
else:
raise ValueError(
"'{0}' is not valid for if_exists".format(if_exists))
else:
self.table = self._create_table_setup()
self.create()
# We want to initialize based on a dataframe
self.table = self._create_table_setup()
else:
# no data provided, read-only mode
self.table = self.pd_sql.get_table(self.name, self.schema)
@@ -593,9 +579,26 @@ def sql_schema(self):
from sqlalchemy.schema import CreateTable
return str(CreateTable(self.table))

def create(self):
def _execute_create(self):
# Inserting table into database, add to MetaData object
self.table = self.table.tometadata(self.pd_sql.meta)
self.table.create()

def create(self):
if self.exists():
if self.if_exists == 'fail':
raise ValueError("Table '%s' already exists." % self.name)
elif self.if_exists == 'replace':
self.pd_sql.drop_table(self.name, self.schema)
self._execute_create()
elif self.if_exists == 'append':
pass
else:
raise ValueError(
"'{0}' is not valid for if_exists".format(self.if_exists))
else:
self._execute_create()

def insert_statement(self):
return self.table.insert()

@@ -634,28 +637,31 @@ def insert_data(self):

return column_names, data_list

def insert(self, chunksize=None):
def get_session(self):
con = self.pd_sql.engine.connect()
return con.begin()

ins = self.insert_statement()
def _execute_insert(self, trans, keys, data_iter):
data = [dict( (k, v) for k, v in zip(keys, row) ) for row in data_iter]
trans.connection.execute(self.insert_statement(), data)
Member:

you can also return the connection in get_session with engine.begin(), then you don't need to get back the connection from trans (see http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html#using-transactions)

Contributor Author:

I would still need to pass in the transaction for the legacy _execute_insert. What is the advantage of not getting connection from trans?

Member:

I don't know if there is an advantage, apart from shorter code while being equivalent. But you are using a connection with the legacy get_session and _execute_insert, no?

Contributor Author:

Legacy _execute_insert uses trans, not the connection.

Member:

you call it trans (and it is 'to run a transaction'), but the actual object you pass is a connection (in case of sqlite) or a cursor (in case of pymysql/MySQLdb)
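The `engine.begin()` suggestion, sketched with a throwaway in-memory engine (assumes SQLAlchemy is installed; `text()` is used so the snippet also runs on modern SQLAlchemy):

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory database

# Variant in the PR: get a connection and its transaction separately
con = engine.connect()
trans = con.begin()
con.execute(text("CREATE TABLE t (x INTEGER)"))
trans.commit()
con.close()

# Equivalent, shorter form: engine.begin() yields a connection whose
# transaction is committed on clean exit, rolled back on exception
with engine.begin() as conn:
    conn.execute(text("INSERT INTO t VALUES (1)"))

with engine.connect() as conn:
    rows = conn.execute(text("SELECT x FROM t")).fetchall()
```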


def insert(self, chunksize=None):
keys, data_list = self.insert_data()

nrows = len(self.frame)
if chunksize is None:
chunksize = nrows
chunks = int(nrows / chunksize) + 1

con = self.pd_sql.engine.connect()
with con.begin() as trans:
with self.get_session() as trans:
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break

chunk_list = [arr[start_i:end_i] for arr in data_list]
insert_list = [dict((k, v) for k, v in zip(keys, row))
for row in zip(*chunk_list)]
con.execute(ins, insert_list)
chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
self._execute_insert(trans, keys, chunk_iter)
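As a sanity check on the chunking arithmetic in `insert()` above, the bounds logic can be lifted out as a pure-Python sketch (`iter_chunks` is an illustrative name, not a method in the PR):

```python
def iter_chunks(nrows, chunksize):
    # mirrors the loop in insert(): over-estimate the chunk count by one,
    # then rely on the start_i >= end_i guard to stop early
    chunks = int(nrows / chunksize) + 1
    for i in range(chunks):
        start_i = i * chunksize
        end_i = min((i + 1) * chunksize, nrows)
        if start_i >= end_i:
            break
        yield start_i, end_i

bounds = list(iter_chunks(10, 4))
```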

def read(self, coerce_float=True, parse_dates=None, columns=None):

@@ -729,15 +735,27 @@ def _get_column_names_and_types(self, dtype_mapper):
return column_names_and_types

def _create_table_setup(self):
from sqlalchemy import Table, Column
from sqlalchemy import Table, Column, PrimaryKeyConstraint

column_names_and_types = \
self._get_column_names_and_types(self._sqlalchemy_type)

columns = [Column(name, typ, index=is_index)
for name, typ, is_index in column_names_and_types]

return Table(self.name, self.pd_sql.meta, *columns, schema=self.schema)
if self.keys is not None:
columns.append(PrimaryKeyConstraint(self.keys,
name=self.name+'_pk'))


schema = self.schema or self.pd_sql.meta.schema

# At this point, attach to new metadata, only attach to self.meta
# once table is created.
from sqlalchemy.schema import MetaData
meta = MetaData(self.pd_sql, schema=schema)
Member:

for what is this needed? The meta object is already constructed in PandasSQLEngine (or passed by the user) and should not be recreated here.

Contributor Author:

See my comments above. I create a blank MetaData object to avoid conflicts with the table already existing in self.pd_sql.meta. Only when create() is called do I move it over to self.pd_sql.meta.
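The conflict being avoided here can be seen directly (assumes SQLAlchemy): defining a second `Table` with the same name on the same `MetaData` raises, while a fresh `MetaData` does not.

```python
from sqlalchemy import MetaData, Table, Column, Integer

meta = MetaData()
Table("t", meta, Column("x", Integer))
try:
    # same name, same MetaData, columns given again -> conflict
    Table("t", meta, Column("x", Integer))
except Exception as exc:
    err = type(exc).__name__

# A fresh MetaData sidesteps the conflict; tometadata() can copy the
# table over to the "real" MetaData later, at create() time
fresh = MetaData()
t2 = Table("t", fresh, Column("x", Integer))
```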


return Table(self.name, meta, *columns, schema=schema)

def _harmonize_columns(self, parse_dates=None):
""" Make a data_frame's column type align with an sql_table
@@ -872,7 +890,6 @@ def execute(self, *args, **kwargs):

def read_table(self, table_name, index_col=None, coerce_float=True,
parse_dates=None, columns=None, schema=None):

table = PandasSQLTable(
table_name, self, index=index_col, schema=schema)
return table.read(coerce_float=coerce_float,
@@ -901,6 +918,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
table = PandasSQLTable(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label, schema=schema)
table.create()
table.insert(chunksize)
# check for potentially case sensitivity issues (GH7815)
if name not in self.engine.table_names(schema=schema or self.meta.schema):
@@ -930,8 +948,9 @@ def drop_table(self, table_name, schema=None):
self.get_table(table_name, schema).drop()
self.meta.clear()

def _create_sql_schema(self, frame, table_name):
table = PandasSQLTable(table_name, self, frame=frame)
def _create_sql_schema(self, frame, table_name, keys=None):
table = PandasSQLTable(table_name, self, frame=frame, index=False,
keys=keys)
return str(table.sql_schema())


@@ -997,8 +1016,8 @@ class PandasSQLTableLegacy(PandasSQLTable):
def sql_schema(self):
return str(";\n".join(self.table))

def create(self):
with self.pd_sql.con:
def _execute_create(self):
with self.get_session():
for stmt in self.table:
self.pd_sql.execute(stmt)

@@ -1019,28 +1038,12 @@ def insert_statement(self):
self.name, col_names, wildcards)
return insert_statement

def insert(self, chunksize=None):

ins = self.insert_statement()
keys, data_list = self.insert_data()

nrows = len(self.frame)
if chunksize is None:
chunksize = nrows
chunks = int(nrows / chunksize) + 1
def get_session(self):
return self.pd_sql.con

Member:

The cursor (and cursor close) is gone, and you are now using just the connection? (sqlite has a connection.execute, but this is not standard DBAPI, so I'm not sure this will work for pymysql/MySQLdb/...) Or does the context manager return a cursor?

Member:

Hmm, this seems a bit different for the different drivers.

- sqlite3 returns a connection from the context manager, but has a connection.executemany, so that of course works (otherwise the tests would not pass at all)
- pymysql returns a cursor, but does not close that cursor automatically (I don't know if this is a problem)
- MySQLdb only supports such a context manager from 1.2.5 (so only the latest version), and does the same as pymysql
- mysql.connector does not support it, so the current implementation fails with that.

Contributor Author:

You are right, it seems a bit different for different drivers.
However, sqlite3 does support the paradigm:

with conn as a:
  a.execute('blah')

as a way to run a transaction.

see:
https://docs.python.org/2/library/sqlite3.html#using-the-connection-as-a-context-manager
http://stackoverflow.com/questions/15568137/is-enter-and-exit-behaviour-for-connection-objects-specified-in-the-pyth

Member:

yes, the problem is that it is not supported by all mysql drivers, so we actually shouldn't use it until the mysql support is fully removed (but this was not introduced in this PR, so we can also address it separately in another PR)
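For sqlite3 specifically, the stdlib behavior described in this thread can be checked directly: the context manager yields the connection itself and wraps a transaction (commit on success, rollback on exception).

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (x INTEGER)")

# sqlite3's context manager returns the *connection*, not a cursor,
# and commits the enclosed statements on clean exit
with con as c:
    c.execute("INSERT INTO t VALUES (1)")

rows = con.execute("SELECT x FROM t").fetchall()
```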

with self.pd_sql.con:
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break
chunk_list = [arr[start_i:end_i] for arr in data_list]
insert_list = [tuple((v for v in row))
for row in zip(*chunk_list)]
cur = self.pd_sql.con.cursor()
cur.executemany(ins, insert_list)
cur.close()
def _execute_insert(self, trans, keys, data_iter):
data_list = list(data_iter)
trans.executemany(self.insert_statement(), data_list)
Member:

I think this can also be an iterator, see https://docs.python.org/2/library/sqlite3.html#sqlite3.Cursor.executemany, so the list(..) is not needed

Member:

But that is only for sqlite of course, and for now we also have to support the mysql drivers, thus ignore my comment.
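The iterator point does check out for the stdlib sqlite3 driver: `executemany` accepts any iterable, so no `list(...)` materialization is needed there.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (x INTEGER, y INTEGER)")

# a lazy iterator of parameter tuples; executemany consumes it directly
data_iter = zip(range(3), range(3, 6))
con.executemany("INSERT INTO t VALUES (?, ?)", data_iter)

rows = con.execute("SELECT x, y FROM t ORDER BY x").fetchall()
```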


def _create_table_setup(self):
"""Return a list of SQL statement that create a table reflecting the
@@ -1061,21 +1064,25 @@ def _create_table_setup(self):
br_l = _SQL_SYMB[flv]['br_l'] # left val quote char
br_r = _SQL_SYMB[flv]['br_r'] # right val quote char

col_template = br_l + '%s' + br_r + ' %s'

columns = ',\n '.join(col_template % (cname, ctype)
for cname, ctype, _ in column_names_and_types)
template = """CREATE TABLE %(name)s (
%(columns)s
)"""
create_stmts = [template % {'name': self.name, 'columns': columns}, ]

ix_tpl = "CREATE INDEX ix_{tbl}_{col} ON {tbl} ({br_l}{col}{br_r})"
for cname, _, is_index in column_names_and_types:
if not is_index:
continue
create_stmts.append(ix_tpl.format(tbl=self.name, col=cname,
br_l=br_l, br_r=br_r))
create_tbl_stmts = [(br_l + '%s' + br_r + ' %s') % (cname, ctype)
for cname, ctype, _ in column_names_and_types]
if self.keys is not None and len(self.keys):
cnames_br = ",".join([br_l + c + br_r for c in self.keys])
create_tbl_stmts.append(
"CONSTRAINT {tbl}_pk PRIMARY KEY ({cnames_br})".format(
tbl=self.name, cnames_br=cnames_br))

create_stmts = ["CREATE TABLE " + self.name + " (\n" +
',\n '.join(create_tbl_stmts) + "\n)"]

ix_cols = [cname for cname, _, is_index in column_names_and_types
if is_index]
if len(ix_cols):
cnames = "_".join(ix_cols)
cnames_br = ",".join([br_l + c + br_r for c in ix_cols])
create_stmts.append(
"CREATE INDEX ix_{tbl}_{cnames} ON {tbl} ({cnames_br})".format(
tbl=self.name, cnames=cnames, cnames_br=cnames_br))

return create_stmts

@@ -1172,16 +1179,28 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
----------
frame: DataFrame
name: name of SQL table
flavor: {'sqlite', 'mysql'}, default 'sqlite'
if_exists: {'fail', 'replace', 'append'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if does not exist.
index : boolean, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Column label for index column(s). If None is given (default) and
`index` is True, then the index names are used.
A sequence should be given if the DataFrame uses MultiIndex.
schema : string, default None
Ignored parameter included for compatability with SQLAlchemy version
of `to_sql`.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a
time. If None, all rows will be written at once.

"""
table = PandasSQLTableLegacy(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label)
table.create()
table.insert(chunksize)

def has_table(self, name, schema=None):
@@ -1200,8 +1219,9 @@ def drop_table(self, name, schema=None):
drop_sql = "DROP TABLE %s" % name
self.execute(drop_sql)

def _create_sql_schema(self, frame, table_name):
table = PandasSQLTableLegacy(table_name, self, frame=frame)
def _create_sql_schema(self, frame, table_name, keys=None):
table = PandasSQLTableLegacy(table_name, self, frame=frame, index=False,
keys=keys)
return str(table.sql_schema())


Expand All @@ -1227,58 +1247,8 @@ def get_schema(frame, name, flavor='sqlite', keys=None, con=None):

"""

if con is None:
if flavor == 'mysql':
warnings.warn(_MYSQL_WARNING, FutureWarning)
return _get_schema_legacy(frame, name, flavor, keys)

pandas_sql = pandasSQL_builder(con=con, flavor=flavor)
return pandas_sql._create_sql_schema(frame, name)


def _get_schema_legacy(frame, name, flavor, keys=None):
"""Old function from 0.13.1. To keep backwards compatibility.
When mysql legacy support is dropped, it should be possible to
remove this code
"""

def get_sqltype(dtype, flavor):
pytype = dtype.type
pytype_name = "text"
if issubclass(pytype, np.floating):
pytype_name = "float"
elif issubclass(pytype, np.integer):
pytype_name = "int"
elif issubclass(pytype, np.datetime64) or pytype is datetime:
# Caution: np.datetime64 is also a subclass of np.number.
pytype_name = "datetime"
elif pytype is datetime.date:
pytype_name = "date"
elif issubclass(pytype, np.bool_):
pytype_name = "bool"

return _SQL_TYPES[pytype_name][flavor]

lookup_type = lambda dtype: get_sqltype(dtype, flavor)

column_types = lzip(frame.dtypes.index, map(lookup_type, frame.dtypes))
if flavor == 'sqlite':
columns = ',\n '.join('[%s] %s' % x for x in column_types)
else:
columns = ',\n '.join('`%s` %s' % x for x in column_types)

keystr = ''
if keys is not None:
if isinstance(keys, string_types):
keys = (keys,)
keystr = ', PRIMARY KEY (%s)' % ','.join(keys)
template = """CREATE TABLE %(name)s (
%(columns)s
%(keystr)s
);"""
create_statement = template % {'name': name, 'columns': columns,
'keystr': keystr}
return create_statement
return pandas_sql._create_sql_schema(frame, name, keys=keys)


# legacy names, with depreciation warnings and copied docs
13 changes: 11 additions & 2 deletions pandas/io/tests/test_sql.py
@@ -1449,6 +1449,15 @@ def _get_index_columns(self, tbl_name):
def test_to_sql_save_index(self):
self._to_sql_save_index()

for ix_name, ix_col in zip(ixs.Key_name, ixs.Column_name):
if ix_name not in ix_cols:
ix_cols[ix_name] = []
ix_cols[ix_name].append(ix_col)
return ix_cols.values()

def test_to_sql_save_index(self):
self._to_sql_save_index()


#------------------------------------------------------------------------------
#--- Old tests from 0.13.1 (before refactor using sqlalchemy)
@@ -1545,7 +1554,7 @@ def test_schema(self):
frame = tm.makeTimeDataFrame()
create_sql = sql.get_schema(frame, 'test', 'sqlite', keys=['A', 'B'],)
lines = create_sql.splitlines()
self.assertTrue('PRIMARY KEY (A,B)' in create_sql)
self.assertTrue('PRIMARY KEY ([A],[B])' in create_sql)
Member:

Was this not working? (I mean, the tests passed until now. Is there a reason they don't pass anymore?)

Contributor Author:

Previous get_schema --- which was a different code base --- did not escape column names as sqlalchemy-generated SQL statements do.

Member:

ah, yes, it is in the check of the create statement. I thought it was in running code. OK, no problem then.
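The escaping difference this test change reflects: SQLAlchemy quotes case-sensitive identifiers when rendering DDL (assumes SQLAlchemy; default dialect shown).

```python
from sqlalchemy import MetaData, Table, Column, Integer, PrimaryKeyConstraint
from sqlalchemy.schema import CreateTable

meta = MetaData()
table = Table("test", meta,
              Column("A", Integer),
              Column("B", Integer),
              PrimaryKeyConstraint("A", "B", name="test_pk"))

# Uppercase column names are case-sensitive to SQLAlchemy, so the
# rendered CREATE TABLE quotes them, unlike the old string-based path
ddl = str(CreateTable(table))
```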

cur = self.db.cursor()
cur.execute(create_sql)

@@ -1824,7 +1833,7 @@ def test_schema(self):
drop_sql = "DROP TABLE IF EXISTS test"
create_sql = sql.get_schema(frame, 'test', 'mysql', keys=['A', 'B'],)
lines = create_sql.splitlines()
self.assertTrue('PRIMARY KEY (A,B)' in create_sql)
self.assertTrue('PRIMARY KEY (`A`,`B`)' in create_sql)
cur = self.db.cursor()
cur.execute(drop_sql)
cur.execute(create_sql)