Skip to content

Commit df107d2

Browse files
ddelangeandrewsg
authored andcommitted
fix: cancel upload when BlobWriter exits with exception (#1243)
1 parent 22b8c30 commit df107d2

File tree

4 files changed

+113
-7
lines changed

4 files changed

+113
-7
lines changed

README.rst

+6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ setup.py file. Applications which do not import directly from
7272
`google-resumable-media` can safely disregard this dependency. This backwards
7373
compatibility feature will be removed in a future major version update.
7474

75+
Miscellaneous
76+
~~~~~~~~~~~~~
77+
78+
- The BlobWriter class now attempts to terminate an ongoing resumable upload if
79+
the writer exits with an exception.
80+
7581
Quick Start
7682
-----------
7783

google/cloud/storage/fileio.py

+13
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,19 @@ def close(self):
437437
self._upload_chunks_from_buffer(1)
438438
self._buffer.close()
439439

440+
def terminate(self):
441+
"""Cancel the ResumableUpload."""
442+
if self._upload_and_transport:
443+
upload, transport = self._upload_and_transport
444+
transport.delete(upload.upload_url)
445+
self._buffer.close()
446+
447+
def __exit__(self, exc_type, exc_val, exc_tb):
448+
if exc_type is not None:
449+
self.terminate()
450+
else:
451+
self.close()
452+
440453
@property
441454
def closed(self):
442455
return self._buffer.closed

tests/system/test_fileio.py

+40
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
# limitations under the License.
1515

1616

17+
import pytest
18+
19+
from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
1720
from .test_blob import _check_blob_hash
1821

1922

@@ -76,3 +79,40 @@ def test_blobwriter_and_blobreader_text_mode(
7679
assert text_data[:100] == reader.read(100)
7780
assert 0 == reader.seek(0)
7881
assert reader.read() == text_data
82+
83+
84+
def test_blobwriter_exit(
85+
shared_bucket,
86+
blobs_to_delete,
87+
service_account,
88+
):
89+
blob = shared_bucket.blob("NeverUploaded")
90+
91+
# no-op when nothing was uploaded yet
92+
with pytest.raises(ValueError, match="SIGTERM received"):
93+
with blob.open("wb") as writer:
94+
writer.write(b"first chunk") # not yet uploaded
95+
raise ValueError("SIGTERM received") # no upload to cancel in __exit__
96+
# blob should not exist
97+
assert not blob.exists()
98+
99+
# unhandled exceptions should cancel the upload
100+
with pytest.raises(ValueError, match="SIGTERM received"):
101+
with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
102+
writer.write(b"first chunk") # not yet uploaded
103+
writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded
104+
raise ValueError("SIGTERM received") # upload is cancelled in __exit__
105+
# blob should not exist
106+
assert not blob.exists()
107+
108+
# handled exceptions should not cancel the upload
109+
with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
110+
writer.write(b"first chunk") # not yet uploaded
111+
writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded
112+
try:
113+
raise ValueError("This is fine")
114+
except ValueError:
115+
pass # no exception context passed to __exit__
116+
blobs_to_delete.append(blob)
117+
# blob should have been uploaded
118+
assert blob.exists()

tests/unit/test_fileio.py

+54-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import mock
2222

2323
from google.api_core.exceptions import RequestRangeNotSatisfiable
24+
from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
2425
from google.cloud.storage.retry import DEFAULT_RETRY
2526

2627
TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n"
@@ -377,7 +378,7 @@ def test_write(self, mock_warn):
377378
# Write under chunk_size. This should be buffered and the upload not
378379
# initiated.
379380
writer.write(TEST_BINARY_DATA[0:4])
380-
blob.initiate_resumable_upload.assert_not_called()
381+
blob._initiate_resumable_upload.assert_not_called()
381382

382383
# Write over chunk_size. This should result in upload initialization
383384
# and multiple chunks uploaded.
@@ -426,6 +427,52 @@ def test_close_errors(self):
426427
with self.assertRaises(ValueError):
427428
writer.write(TEST_BINARY_DATA)
428429

430+
def test_terminate_after_initiate(self):
431+
blob = mock.Mock()
432+
433+
upload = mock.Mock(upload_url="dummy")
434+
transport = mock.Mock()
435+
436+
blob._initiate_resumable_upload.return_value = (upload, transport)
437+
438+
with self.assertRaises(RuntimeError):
439+
with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
440+
writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # initiate upload
441+
raise RuntimeError # should terminate the upload
442+
blob._initiate_resumable_upload.assert_called_once() # upload initiated
443+
self.assertTrue(writer.closed) # terminate called
444+
transport.delete.assert_called_with("dummy") # resumable upload terminated
445+
446+
def test_terminate_before_initiate(self):
447+
blob = mock.Mock()
448+
449+
upload = mock.Mock()
450+
transport = mock.Mock()
451+
452+
blob._initiate_resumable_upload.return_value = (upload, transport)
453+
454+
with self.assertRaises(RuntimeError):
455+
with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
456+
writer.write(bytes(CHUNK_SIZE_MULTIPLE - 1)) # upload not yet initiated
457+
raise RuntimeError # there is no resumable upload to terminate
458+
blob._initiate_resumable_upload.assert_not_called() # upload not yet initiated
459+
self.assertTrue(writer.closed) # terminate called
460+
transport.delete.assert_not_called() # there's no resumable upload to terminate
461+
462+
def test_terminate_skipped(self):
463+
blob = mock.Mock()
464+
465+
upload = mock.Mock()
466+
transport = mock.Mock()
467+
468+
blob._initiate_resumable_upload.return_value = (upload, transport)
469+
470+
with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
471+
writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # upload initiated
472+
blob._initiate_resumable_upload.assert_called() # upload initiated
473+
self.assertTrue(writer.closed) # close called
474+
transport.delete.assert_not_called() # terminate not called
475+
429476
def test_flush_fails(self):
430477
blob = mock.Mock(chunk_size=None)
431478
writer = self._make_blob_writer(blob)
@@ -468,7 +515,7 @@ def test_conditional_retry_failure(self):
468515
# Write under chunk_size. This should be buffered and the upload not
469516
# initiated.
470517
writer.write(TEST_BINARY_DATA[0:4])
471-
blob.initiate_resumable_upload.assert_not_called()
518+
blob._initiate_resumable_upload.assert_not_called()
472519

473520
# Write over chunk_size. This should result in upload initialization
474521
# and multiple chunks uploaded.
@@ -520,7 +567,7 @@ def test_conditional_retry_pass(self):
520567
# Write under chunk_size. This should be buffered and the upload not
521568
# initiated.
522569
writer.write(TEST_BINARY_DATA[0:4])
523-
blob.initiate_resumable_upload.assert_not_called()
570+
blob._initiate_resumable_upload.assert_not_called()
524571

525572
# Write over chunk_size. This should result in upload initialization
526573
# and multiple chunks uploaded.
@@ -573,7 +620,7 @@ def test_forced_default_retry(self):
573620
# Write under chunk_size. This should be buffered and the upload not
574621
# initiated.
575622
writer.write(TEST_BINARY_DATA[0:4])
576-
blob.initiate_resumable_upload.assert_not_called()
623+
blob._initiate_resumable_upload.assert_not_called()
577624

578625
# Write over chunk_size. This should result in upload initialization
579626
# and multiple chunks uploaded.
@@ -619,7 +666,7 @@ def test_num_retries_and_retry_conflict(self, mock_warn):
619666
# Write under chunk_size. This should be buffered and the upload not
620667
# initiated.
621668
writer.write(TEST_BINARY_DATA[0:4])
622-
blob.initiate_resumable_upload.assert_not_called()
669+
blob._initiate_resumable_upload.assert_not_called()
623670

624671
# Write over chunk_size. The mock will raise a ValueError, simulating
625672
# actual behavior when num_retries and retry are both specified.
@@ -673,7 +720,7 @@ def test_num_retries_only(self, mock_warn):
673720
# Write under chunk_size. This should be buffered and the upload not
674721
# initiated.
675722
writer.write(TEST_BINARY_DATA[0:4])
676-
blob.initiate_resumable_upload.assert_not_called()
723+
blob._initiate_resumable_upload.assert_not_called()
677724

678725
# Write over chunk_size. This should result in upload initialization
679726
# and multiple chunks uploaded.
@@ -965,7 +1012,7 @@ def test_write(self, mock_warn):
9651012
# Write under chunk_size. This should be buffered and the upload not
9661013
# initiated.
9671014
writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2])
968-
blob.initiate_resumable_upload.assert_not_called()
1015+
blob._initiate_resumable_upload.assert_not_called()
9691016

9701017
# Write all data and close.
9711018
writer.write(TEST_MULTIBYTE_TEXT_DATA[2:])

0 commit comments

Comments
 (0)