Skip to content

Commit 1e90ba3

Browse files
ENH: add schema support to sql functions + add tests
1 parent a997356 commit 1e90ba3

File tree

6 files changed

+121
-34
lines changed

6 files changed

+121
-34
lines changed

ci/requirements-2.6.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pytz==2013b
55
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
66
html5lib==1.0b2
77
numexpr==1.4.2
8-
sqlalchemy==0.7.1
8+
sqlalchemy==0.7.4
99
pymysql==0.6.0
1010
psycopg2==2.5
1111
scipy==0.11.0

doc/source/io.rst

+14
Original file line numberDiff line numberDiff line change
@@ -3320,6 +3320,20 @@ to pass to :func:`pandas.to_datetime`:
33203320
33213321
You can check if a table exists using :func:`~pandas.io.sql.has_table`
33223322

3323+
Schema support
3324+
~~~~~~~~~~~~~~
3325+
3326+
.. versionadded:: 0.15.0
3327+
3328+
Reading from and writing to different schemas is supported through the ``schema``
3329+
keyword in the :func:`~pandas.read_sql_table` and :func:`~pandas.DataFrame.to_sql`
3330+
functions. Note however that this depends on the database flavor (sqlite does not
3331+
have schemas). For example:
3332+
3333+
.. code-block:: python
3334+
3335+
df.to_sql('table', engine, schema='other_schema')
3336+
pd.read_sql_table('table', engine, schema='other_schema')
33233337
33243338
Querying
33253339
~~~~~~~~

doc/source/v0.15.0.txt

+7
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,13 @@ Enhancements
429429

430430
- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`)
431431
- Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`).
432+
- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`).
433+
For example:
434+
435+
.. code-block:: python
436+
437+
df.to_sql('table', engine, schema='other_schema')
438+
pd.read_sql_table('table', engine, schema='other_schema')
432439

433440
- Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`)
434441

pandas/core/generic.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -915,8 +915,8 @@ def to_msgpack(self, path_or_buf=None, **kwargs):
915915
from pandas.io import packers
916916
return packers.to_msgpack(path_or_buf, self, **kwargs)
917917

918-
def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
919-
index_label=None, chunksize=None):
918+
def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail',
919+
index=True, index_label=None, chunksize=None):
920920
"""
921921
Write records stored in a DataFrame to a SQL database.
922922
@@ -932,6 +932,9 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
932932
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
933933
'mysql' is deprecated and will be removed in future versions, but it
934934
will be further supported through SQLAlchemy engines.
935+
schema : string, default None
936+
Specify the schema (if database flavor supports this). If None, use
937+
default schema.
935938
if_exists : {'fail', 'replace', 'append'}, default 'fail'
936939
- fail: If table exists, do nothing.
937940
- replace: If table exists, drop it, recreate it, and insert data.
@@ -949,8 +952,8 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
949952
"""
950953
from pandas.io import sql
951954
sql.to_sql(
952-
self, name, con, flavor=flavor, if_exists=if_exists, index=index,
953-
index_label=index_label, chunksize=chunksize)
955+
self, name, con, flavor=flavor, schema=schema, if_exists=if_exists,
956+
index=index, index_label=index_label, chunksize=chunksize)
954957

