@@ -34,7 +34,50 @@ def get_engine(engine):
     return FastParquetImpl()
 
 
-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def _validate_index(df):
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "parquet does not support serializing {} for the index;"
+                "you can .reset_index() to make the index into column(s)"
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "parquet does not support serializing a non-default index "
+                "for the index; you can .reset_index() to make the index "
+                "into column(s)"
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "parquet does not serialize index meta-data "
+                "on a default index"
+            )
+
+    @staticmethod
+    def _validate_columns(df):
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+
+    def validate_dataframe(self, df):
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only support IO with DataFrames")
+        self._validate_columns(df)
+        self._validate_index(df)
+
+    def write(self, df, path, compression, **kwargs):
+        raise NotImplementedError()
+
+    def read(self, path, columns=None, **kwargs):
+        raise NotImplementedError()
+
+
+class PyArrowImpl(BaseImpl):
 
     def __init__(self):
         # since pandas is a dependency of pyarrow
@@ -63,8 +106,14 @@ def __init__(self):
         self._pyarrow_lt_070 = LooseVersion(pyarrow.__version__) < '0.7.0'
         self.api = pyarrow
 
+    def _validate_index(self, df):
+        # pyarrow >= 0.7.0 supports multi-indexes so no need to validate
+        if self._pyarrow_lt_070:
+            super(PyArrowImpl, self)._validate_index(df)
+
     def write(self, df, path, compression='snappy',
               coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
         path, _, _ = get_filepath_or_buffer(path)
         if self._pyarrow_lt_060:
             table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
@@ -83,12 +132,11 @@ def read(self, path, columns=None, **kwargs):
                                            **kwargs).to_pandas()
 
 
-class FastParquetImpl(object):
+class FastParquetImpl(BaseImpl):
 
     def __init__(self):
         # since pandas is a dependency of fastparquet
         # we need to import on first use
-
         try:
             import fastparquet
         except ImportError:
@@ -109,6 +157,7 @@ def __init__(self):
         self.api = fastparquet
 
     def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
@@ -140,46 +189,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     kwargs
         Additional keyword arguments passed to the engine
     """
-
     impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-    # *unless* we're using pyarrow >= 0.7.1 which does support multi-indexes
-    if impl.api.__name__ == 'pyarrow' and not impl._pyarrow_lt_070:
-        validate_index = False
-    else:
-        validate_index = True
-
-    if validate_index:
-        if not isinstance(df.index, Int64Index):
-            raise ValueError("parquet does not support serializing {} "
-                             "for the index; you can .reset_index()"
-                             "to make the index into column(s)".format(
-                                 type(df.index)))
-
-        if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-            raise ValueError("parquet does not support serializing a "
-                             "non-default index for the index; you "
-                             "can .reset_index() to make the index "
-                             "into column(s)")
-
-        if df.index.name is not None:
-            raise ValueError("parquet does not serialize index meta-data on a "
-                             "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
     return impl.write(df, path, compression=compression, **kwargs)
 
 
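For context, a minimal sketch of how the validation consolidated into BaseImpl.validate_dataframe is expected to surface to callers of to_parquet once this patch is applied. The file name and engine choice below are illustrative only, and with pyarrow >= 0.7.0 the index check is skipped entirely:

import pandas as pd

# A non-default (string) index fails BaseImpl._validate_index, so the
# caller is told to reset the index into regular column(s) first.
df = pd.DataFrame({"a": [1, 2, 3]}, index=["x", "y", "z"])

try:
    df.to_parquet("example.parquet", engine="fastparquet")
except ValueError as err:
    print(err)  # parquet does not support serializing ... for the index

# Resetting the index yields a default RangeIndex and string column names,
# which pass validate_dataframe(), so the write proceeds normally.
df.reset_index().to_parquet("example.parquet", engine="fastparquet")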