@@ -3,7 +3,8 @@
 from warnings import catch_warnings
 from distutils.version import LooseVersion
 from pandas import DataFrame, RangeIndex, Int64Index, get_option
-from pandas.compat import range
+from pandas.compat import range, string_types
+from pandas.core.common import AbstractMethodError
 from pandas.io.common import get_filepath_or_buffer


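The two new imports feed the `BaseImpl` refactor below: `string_types` is used to validate index level names, and `AbstractMethodError` marks `write`/`read` as methods every engine subclass must override. A minimal sketch of that pattern, assuming `pandas.core.common.AbstractMethodError` is importable as in the diff (the `Incomplete` class is purely illustrative):

```python
from pandas.core.common import AbstractMethodError


class Base(object):
    # Concrete engines are expected to override write(); the base only raises.
    def write(self, df, path, **kwargs):
        raise AbstractMethodError(self)


class Incomplete(Base):
    # Hypothetical engine that forgot to implement write().
    pass


try:
    Incomplete().write(None, 'out.parquet')
except AbstractMethodError as err:
    print(err)  # names the concrete class that is missing the method
```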
@@ -34,82 +35,152 @@ def get_engine(engine):
         return FastParquetImpl()


-class PyArrowImpl(object):
+class BaseImpl(object):
+
+    api = None  # module
+
+    @staticmethod
+    def validate_dataframe(df):
+        if not isinstance(df, DataFrame):
+            raise ValueError("to_parquet only supports IO with DataFrames")
+        # must have value column names (strings only)
+        if df.columns.inferred_type not in {'string', 'unicode'}:
+            raise ValueError("parquet must have string column names")
+        # index level names must be strings
+        valid_names = all(
+            isinstance(name, string_types)
+            for name in df.index.names
+            if name is not None
+        )
+        if not valid_names:
+            raise ValueError("Index level names must be strings")
+
+    def write(self, df, path, compression, **kwargs):
+        raise AbstractMethodError(self)
+
+    def read(self, path, columns=None, **kwargs):
+        raise AbstractMethodError(self)
+
+
+class PyArrowImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of pyarrow
         # we need to import on first use
-
         try:
             import pyarrow
             import pyarrow.parquet
         except ImportError:
-            raise ImportError("pyarrow is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
-            raise ImportError("pyarrow >= 0.4.1 is required for parquet"
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install pyarrow -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U pyarrow\n")
-
-        self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.5.0'))
-        self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
-                                LooseVersion('0.6.0'))
+            raise ImportError(
+                "pyarrow is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+        if LooseVersion(pyarrow.__version__) < '0.4.1':
+            raise ImportError(
+                "pyarrow >= 0.4.1 is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install pyarrow -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U pyarrow\n"
+            )
+        self._pyarrow_lt_070 = (
+            LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0')
+        )
         self.api = pyarrow

     def write(self, df, path, compression='snappy',
               coerce_timestamps='ms', **kwargs):
+        self.validate_dataframe(df)
+        if self._pyarrow_lt_070:
+            self._validate_write_lt_070(
+                df, path, compression, coerce_timestamps, **kwargs
+            )
         path, _, _ = get_filepath_or_buffer(path)
-        if self._pyarrow_lt_060:
-            table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
-            self.api.parquet.write_table(
-                table, path, compression=compression, **kwargs)
-
-        else:
-            table = self.api.Table.from_pandas(df)
-            self.api.parquet.write_table(
-                table, path, compression=compression,
-                coerce_timestamps=coerce_timestamps, **kwargs)
+        table = self.api.Table.from_pandas(df)
+        self.api.parquet.write_table(
+            table, path, compression=compression,
+            coerce_timestamps=coerce_timestamps, **kwargs)

     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.parquet.read_table(path, columns=columns,
-                                           **kwargs).to_pandas()
-
-
-class FastParquetImpl(object):
+        parquet_file = self.api.parquet.ParquetFile(path)
+        if self._pyarrow_lt_070:
+            parquet_file.path = path
+            return self._read_lt_070(parquet_file, columns, **kwargs)
+        kwargs['use_pandas_metadata'] = True
+        return parquet_file.read(columns=columns, **kwargs).to_pandas()
+
+
+    def _validate_write_lt_070(self, df, path, compression='snappy',
+                               coerce_timestamps='ms', **kwargs):
+        # Compatibility shim for pyarrow < 0.7.0
+        # TODO: Remove in pandas 0.22.0
+        from pandas.core.indexes.multi import MultiIndex
+        if isinstance(df.index, MultiIndex):
+            msg = ("Multi-index DataFrames are only supported "
+                   "with pyarrow >= 0.7.0")
+            raise ValueError(msg)
+        # Validate index
+        if not isinstance(df.index, Int64Index):
+            msg = (
+                "parquet does not support serializing {} for the index; "
+                "you can .reset_index() to make the index into column(s)"
+            )
+            raise ValueError(msg.format(type(df.index)))
+        if not df.index.equals(RangeIndex(len(df))):
+            raise ValueError(
+                "parquet does not support serializing a non-default index "
+                "for the index; you can .reset_index() to make the index "
+                "into column(s)"
+            )
+        if df.index.name is not None:
+            raise ValueError(
+                "parquet does not serialize index meta-data "
+                "on a default index"
+            )
+
+    def _read_lt_070(self, parquet_file, columns, **kwargs):
+        # Compatibility shim for pyarrow < 0.7.0
+        # TODO: Remove in pandas 0.22.0
+        from itertools import chain
+        import json
+        if columns is not None:
+            metadata = json.loads(parquet_file.metadata.metadata[b'pandas'])
+            columns = set(chain(columns, metadata['index_columns']))
+            kwargs['columns'] = columns
+        return self.api.parquet.read_table(
+            parquet_file.path, **kwargs).to_pandas()
+
+
+class FastParquetImpl(BaseImpl):

     def __init__(self):
         # since pandas is a dependency of fastparquet
         # we need to import on first use
-
         try:
             import fastparquet
         except ImportError:
-            raise ImportError("fastparquet is required for parquet support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
-        if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
-            raise ImportError("fastparquet >= 0.1.0 is required for parquet "
-                              "support\n\n"
-                              "you can install via conda\n"
-                              "conda install fastparquet -c conda-forge\n"
-                              "\nor via pip\n"
-                              "pip install -U fastparquet")
-
+            raise ImportError(
+                "fastparquet is required for parquet support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
+        if LooseVersion(fastparquet.__version__) < '0.1.0':
+            raise ImportError(
+                "fastparquet >= 0.1.0 is required for parquet "
+                "support\n\n"
+                "you can install via conda\n"
+                "conda install fastparquet -c conda-forge\n"
+                "\nor via pip\n"
+                "pip install -U fastparquet"
+            )
         self.api = fastparquet

     def write(self, df, path, compression='snappy', **kwargs):
+        self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
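For context, here is the user-facing effect of the validation that `BaseImpl.validate_dataframe` now centralizes; the file name is illustrative and the error text is taken from the diff above, so treat this as a sketch rather than a guaranteed message:

```python
import pandas as pd

# Column labels must be strings; integer labels are rejected before any
# engine-specific code runs.
df = pd.DataFrame({0: [1, 2], 1: [3, 4]})
try:
    df.to_parquet('example.parquet', engine='pyarrow')
except ValueError as err:
    print(err)  # "parquet must have string column names"

# Renaming the columns to strings makes the same frame writable.
df.columns = ['a', 'b']
df.to_parquet('example.parquet', engine='pyarrow')
```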
@@ -120,7 +191,8 @@ def write(self, df, path, compression='snappy', **kwargs):

     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
+        parquet_file = self.api.ParquetFile(path)
+        return parquet_file.to_pandas(columns=columns, **kwargs)


 def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
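The fastparquet read path is only split into two statements here; behaviour should be unchanged. A round trip under the same assumptions (illustrative file name, fastparquet and a snappy codec installed):

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
df.to_parquet('example.parquet', engine='fastparquet')

# Column selection is forwarded to fastparquet's ParquetFile.to_pandas().
subset = pd.read_parquet('example.parquet', engine='fastparquet', columns=['a'])
print(subset.columns.tolist())  # ['a']
```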
@@ -141,43 +213,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     kwargs
         Additional keyword arguments passed to the engine
     """
-
     impl = get_engine(engine)
-
-    if not isinstance(df, DataFrame):
-        raise ValueError("to_parquet only support IO with DataFrames")
-
-    valid_types = {'string', 'unicode'}
-
-    # validate index
-    # --------------
-
-    # validate that we have only a default index
-    # raise on anything else as we don't serialize the index
-
-    if not isinstance(df.index, Int64Index):
-        raise ValueError("parquet does not support serializing {} "
-                         "for the index; you can .reset_index()"
-                         "to make the index into column(s)".format(
-                             type(df.index)))
-
-    if not df.index.equals(RangeIndex.from_range(range(len(df)))):
-        raise ValueError("parquet does not support serializing a "
-                         "non-default index for the index; you "
-                         "can .reset_index() to make the index "
-                         "into column(s)")
-
-    if df.index.name is not None:
-        raise ValueError("parquet does not serialize index meta-data on a "
-                         "default index")
-
-    # validate columns
-    # ----------------
-
-    # must have value column names (strings only)
-    if df.columns.inferred_type not in valid_types:
-        raise ValueError("parquet must have string column names")
-
     return impl.write(df, path, compression=compression, **kwargs)


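Finally, the old default-index restriction survives only in the pyarrow < 0.7.0 shim; newer pyarrow round-trips the index through the pandas metadata. A sketch of the workaround the error messages suggest for the old path (names and file path are illustrative):

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]}, index=['x', 'y', 'z'])

# With pyarrow >= 0.7.0 the custom index is preserved automatically; with
# older pyarrow the shim raises, and resetting the index is the way out.
df.reset_index().to_parquet('example.parquet', engine='pyarrow')

restored = pd.read_parquet('example.parquet', engine='pyarrow')
print(restored.columns.tolist())  # ['index', 'a']
```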