955958
def to_pickle(self, path):
956959
"""

pandas/io/sql.py

+28-20
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None):
253253
#------------------------------------------------------------------------------
254254
#--- Read and write to DataFrames
255255

256-
def read_sql_table(table_name, con, schema=None, index_col=None, coerce_float=True,
257-
parse_dates=None, columns=None):
256+
def read_sql_table(table_name, con, schema=None, index_col=None,
257+
coerce_float=True, parse_dates=None, columns=None):
258258
"""Read SQL database table into a DataFrame.
259259
260260
Given a table name and an SQLAlchemy engine, returns a DataFrame.
@@ -266,7 +266,9 @@ def read_sql_table(table_name, con, schema=None, index_col=None, coerce_float=Tr
266266
Name of SQL table in database
267267
con : SQLAlchemy engine
268268
Sqlite DBAPI connection mode not supported
269-
schema : Name of SQL schema in database.
269+
schema : string, default None
270+
Name of SQL schema in database to query (if database flavor supports this).
271+
If None, use default schema (default).
270272
index_col : string, optional
271273
Column to set as index
272274
coerce_float : boolean, default True
@@ -299,16 +301,16 @@ def read_sql_table(table_name, con, schema=None, index_col=None, coerce_float=Tr
299301
"SQLAlchemy engines.")
300302
import sqlalchemy
301303
from sqlalchemy.schema import MetaData
302-
meta = MetaData(con)
304+
meta = MetaData(con, schema=schema)
303305
try:
304-
meta.reflect(only=[table_name], schema=schema)
306+
meta.reflect(only=[table_name])
305307
except sqlalchemy.exc.InvalidRequestError:
306308
raise ValueError("Table %s not found" % table_name)
307309

308310
pandas_sql = PandasSQLAlchemy(con, meta=meta)
309311
table = pandas_sql.read_table(
310312
table_name, index_col=index_col, coerce_float=coerce_float,
311-
parse_dates=parse_dates, columns=columns, schema=schema)
313+
parse_dates=parse_dates, columns=columns)
312314

313315
if table is not None:
314316
return table
@@ -456,7 +458,9 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
456458
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
457459
'mysql' is deprecated and will be removed in future versions, but it
458460
will be further supported through SQLAlchemy engines.
459-
schema : Name of SQL schema in database.
461+
schema : string, default None
462+
Name of SQL schema in database to write to (if database flavor supports
463+
this). If None, use default schema (default).
460464
if_exists : {'fail', 'replace', 'append'}, default 'fail'
461465
- fail: If table exists, do nothing.
462466
- replace: If table exists, drop it, recreate it, and insert data.
@@ -475,7 +479,7 @@ def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
475479
if if_exists not in ('fail', 'replace', 'append'):
476480
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
477481

478-
pandas_sql = pandasSQL_builder(con, flavor=flavor)
482+
pandas_sql = pandasSQL_builder(con, schema=schema, flavor=flavor)
479483

480484
if isinstance(frame, Series):
481485
frame = frame.to_frame()
@@ -503,14 +507,16 @@ def has_table(table_name, con, flavor='sqlite', schema=None):
503507
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
504508
'mysql' is deprecated and will be removed in future versions, but it
505509
will be further supported through SQLAlchemy engines.
506-
schema : Name of SQL schema in database.
510+
schema : string, default None
511+
Name of SQL schema in database to write to (if database flavor supports
512+
this). If None, use default schema (default).
507513
508514
Returns
509515
-------
510516
boolean
511517
"""
512-
pandas_sql = pandasSQL_builder(con, flavor=flavor)
513-
return pandas_sql.has_table(table_name, schema=schema)
518+
pandas_sql = pandasSQL_builder(con, flavor=flavor, schema=schema)
519+
return pandas_sql.has_table(table_name)
514520

515521
table_exists = has_table
516522

@@ -519,15 +525,15 @@ def has_table(table_name, con, flavor='sqlite', schema=None):
519525
"and will be removed in future versions. "
520526
"MySQL will be further supported with SQLAlchemy engines.")
521527

522-
def pandasSQL_builder(con, flavor=None, meta=None, is_cursor=False):
528+
def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False):
523529
"""
524530
Convenience function to return the correct PandasSQL subclass based on the
525531
provided parameters
526532
"""
527533
# When support for DBAPI connections is removed,
528534
# is_cursor should not be necessary.
529535
if _is_sqlalchemy_engine(con):
530-
return PandasSQLAlchemy(con, meta=meta)
536+
return PandasSQLAlchemy(con, schema=schema, meta=meta)
531537
else:
532538
if flavor == 'mysql':
533539
warnings.warn(_MYSQL_WARNING, FutureWarning)
@@ -836,11 +842,11 @@ class PandasSQLAlchemy(PandasSQL):
836842
using SQLAlchemy to handle DataBase abstraction
837843
"""
838844

839-
def __init__(self, engine, meta=None):
845+
def __init__(self, engine, schema=None, meta=None):
840846
self.engine = engine
841847
if not meta:
842848
from sqlalchemy.schema import MetaData
843-
meta = MetaData(self.engine)
849+
meta = MetaData(self.engine, schema=schema)
844850

