
Commit 44b04ca

Merge pull request #205 from mariusvniekerk/autocommit-missing

Missing autocommit for fs.open()

2 parents 4f8e792 + 44d3554
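The merge threads a new `autocommit` flag from `S3FileSystem._open` into `S3File`, so a writer can defer the final CompleteMultipartUpload call until an explicit `commit()`, or abort it with the new `discard()` method. A minimal sketch of the resulting usage, mirroring the test added below (the bucket and key names here are hypothetical):

```python
# Sketch of the behavior this merge enables; 'my-bucket' and the
# key names are hypothetical, used purely for illustration.
import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Default behavior: the upload is finalized when the file closes.
with s3.open('my-bucket/auto.bin', 'wb') as f:
    f.write(b'data')

# With autocommit=False, closing the file leaves the multipart
# upload pending; the caller finalizes or aborts it explicitly.
f = s3.open('my-bucket/deferred.bin', 'wb', autocommit=False)
f.write(b'data')
f.close()
f.commit()   # or f.discard() to abort the pending upload
```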

File tree: 2 files changed (+56 −12 lines)

s3fs/core.py (24 additions, 12 deletions)
```diff
@@ -253,7 +253,7 @@ def get_delegated_s3pars(self, exp=3600):
                 'token': cred['SessionToken'], 'anon': False}
 
     def _open(self, path, mode='rb', block_size=None, acl='', version_id=None,
-              fill_cache=None, cache_type='bytes', **kwargs):
+              fill_cache=None, cache_type='bytes', autocommit=True, **kwargs):
         """ Open a file for reading or writing
 
         Parameters
@@ -299,7 +299,8 @@ def _open(self, path, mode='rb', block_size=None, acl='', version_id=None,
 
         return S3File(self, path, mode, block_size=block_size, acl=acl,
                       version_id=version_id, fill_cache=fill_cache,
-                      s3_additional_kwargs=kw, cache_type=cache_type)
+                      s3_additional_kwargs=kw, cache_type=cache_type,
+                      autocommit=autocommit)
 
     def _lsdir(self, path, refresh=False, max_items=None):
         if path.startswith('s3://'):
@@ -899,8 +900,11 @@ class S3File(AbstractBufferedFile):
     def __init__(self, s3, path, mode='rb', block_size=5 * 2 ** 20, acl="",
                  version_id=None, fill_cache=True, s3_additional_kwargs=None,
                  autocommit=True, cache_type='bytes'):
-        if not split_path(path)[1]:
+        bucket, key = split_path(path)
+        if not key:
             raise ValueError('Attempt to open non key-like path: %s' % path)
+        self.bucket = bucket
+        self.key = key
         self.version_id = version_id
         self.acl = acl
         self.mpu = None
@@ -935,7 +939,6 @@ def _call_s3(self, method, *kwarglist, **kwargs):
                                 **kwargs)
 
     def _initiate_upload(self):
-        bucket, key = split_path(self.path)
         if self.acl and self.acl not in key_acls:
             raise ValueError('ACL not in %s', key_acls)
         self.parts = []
@@ -945,7 +948,7 @@ def _initiate_upload(self):
         try:
             self.mpu = self._call_s3(
                 self.fs.s3.create_multipart_upload,
-                Bucket=bucket, Key=key, ACL=self.acl)
+                Bucket=self.bucket, Key=self.key, ACL=self.acl)
         except ClientError as e:
             raise translate_boto_error(e)
         except ParamValidationError as e:
@@ -958,8 +961,9 @@ def _initiate_upload(self):
             out = self.fs._call_s3(
                 self.fs.s3.upload_part_copy,
                 self.s3_additional_kwargs,
-                Bucket=bucket,
-                Key=key, PartNumber=1,
+                Bucket=self.bucket,
+                Key=self.key,
+                PartNumber=1,
                 UploadId=self.mpu['UploadId'],
                 CopySource=self.path)
             self.parts.append({'PartNumber': 1,
@@ -1003,8 +1007,7 @@ def url(self, **kwargs):
         return self.fs.url(self.path, **kwargs)
 
     def _fetch_range(self, start, end):
-        bucket, key = self.path.split('/', 1)
-        return _fetch_range(self.fs.s3, bucket, key, self.version_id, start, end)
+        return _fetch_range(self.fs.s3, self.bucket, self.key, self.version_id, start, end)
 
     def _upload_chunk(self, final=False):
         bucket, key = split_path(self.path)
@@ -1050,12 +1053,11 @@ def _upload_chunk(self, final=False):
 
     def commit(self):
         logger.debug("COMMIT")
-        bucket, key = self.path.split('/', 1)
         part_info = {'Parts': self.parts}
         write_result = self._call_s3(
             self.fs.s3.complete_multipart_upload,
-            Bucket=bucket,
-            Key=key,
+            Bucket=self.bucket,
+            Key=self.key,
             UploadId=self.mpu['UploadId'],
             MultipartUpload=part_info)
         if self.fs.version_aware:
@@ -1072,6 +1074,16 @@ def commit(self):
             self.fs.invalidate_cache(path)
             path = path + '/' + p
 
+    def discard(self):
+        if self.autocommit:
+            raise ValueError("Cannot discard when autocommit is enabled")
+        self._call_s3(
+            self.fs.s3.abort_multipart_upload,
+            Bucket=self.bucket,
+            Key=self.key,
+            UploadId=self.mpu['UploadId'],
+        )
+
 
 def _fetch_range(client, bucket, key, version_id, start, end, max_attempts=10,
                  req_kw=None):
```
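The new `discard()` method is a thin wrapper over S3's AbortMultipartUpload operation, the counterpart of the CompleteMultipartUpload call in `commit()`. Expressed directly against the boto3 client (bucket and key names again hypothetical), the underlying sequence is roughly:

```python
# Roughly the S3 calls behind commit() and discard(); names hypothetical.
import boto3

client = boto3.client('s3')
mpu = client.create_multipart_upload(Bucket='my-bucket', Key='deferred.bin')

# ... upload_part(...) calls would accumulate parts here ...

# discard(): abort the pending upload so no object is published and
# the already-uploaded parts are freed.
client.abort_multipart_upload(Bucket='my-bucket', Key='deferred.bin',
                              UploadId=mpu['UploadId'])
```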

s3fs/tests/test_s3fs.py (32 additions, 0 deletions)
```diff
@@ -1228,3 +1228,35 @@ def test_cache_after_copy(s3):
     assert 'test/afile' in s3.ls('s3://test', False)
     s3.cp('test/afile', 'test/bfile')
     assert 'test/bfile' in s3.ls('s3://test', False)
+
+
+def test_autocommit(s3):
+    auto_file = test_bucket_name + '/auto_file'
+    committed_file = test_bucket_name + '/commit_file'
+    aborted_file = test_bucket_name + '/aborted_file'
+    s3 = S3FileSystem(anon=False, version_aware=True)
+
+    def write_and_flush(path, autocommit):
+        with s3.open(path, 'wb', autocommit=autocommit) as fo:
+            fo.write(b'1')
+        return fo
+
+    # regular behavior
+    fo = write_and_flush(auto_file, autocommit=True)
+    assert fo.autocommit
+    assert s3.exists(auto_file)
+
+    fo = write_and_flush(committed_file, autocommit=False)
+    assert not fo.autocommit
+    assert not s3.exists(committed_file)
+    fo.commit()
+    assert s3.exists(committed_file)
+
+    fo = write_and_flush(aborted_file, autocommit=False)
+    assert not s3.exists(aborted_file)
+    fo.discard()
+    assert not s3.exists(aborted_file)
+    # Cannot commit a file that was discarded
+    with pytest.raises(Exception):
+        fo.commit()
+
```
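Beyond direct use, `autocommit=False` is the hook that fsspec's transaction machinery builds on: files opened inside a transaction are committed together on a clean exit and discarded on error. A sketch of that pattern, assuming the fsspec transaction API and a hypothetical bucket:

```python
# Sketch, assuming fsspec's transaction support; bucket name hypothetical.
import s3fs

s3 = s3fs.S3FileSystem(anon=False)
with s3.transaction:
    # Files opened here are given autocommit=False automatically.
    with s3.open('my-bucket/part-0.bin', 'wb') as f:
        f.write(b'...')
# Leaving the block calls commit() on each file; an exception inside
# would call discard() instead, aborting the pending uploads.
```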
