@@ -104,7 +104,8 @@ def __init__(self):
        self.api = pyarrow

    def write(self, df, path, compression='snappy',
-              coerce_timestamps='ms', index=None, **kwargs):
+              coerce_timestamps='ms', index=None, partition_cols=None,
+              **kwargs):
        self.validate_dataframe(df)

        # Only validate the index if we're writing it.
@@ -125,10 +126,11 @@ def write(self, df, path, compression='snappy',

        else:
            table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-            if 'partition_cols' in kwargs:
+            if partition_cols is not None:
                self.api.parquet.write_to_dataset(
                    table, path, compression=compression,
-                    coerce_timestamps=coerce_timestamps, **kwargs)
+                    coerce_timestamps=coerce_timestamps,
+                    partition_cols=partition_cols, **kwargs)
            else:
                self.api.parquet.write_table(
                    table, path, compression=compression,
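
For context (not part of the patch): a minimal sketch of what the pyarrow branch above delegates to. `pyarrow.parquet.write_to_dataset` writes one subdirectory per unique value of each partition column; the example frame and the `out_dir` path below are made up, and pyarrow >= 0.7.0 with a local filesystem path is assumed.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Made-up example frame; any DataFrame containing the partition column works.
df = pd.DataFrame({"year": [2017, 2017, 2018], "value": [1.0, 2.0, 3.0]})
table = pa.Table.from_pandas(df)

# Writes out_dir/year=2017/<part>.parquet and out_dir/year=2018/<part>.parquet.
pq.write_to_dataset(table, "out_dir", partition_cols=["year"])
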
@@ -211,12 +213,16 @@ def __init__(self):
            )
        self.api = fastparquet

-    def write(self, df, path, compression='snappy', index=None, **kwargs):
+    def write(self, df, path, compression='snappy', index=None,
+              partition_cols=None, **kwargs):
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

+        if partition_cols is not None:
+            kwargs['file_scheme'] = 'hive'
+
        if is_s3_url(path):
            # path is s3:// so we need to open the s3file in 'wb' mode.
            # TODO: Support 'ab'
@@ -229,7 +235,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

        with catch_warnings(record=True):
            self.api.write(path, df, compression=compression,
-                           write_index=index, **kwargs)
+                           write_index=index, partition_on=partition_cols,
+                           **kwargs)

    def read(self, path, columns=None, **kwargs):
        if is_s3_url(path):
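
For context (not part of the patch): the fastparquet branch reaches the same result by switching to the 'hive' file scheme and forwarding the column list as `partition_on`. A minimal sketch of the equivalent direct fastparquet call, with made-up names:

import pandas as pd
import fastparquet

# Made-up example frame and output directory.
df = pd.DataFrame({"year": [2017, 2017, 2018], "value": [1.0, 2.0, 3.0]})

# file_scheme='hive' writes a directory of part files, one subdirectory per
# value of each column listed in partition_on.
fastparquet.write("out_dir", df, file_scheme="hive", partition_on=["year"])
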
@@ -249,16 +256,15 @@ def read(self, path, columns=None, **kwargs):


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
-               **kwargs):
+               partition_cols=None, **kwargs):
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
-    df : DataFrame
-    path : string
-        File path ( Will be used as `root_path` if
-        `partition_cols` is provided as parameter for 'pyarrow' engine).
+    path : str
+        File path or Root Directory path. Will be used as Root Directory path
+        while writing a partitioned dataset.
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -272,11 +278,18 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
        engine's default behavior will be used.

        .. versionadded:: 0.24.0
+    partition_cols : list, optional
+        Column names by which to partition the dataset.
+        Columns are partitioned in the order they are given.
+        The behaviour applies only to pyarrow >= 0.7.0 and fastparquet.
+        For other versions, this argument will be ignored.
+        .. versionadded:: 0.24.0
    kwargs
        Additional keyword arguments passed to the engine
    """
    impl = get_engine(engine)
-    return impl.write(df, path, compression=compression, index=index, **kwargs)
+    return impl.write(df, path, compression=compression, index=index,
+                      partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
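
Taken together, the patch lets callers produce a partitioned dataset straight from pandas. A hedged usage sketch, assuming a build that includes this change; the path and column names are illustrative, and `DataFrame.to_parquet` is relied on to forward extra keyword arguments to the module-level `to_parquet` modified above:

import pandas as pd

df = pd.DataFrame({"year": [2017, 2017, 2018],
                   "country": ["US", "DE", "US"],
                   "value": [1.0, 2.0, 3.0]})

# Partitions nest in the order given:
# out_dir/year=2017/country=DE/<part>.parquet, and so on.
df.to_parquet("out_dir", engine="pyarrow", partition_cols=["year", "country"])

# The fastparquet engine routes the same argument to partition_on
# (the default 'snappy' codec additionally requires the python-snappy package).
df.to_parquet("out_dir_fp", engine="fastparquet", partition_cols=["year"])
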