845851
self.meta = meta
846852

@@ -886,15 +892,17 @@ def tables(self):
886892
return self.meta.tables
887893

888894
def has_table(self, name, schema=None):
889-
return self.engine.has_table(name, schema)
895+
return self.engine.has_table(name, schema or self.meta.schema)
890896

891897
def get_table(self, table_name, schema=None):
898+
schema = schema or self.meta.schema
892899
if schema:
893900
return self.meta.tables.get('.'.join([schema, table_name]))
894901
else:
895902
return self.meta.tables.get(table_name)
896903

897904
def drop_table(self, table_name, schema=None):
905+
schema = schema or self.meta.schema
898906
if self.engine.has_table(table_name, schema):
899907
self.meta.reflect(only=[table_name], schema=schema)
900908
self.get_table(table_name, schema).drop()
@@ -1123,7 +1131,7 @@ def _fetchall_as_list(self, cur):
11231131
return result
11241132

11251133
def to_sql(self, frame, name, if_exists='fail', index=True,
1126-
index_label=None, chunksize=None):
1134+
index_label=None, schema=None, chunksize=None):
11271135
"""
11281136
Write records stored in a DataFrame to a SQL database.
11291137
@@ -1143,7 +1151,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
11431151
index_label=index_label)
11441152
table.insert(chunksize)
11451153

1146-
def has_table(self, name):
1154+
def has_table(self, name, schema=None):
11471155
flavor_map = {
11481156
'sqlite': ("SELECT name FROM sqlite_master "
11491157
"WHERE type='table' AND name='%s';") % name,
@@ -1152,10 +1160,10 @@ def has_table(self, name):
11521160

11531161
return len(self.execute(query).fetchall()) > 0
11541162

1155-
def get_table(self, table_name):
1163+
def get_table(self, table_name, schema=None):
11561164
return None # not supported in Legacy mode
11571165

1158-
def drop_table(self, name):
1166+
def drop_table(self, name, schema=None):
11591167
drop_sql = "DROP TABLE %s" % name
11601168
self.execute(drop_sql)
11611169

pandas/io/tests/test_sql.py

+64-9
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from datetime import datetime, date, time
3030

31-
from pandas import DataFrame, Series, Index, MultiIndex, isnull
31+
from pandas import DataFrame, Series, Index, MultiIndex, isnull, concat
3232
from pandas import date_range, to_datetime, to_timedelta
3333
import pandas.compat as compat
3434
from pandas.compat import StringIO, range, lrange, string_types
@@ -457,12 +457,12 @@ def test_roundtrip(self):
457457
tm.assert_frame_equal(result, self.test_frame1)
458458

459459
def test_roundtrip_chunksize(self):
460-
sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn,
460+
sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn,
461461
index=False, flavor='sqlite', chunksize=2)
462462
result = sql.read_sql_query(
463463
'SELECT * FROM test_frame_roundtrip',
464464
con=self.conn)
465-
tm.assert_frame_equal(result, self.test_frame1)
465+
tm.assert_frame_equal(result, self.test_frame1)
466466

467467
def test_execute_sql(self):
468468
# drop_sql = "DROP TABLE IF EXISTS test" # should already be done
@@ -591,13 +591,13 @@ def test_to_sql_index_label_multiindex(self):
591591
index_label='C')
592592

593593
def test_multiindex_roundtrip(self):
594-
df = DataFrame.from_records([(1,2.1,'line1'), (2,1.5,'line2')],
594+
df = DataFrame.from_records([(1,2.1,'line1'), (2,1.5,'line2')],
595595
columns=['A','B','C'], index=['A','B'])
596596

597597
df.to_sql('test_multiindex_roundtrip', self.conn)
598-
result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip',
598+
result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip',
599599
self.conn, index_col=['A','B'])
600-
tm.assert_frame_equal(df, result, check_index_type=True)
600+
tm.assert_frame_equal(df, result, check_index_type=True)
601601

602602
def test_integer_col_names(self):
603603
df = DataFrame([[1, 2], [3, 4]], columns=[0, 1])
@@ -1196,8 +1196,8 @@ class TestPostgreSQLAlchemy(_TestSQLAlchemy):
11961196
flavor = 'postgresql'
11971197

11981198
def connect(self):
1199-
return sqlalchemy.create_engine(
1200-
'postgresql+{driver}://postgres@localhost/pandas_nosetest'.format(driver=self.driver))
1199+
url = 'postgresql+{driver}://postgres@localhost/pandas_nosetest'
1200+
return sqlalchemy.create_engine(url.format(driver=self.driver))
12011201

12021202
def setup_driver(self):
12031203
try:
@@ -1213,6 +1213,61 @@ def tearDown(self):
12131213
for table in c.fetchall():
12141214
self.conn.execute("DROP TABLE %s" % table[0])
12151215

1216+
def test_schema_support(self):
1217+
# only test this for postgresql (schema's not supported in mysql/sqlite)
1218+
df = DataFrame({'col1':[1, 2], 'col2':[0.1, 0.2], 'col3':['a', 'n']})
1219+
1220+
# create a schema
1221+
self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
1222+
self.conn.execute("CREATE SCHEMA other;")
1223+
1224+
# write dataframe to different schema's
1225+
df.to_sql('test_schema_public', self.conn, index=False)
1226+
df.to_sql('test_schema_public_explicit', self.conn, index=False,
1227+
schema='public')
1228+
df.to_sql('test_schema_other', self.conn, index=False, schema='other')
1229+
1230+
# read dataframes back in
1231+
res1 = sql.read_sql_table('test_schema_public', self.conn)
1232+
tm.assert_frame_equal(df, res1)
1233+
res2 = sql.read_sql_table('test_schema_public_explicit', self.conn)
1234+
tm.assert_frame_equal(df, res2)
1235+
res3 = sql.read_sql_table('test_schema_public_explicit', self.conn,
1236+
schema='public')
1237+
tm.assert_frame_equal(df, res3)
1238+
res4 = sql.read_sql_table('test_schema_other', self.conn,
1239+
schema='other')
1240+
tm.assert_frame_equal(df, res4)
1241+
self.assertRaises(ValueError, sql.read_sql_table, 'test_schema_other',
1242+
self.conn, schema='public')
1243+
1244+
## different if_exists options
1245+
1246+
# create a schema
1247+
self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
1248+
self.conn.execute("CREATE SCHEMA other;")
1249+
1250+
# write dataframe with different if_exists options
1251+
df.to_sql('test_schema_other', self.conn, schema='other', index=False)
1252+
df.to_sql('test_schema_other', self.conn, schema='other', index=False,
1253+
if_exists='replace')
1254+
df.to_sql('test_schema_other', self.conn, schema='other', index=False,
1255+
if_exists='append')
1256+
res = sql.read_sql_table('test_schema_other', self.conn, schema='other')
1257+
tm.assert_frame_equal(concat([df, df], ignore_index=True), res)
1258+
1259+
## specifying schema in user-provided meta
1260+
1261+
engine2 = self.connect()
1262+
meta = sqlalchemy.MetaData(engine2, schema='other')
1263+
pdsql = sql.PandasSQLAlchemy(engine2, meta=meta)
1264+
pdsql.to_sql(df, 'test_schema_other2', index=False)
1265+
pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='replace')
1266+
pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='append')
1267+
res1 = sql.read_sql_table('test_schema_other2', self.conn, schema='other')
1268+
res2 = pdsql.read_table('test_schema_other2')
1269+
tm.assert_frame_equal(res1, res2)
1270+
12161271

12171272
#------------------------------------------------------------------------------
12181273
#--- Test Sqlite / MySQL fallback
@@ -1295,7 +1350,7 @@ def test_datetime_date(self):
12951350
tm.assert_frame_equal(res, df.astype(str))
12961351
elif self.flavor == 'mysql':
12971352
tm.assert_frame_equal(res, df)
1298-
1353+
12991354
def test_datetime_time(self):
13001355
# test support for datetime.time
13011356
raise nose.SkipTest("datetime.time not supported for sqlite fallback")

0 commit comments

Comments
 (0)