Skip to content

Commit dcf6dc3

Browse files
committed
feature: handler for stopping transform job
1 parent a6585e2 commit dcf6dc3

File tree

4 files changed

+92
-0
lines changed

4 files changed

+92
-0
lines changed

src/sagemaker/session.py

+21
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,27 @@ def wait_for_transform_job(self, job, poll=5):
10061006
self._check_job_status(job, desc, "TransformJobStatus")
10071007
return desc
10081008

1009+
def stop_transform_job(self, name):
1010+
"""Stop the Amazon SageMaker hyperparameter tuning job with the specified name.
1011+
1012+
Args:
1013+
name (str): Name of the Amazon SageMaker batch transform job.
1014+
1015+
Raises:
1016+
ClientError: If an error occurs while trying to stop the batch transform job.
1017+
"""
1018+
try:
1019+
LOGGER.info('Stopping transform job: {}'.format(name))
1020+
self.sagemaker_client.stop_transform_job(TransformJobName=name)
1021+
except ClientError as e:
1022+
error_code = e.response['Error']['Code']
1023+
# allow to pass if the job already stopped
1024+
if error_code == 'ValidationException':
1025+
LOGGER.info('Transform job: {} is already stopped or not running.'.format(name))
1026+
else:
1027+
LOGGER.error('Error occurred while attempting to stop transform job: {}.'.format(name))
1028+
raise
1029+
10091030
def _check_job_status(self, job, desc, status_key_name):
10101031
"""Check to see if the job completed successfully and, if not, construct and
10111032
raise a ValueError.

src/sagemaker/transformer.py

+9
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ def wait(self):
206206
self._ensure_last_transform_job()
207207
self.latest_transform_job.wait()
208208

209+
def stop_transform_job(self):
210+
"""Stop latest running batch transform job.
211+
"""
212+
self._ensure_last_transform_job()
213+
self.latest_transform_job.stop()
214+
209215
def _ensure_last_transform_job(self):
210216
if self.latest_transform_job is None:
211217
raise ValueError("No transform job available")
@@ -305,6 +311,9 @@ def start_new(
305311
def wait(self):
306312
self.sagemaker_session.wait_for_transform_job(self.job_name)
307313

314+
def stop(self):
315+
self.sagemaker_session.stop_transform_job(name=self.job_name)
316+
308317
@staticmethod
309318
def _load_config(data, data_type, content_type, compression_type, split_type, transformer):
310319
input_config = _TransformJob._format_inputs_to_input_config(

tests/integ/test_transformer.py

+47
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import os
1717
import pickle
1818
import sys
19+
import time
1920

2021
import pytest
2122

@@ -364,4 +365,50 @@ def _create_transformer_and_transform_job(
364365
output_filter=output_filter,
365366
join_source=join_source,
366367
)
368+
369+
370+
def test_stop_transform_job(sagemaker_session, mxnet_full_version):
371+
data_path = os.path.join(DATA_DIR, 'mxnet_mnist')
372+
script_path = os.path.join(data_path, 'mnist.py')
373+
tags = [{'Key': 'some-tag', 'Value': 'value-for-tag'}]
374+
375+
mx = MXNet(entry_point=script_path, role='SageMakerRole', train_instance_count=1,
376+
train_instance_type='ml.c4.xlarge', sagemaker_session=sagemaker_session,
377+
framework_version=mxnet_full_version)
378+
379+
train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
380+
key_prefix='integ-test-data/mxnet_mnist/train')
381+
test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'),
382+
key_prefix='integ-test-data/mxnet_mnist/test')
383+
job_name = unique_name_from_base('test-mxnet-transform')
384+
385+
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
386+
mx.fit({'train': train_input, 'test': test_input}, job_name=job_name)
387+
388+
transform_input_path = os.path.join(data_path, 'transform', 'data.csv')
389+
transform_input_key_prefix = 'integ-test-data/mxnet_mnist/transform'
390+
transform_input = mx.sagemaker_session.upload_data(path=transform_input_path,
391+
key_prefix=transform_input_key_prefix)
392+
393+
transformer = mx.transformer(1, 'ml.m4.xlarge', tags=tags)
394+
transformer.transform(transform_input, content_type='text/csv')
395+
396+
time.sleep(15)
397+
398+
latest_transform_job_name = transformer.latest_transform_job.name
399+
400+
print('Attempting to stop {}'.format(latest_transform_job_name))
401+
402+
transformer.stop_transform_job()
403+
404+
desc = transformer.latest_transform_job.sagemaker_session.sagemaker_client \
405+
.describe_transform_job(TransformJobName=latest_transform_job_name)
406+
assert desc['TransformJobStatus'] == 'Stopping'
407+
408+
409+
def _create_transformer_and_transform_job(estimator, transform_input, volume_kms_key=None,
410+
input_filter=None, output_filter=None, join_source=None):
411+
transformer = estimator.transformer(1, 'ml.m4.xlarge', volume_kms_key=volume_kms_key)
412+
transformer.transform(transform_input, content_type='text/csv', input_filter=input_filter,
413+
output_filter=output_filter, join_source=join_source)
367414
return transformer

tests/unit/test_transformer.py

+15
Original file line numberDiff line numberDiff line change
@@ -449,3 +449,18 @@ def test_restart_output_path(start_new_job, transformer, sagemaker_session):
449449

450450
transformer.transform(DATA, job_name="job-2")
451451
assert transformer.output_path == "s3://{}/{}".format(S3_BUCKET, "job-2")
452+
453+
454+
def test_stop_transform_job(sagemaker_session, transformer):
455+
sagemaker_session.stop_transform_job = Mock(name='stop_transform_job')
456+
transformer.latest_transform_job = _TransformJob(sagemaker_session, JOB_NAME)
457+
458+
transformer.stop_transform_job()
459+
460+
sagemaker_session.stop_transform_job.assert_called_once_with(name=JOB_NAME)
461+
462+
463+
def test_stop_transform_job_no_transform_job(transformer):
464+
with pytest.raises(ValueError) as e:
465+
transformer.stop_transform_job()
466+
assert 'No transform job available' in str(e)

0 commit comments

Comments
 (0)