@@ -107,7 +107,7 @@ def write(self, df, path, compression='snappy',
107
107
self .validate_dataframe (df )
108
108
if self ._pyarrow_lt_070 :
109
109
self ._validate_write_lt_070 (df )
110
- path , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
110
+ path , _ , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
111
111
112
112
if self ._pyarrow_lt_060 :
113
113
table = self .api .Table .from_pandas (df , timestamps_to_ms = True )
@@ -121,13 +121,21 @@ def write(self, df, path, compression='snappy',
121
121
coerce_timestamps = coerce_timestamps , ** kwargs )
122
122
123
123
def read (self , path , columns = None , ** kwargs ):
124
- path , _ , _ = get_filepath_or_buffer (path )
124
+ path , _ , _ , should_close = get_filepath_or_buffer (path )
125
125
if self ._pyarrow_lt_070 :
126
- return self .api .parquet .read_pandas (path , columns = columns ,
127
- ** kwargs ).to_pandas ()
128
- kwargs ['use_pandas_metadata' ] = True
129
- return self .api .parquet .read_table (path , columns = columns ,
130
- ** kwargs ).to_pandas ()
126
+ result = self .api .parquet .read_pandas (path , columns = columns ,
127
+ ** kwargs ).to_pandas ()
128
+ else :
129
+ kwargs ['use_pandas_metadata' ] = True
130
+ result = self .api .parquet .read_table (path , columns = columns ,
131
+ ** kwargs ).to_pandas ()
132
+ if should_close :
133
+ try :
134
+ path .close ()
135
+ except : # noqa: flake8
136
+ pass
137
+
138
+ return result
131
139
132
140
def _validate_write_lt_070 (self , df ):
133
141
# Compatibility shim for pyarrow < 0.7.0
@@ -199,11 +207,11 @@ def write(self, df, path, compression='snappy', **kwargs):
199
207
# path is s3:// so we need to open the s3file in 'wb' mode.
200
208
# TODO: Support 'ab'
201
209
202
- path , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
210
+ path , _ , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
203
211
# And pass the opened s3file to the fastparquet internal impl.
204
212
kwargs ['open_with' ] = lambda path , _ : path
205
213
else :
206
- path , _ , _ = get_filepath_or_buffer (path )
214
+ path , _ , _ , _ = get_filepath_or_buffer (path )
207
215
208
216
with catch_warnings (record = True ):
209
217
self .api .write (path , df ,
@@ -214,13 +222,13 @@ def read(self, path, columns=None, **kwargs):
214
222
# When path is s3:// an S3File is returned.
215
223
# We need to retain the original path(str) while also
216
224
# pass the S3File().open function to fsatparquet impl.
217
- s3 , _ , _ = get_filepath_or_buffer (path )
225
+ s3 , _ , _ , should_close = get_filepath_or_buffer (path )
218
226
try :
219
227
parquet_file = self .api .ParquetFile (path , open_with = s3 .s3 .open )
220
228
finally :
221
229
s3 .close ()
222
230
else :
223
- path , _ , _ = get_filepath_or_buffer (path )
231
+ path , _ , _ , _ = get_filepath_or_buffer (path )
224
232
parquet_file = self .api .ParquetFile (path )
225
233
226
234
return parquet_file .to_pandas (columns = columns , ** kwargs )
0 commit comments