8
8
9
9
from pandas import DataFrame , get_option
10
10
11
- from pandas .io .common import get_filepath_or_buffer , is_s3_url
11
+ from pandas .io .common import _get_handle , get_filepath_or_buffer , is_s3_url
12
12
13
13
14
14
def get_engine (engine ):
@@ -104,7 +104,7 @@ def write(self, df, path, compression='snappy',
104
104
coerce_timestamps = 'ms' , index = None , partition_cols = None ,
105
105
** kwargs ):
106
106
self .validate_dataframe (df )
107
- path , _ , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
107
+ path , _ , _ , should_close = get_filepath_or_buffer (path , mode = 'wb' )
108
108
109
109
if index is None :
110
110
from_pandas_kwargs = {}
@@ -121,6 +121,16 @@ def write(self, df, path, compression='snappy',
121
121
table , path , compression = compression ,
122
122
coerce_timestamps = coerce_timestamps , ** kwargs )
123
123
124
+ if should_close :
125
+ try :
126
+ f , handles = _get_handle (path , mode = 'wb' )
127
+ f .close ()
128
+ for _fh in handles :
129
+ _fh .close ()
130
+ except AttributeError :
131
+ # path is not file-like (e.g. a string)
132
+ pass
133
+
124
134
def read (self , path , columns = None , ** kwargs ):
125
135
path , _ , _ , should_close = get_filepath_or_buffer (path )
126
136
@@ -129,8 +139,12 @@ def read(self, path, columns=None, **kwargs):
129
139
** kwargs ).to_pandas ()
130
140
if should_close :
131
141
try :
132
- path .close ()
133
- except : # noqa: flake8
142
+ f , handles = _get_handle (path , mode = 'rb' )
143
+ f .close ()
144
+ for _fh in handles :
145
+ _fh .close ()
146
+ except AttributeError :
147
+ # path is not file-like (e.g. a string)
134
148
pass
135
149
136
150
return result
@@ -183,17 +197,27 @@ def write(self, df, path, compression='snappy', index=None,
183
197
# path is s3:// so we need to open the s3file in 'wb' mode.
184
198
# TODO: Support 'ab'
185
199
186
- path , _ , _ , _ = get_filepath_or_buffer (path , mode = 'wb' )
200
+ path , _ , _ , should_close = get_filepath_or_buffer (path , mode = 'wb' )
187
201
# And pass the opened s3file to the fastparquet internal impl.
188
202
kwargs ['open_with' ] = lambda path , _ : path
189
203
else :
190
- path , _ , _ , _ = get_filepath_or_buffer (path )
204
+ path , _ , _ , should_close = get_filepath_or_buffer (path )
191
205
192
206
with catch_warnings (record = True ):
193
207
self .api .write (path , df , compression = compression ,
194
208
write_index = index , partition_on = partition_cols ,
195
209
** kwargs )
196
210
211
+ if should_close :
212
+ try :
213
+ f , handles = _get_handle (path , mode = 'wb' )
214
+ f .close ()
215
+ for _fh in handles :
216
+ _fh .close ()
217
+ except AttributeError :
218
+ # path is not file-like (e.g. a string)
219
+ pass
220
+
197
221
def read (self , path , columns = None , ** kwargs ):
198
222
if is_s3_url (path ):
199
223
# When path is s3:// an S3File is returned.
@@ -205,9 +229,19 @@ def read(self, path, columns=None, **kwargs):
205
229
finally :
206
230
s3 .close ()
207
231
else :
208
- path , _ , _ , _ = get_filepath_or_buffer (path )
232
+ path , _ , _ , should_close = get_filepath_or_buffer (path )
209
233
parquet_file = self .api .ParquetFile (path )
210
234
235
+ if should_close :
236
+ try :
237
+ f , handles = _get_handle (path , mode = 'rb' )
238
+ f .close ()
239
+ for _fh in handles :
240
+ _fh .close ()
241
+ except (AttributeError , OSError ):
242
+ # path is not file-like (e.g. a string)
243
+ pass
244
+
211
245
return parquet_file .to_pandas (columns = columns , ** kwargs )
212
246
213
247
0 commit comments