from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
-from pandas.compat import range
+from pandas.compat import string_types
+from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer
@@ -39,39 +40,75 @@ def get_engine(engine):
        return FastParquetImpl()


-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def validate_dataframe(df):
+
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+
+        # index level names must be strings
+        valid_names = all(
+            isinstance(name, string_types)
+            for name in df.index.names
+            if name is not None
+        )
+        if not valid_names:
+            raise ValueError("Index level names must be strings")
+
+    def write(self, df, path, compression, **kwargs):
+        raise AbstractMethodError(self)
+
+    def read(self, path, columns=None, **kwargs):
+        raise AbstractMethodError(self)
+
+
+class PyArrowImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of pyarrow
        # we need to import on first use
-
        try:
            import pyarrow
            import pyarrow.parquet
        except ImportError:
-            raise ImportError("pyarrow is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
-            raise ImportError("pyarrow >= 0.4.1 is required for parquet"
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.5.0'))
-        self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.6.0'))
+            raise ImportError(
+                "pyarrow is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+        if LooseVersion(pyarrow.__version__) < '0.4.1':
+            raise ImportError(
+                "pyarrow >= 0.4.1 is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+
+        self._pyarrow_lt_060 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.6.0'))
+        self._pyarrow_lt_070 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0'))
+
        self.api = pyarrow

    def write(self, df, path, compression='snappy',
              coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
+        if self._pyarrow_lt_070:
+            self._validate_write_lt_070(df)
        path, _, _ = get_filepath_or_buffer(path)
+
        if self._pyarrow_lt_060:
            table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
            self.api.parquet.write_table(
@@ -85,36 +122,75 @@ def write(self, df, path, compression='snappy',
    def read(self, path, columns=None, **kwargs):
        path, _, _ = get_filepath_or_buffer(path)
+        if self._pyarrow_lt_070:
+            return self.api.parquet.read_pandas(path, columns=columns,
+                                                **kwargs).to_pandas()
+        kwargs['use_pandas_metadata'] = True
        return self.api.parquet.read_table(path, columns=columns,
                                           **kwargs).to_pandas()

-
-class FastParquetImpl(object):
+    def _validate_write_lt_070(self, df):
+        # Compatibility shim for pyarrow < 0.7.0
+        # TODO: Remove in pandas 0.22.0
+        from pandas.core.indexes.multi import MultiIndex
+        if isinstance(df.index, MultiIndex):
+            msg = (
+                "Multi-index DataFrames are only supported "
+                "with pyarrow >= 0.7.0"
+            )
+            raise ValueError(msg)
+        # Validate index
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "pyarrow < 0.7.0 does not support serializing {} for the "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "pyarrow < 0.7.0 does not support serializing a non-default "
+                "index; you can .reset_index() to make the index into "
+                "column(s), or install the latest version of pyarrow or "
+                "fastparquet."
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "pyarrow < 0.7.0 does not serialize indexes with a name; you "
+                "can set the index.name to None or install the latest version "
+                "of pyarrow or fastparquet."
+            )
+
+
+class FastParquetImpl(BaseImpl):

    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
-
        try:
            import fastparquet
        except ImportError:
-            raise ImportError("fastparquet is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
-        if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
-            raise ImportError("fastparquet >= 0.1.0 is required for parquet "
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
+        if LooseVersion(fastparquet.__version__) < '0.1.0':
+            raise ImportError(
+                "fastparquet >= 0.1.0 is required for parquet "
+                "support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
        self.api = fastparquet

    def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.
@@ -125,7 +201,8 @@ def write(self, df, path, compression='snappy', **kwargs):
    def read(self, path, columns=None, **kwargs):
        path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
+        parquet_file = self.api.ParquetFile(path)
+        return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -146,43 +223,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
    kwargs
        Additional keyword arguments passed to the engine
    """
-
    impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate index
-    # --------------
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-
-    if not isinstance(df.index, Int64Index):
-        raise ValueError("parquet does not support serializing {} "
-                         "for the index; you can .reset_index()"
-                         "to make the index into column(s)".format(
-                             type(df.index)))
-
-    if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-        raise ValueError("parquet does not support serializing a "
-                         "non-default index for the index; you "
-                         "can .reset_index() to make the index "
-                         "into column(s)")
-
-    if df.index.name is not None:
-        raise ValueError("parquet does not serialize index meta-data on a "
-                         "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
    return impl.write(df, path, compression=compression, **kwargs)
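A quick way to exercise the shared validation this commit introduces (a minimal sketch, not part of the commit: it assumes a pandas build containing this patch plus pyarrow >= 0.7.0 or fastparquet >= 0.1.0 installed, and 'example.parquet' is a placeholder path):

    import pandas as pd

    # String column names satisfy BaseImpl.validate_dataframe, so the
    # round trip works with either engine.
    df = pd.DataFrame({'a': [1, 2, 3]})
    df.to_parquet('example.parquet')
    roundtripped = pd.read_parquet('example.parquet')

    # Non-string column names are now rejected up front by the shared
    # validator, before any engine-specific code runs.
    try:
        pd.DataFrame({0: [1, 2, 3]}).to_parquet('example.parquet')
    except ValueError as exc:
        print(exc)  # "parquet must have string column names"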