diff --git a/src/sagemaker/remote_function/job.py b/src/sagemaker/remote_function/job.py index 44a26b3e04..a3d6a1d780 100644 --- a/src/sagemaker/remote_function/job.py +++ b/src/sagemaker/remote_function/job.py @@ -891,7 +891,7 @@ def wait(self, timeout: int = None): """ self._last_describe_response = _logs_for_job( - boto_session=self.sagemaker_session.boto_session, + sagemaker_session=self.sagemaker_session, job_name=self.job_name, wait=True, timeout=timeout, diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index c8c37f87ff..ad1c1a7a20 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -5447,7 +5447,7 @@ def logs_for_job(self, job_name, wait=False, poll=10, log_type="All", timeout=No exceptions.CapacityError: If the training job fails with CapacityError. exceptions.UnexpectedStatusException: If waiting and the training job fails. """ - _logs_for_job(self.boto_session, job_name, wait, poll, log_type, timeout) + _logs_for_job(self, job_name, wait, poll, log_type, timeout) def logs_for_processing_job(self, job_name, wait=False, poll=10): """Display logs for a given processing job, optionally tailing them until the is complete. @@ -7330,7 +7330,7 @@ def _rule_statuses_changed(current_statuses, last_statuses): def _logs_for_job( # noqa: C901 - suppress complexity warning for this method - boto_session, job_name, wait=False, poll=10, log_type="All", timeout=None + sagemaker_session, job_name, wait=False, poll=10, log_type="All", timeout=None ): """Display logs for a given training job, optionally tailing them until job is complete. @@ -7338,9 +7338,8 @@ def _logs_for_job( # noqa: C901 - suppress complexity warning for this method based on which instance the log entry is from. Args: - boto_session (boto3.session.Session): The underlying Boto3 session which AWS service - calls are delegated to (default: None). If not provided, one is created with - default AWS configuration chain. + sagemaker_session (sagemaker.session.Session): A SageMaker Session + object, used for SageMaker interactions. job_name (str): Name of the training job to display the logs for. wait (bool): Whether to keep looking for new log entries until the job completes (default: False). @@ -7357,13 +7356,13 @@ def _logs_for_job( # noqa: C901 - suppress complexity warning for this method exceptions.CapacityError: If the training job fails with CapacityError. exceptions.UnexpectedStatusException: If waiting and the training job fails. """ - sagemaker_client = boto_session.client("sagemaker") + sagemaker_client = sagemaker_session.sagemaker_client request_end_time = time.time() + timeout if timeout else None description = sagemaker_client.describe_training_job(TrainingJobName=job_name) print(secondary_training_status_message(description, None), end="") instance_count, stream_names, positions, client, log_group, dot, color_wrap = _logs_init( - boto_session, description, job="Training" + sagemaker_session.boto_session, description, job="Training" ) state = _get_initial_job_state(description, "TrainingJobStatus", wait) diff --git a/tests/integ/sagemaker/remote_function/test_decorator.py b/tests/integ/sagemaker/remote_function/test_decorator.py index 27b11b0899..c1094c3ca5 100644 --- a/tests/integ/sagemaker/remote_function/test_decorator.py +++ b/tests/integ/sagemaker/remote_function/test_decorator.py @@ -207,7 +207,7 @@ def test_with_additional_dependencies( def cuberoot(x): from scipy.special import cbrt - return cbrt(27) + return cbrt(x) assert cuberoot(27) == 3 @@ -742,7 +742,7 @@ def test_with_user_and_workdir_set_in_the_image( def cuberoot(x): from scipy.special import cbrt - return cbrt(27) + return cbrt(x) assert cuberoot(27) == 3 diff --git a/tests/integ/test_local_mode.py b/tests/integ/test_local_mode.py index 64318678a0..a143e05012 100644 --- a/tests/integ/test_local_mode.py +++ b/tests/integ/test_local_mode.py @@ -24,6 +24,7 @@ import stopit import tests.integ.lock as lock +from sagemaker.remote_function import remote from sagemaker.workflow.step_outputs import get_step from tests.integ.sagemaker.conftest import _build_container, DOCKERFILE_TEMPLATE from sagemaker.config import SESSION_DEFAULT_S3_BUCKET_PATH @@ -58,6 +59,7 @@ LOCK_PATH = os.path.join(tempfile.gettempdir(), "sagemaker_test_local_mode_lock") DATA_PATH = os.path.join(DATA_DIR, "iris", "data") DEFAULT_REGION = "us-west-2" +ROLE = "SageMakerRole" class LocalNoS3Session(LocalSession): @@ -147,7 +149,7 @@ def _create_model(output_path): mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, instance_count=1, instance_type="local", output_path=output_path, @@ -218,7 +220,7 @@ def test_mxnet_local_mode( mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, py_version=mxnet_training_latest_py_version, instance_count=1, instance_type="local", @@ -254,7 +256,7 @@ def test_mxnet_distributed_local_mode( mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, py_version=mxnet_training_latest_py_version, instance_count=2, instance_type="local", @@ -289,7 +291,7 @@ def test_mxnet_local_data_local_script( mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, instance_count=1, instance_type="local", framework_version=mxnet_training_latest_version, @@ -324,7 +326,7 @@ def test_mxnet_local_training_env(mxnet_training_latest_version, mxnet_training_ mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, instance_count=1, instance_type="local", framework_version=mxnet_training_latest_version, @@ -347,7 +349,7 @@ def test_mxnet_training_failure( mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, framework_version=mxnet_training_latest_version, py_version=mxnet_training_latest_py_version, instance_count=1, @@ -377,7 +379,7 @@ def test_local_transform_mxnet( mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, instance_count=1, instance_type="local", framework_version=mxnet_inference_latest_version, @@ -426,7 +428,7 @@ def test_local_processing_sklearn(sagemaker_local_session_no_local_code, sklearn sklearn_processor = SKLearnProcessor( framework_version=sklearn_latest_version, - role="SageMakerRole", + role=ROLE, instance_type="local", instance_count=1, command=["python3"], @@ -457,7 +459,7 @@ def test_local_processing_script_processor(sagemaker_local_session, sklearn_imag input_file_path = os.path.join(DATA_DIR, "dummy_input.txt") script_processor = ScriptProcessor( - role="SageMakerRole", + role=ROLE, image_uri=sklearn_image_uri, command=["python3"], instance_count=1, @@ -527,7 +529,7 @@ def test_local_pipeline_with_processing_step(sklearn_latest_version, local_pipel string_container_arg = ParameterString(name="ProcessingContainerArg", default_value="foo") sklearn_processor = SKLearnProcessor( framework_version=sklearn_latest_version, - role="SageMakerRole", + role=ROLE, instance_type="local", instance_count=1, command=["python3"], @@ -549,7 +551,7 @@ def test_local_pipeline_with_processing_step(sklearn_latest_version, local_pipel sagemaker_session=local_pipeline_session, parameters=[string_container_arg], ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -586,7 +588,7 @@ def test_local_pipeline_with_training_and_transform_steps( # define Estimator mx = MXNet( entry_point=script_path, - role="SageMakerRole", + role=ROLE, instance_count=instance_count, instance_type="local", framework_version=mxnet_training_latest_version, @@ -614,7 +616,7 @@ def test_local_pipeline_with_training_and_transform_steps( image_uri=inference_image_uri, model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=session, - role="SageMakerRole", + role=ROLE, ) # define create model step @@ -647,7 +649,7 @@ def test_local_pipeline_with_training_and_transform_steps( sagemaker_session=session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start(parameters={"InstanceCountParam": 1}) @@ -667,7 +669,7 @@ def test_local_pipeline_with_training_and_transform_steps( def test_local_pipeline_with_eval_cond_fail_steps(sklearn_image_uri, local_pipeline_session): processor = ScriptProcessor( image_uri=sklearn_image_uri, - role="SageMakerRole", + role=ROLE, instance_count=1, instance_type="local", sagemaker_session=local_pipeline_session, @@ -729,7 +731,7 @@ def test_local_pipeline_with_eval_cond_fail_steps(sklearn_image_uri, local_pipel sagemaker_session=local_pipeline_session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -763,7 +765,7 @@ def test_local_pipeline_with_step_decorator_and_step_dependency( local_pipeline_session, dummy_container ): step_settings = dict( - role="SageMakerRole", + role=ROLE, instance_type="ml.m5.xlarge", image_uri=dummy_container, keep_alive_period_in_seconds=60, @@ -787,7 +789,7 @@ def sum(a, b): sagemaker_session=local_pipeline_session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -808,7 +810,7 @@ def test_local_pipeline_with_step_decorator_and_pre_exe_script( local_pipeline_session, dummy_container ): step_settings = dict( - role="SageMakerRole", + role=ROLE, instance_type="local", image_uri=dummy_container, keep_alive_period_in_seconds=60, @@ -833,7 +835,7 @@ def validate_file_exists(files_exists, files_does_not_exist): sagemaker_session=local_pipeline_session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -851,7 +853,7 @@ def test_local_pipeline_with_step_decorator_and_condition_step( local_pipeline_session, dummy_container ): step_settings = dict( - role="SageMakerRole", + role=ROLE, instance_type="local", image_uri=dummy_container, keep_alive_period_in_seconds=60, @@ -888,7 +890,7 @@ def else_step(): sagemaker_session=local_pipeline_session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -916,7 +918,7 @@ def test_local_pipeline_with_step_decorator_data_referenced_by_other_steps( @step( name="step1", image_uri=dummy_container, - role="SageMakerRole", + role=ROLE, instance_type="ml.m5.xlarge", keep_alive_period_in_seconds=60, ) @@ -933,7 +935,7 @@ def func(var: int): sklearn_processor = SKLearnProcessor( framework_version=sklearn_latest_version, - role="SageMakerRole", + role=ROLE, instance_type="local", instance_count=step_output[1], command=["python3"], @@ -967,7 +969,7 @@ def func(var: int): sagemaker_session=local_pipeline_session, ) - pipeline.create("SageMakerRole", "pipeline for sdk integ testing") + pipeline.create(ROLE, "pipeline for sdk integ testing") with lock.lock(LOCK_PATH): execution = pipeline.start() @@ -983,3 +985,23 @@ def func(var: int): assert exe_step_result["StepStatus"] == "Succeeded" if exe_step_result["StepName"] == cond_step.name: assert exe_step_result["Metadata"]["Condition"]["Outcome"] is True + + +def test_local_remote_function_with_additional_dependencies( + local_pipeline_session, dummy_container +): + dependencies_path = os.path.join(DATA_DIR, "remote_function", "requirements.txt") + + @remote( + role=ROLE, + image_uri=dummy_container, + dependencies=dependencies_path, + instance_type="local", + sagemaker_session=local_pipeline_session, + ) + def cuberoot(x): + from scipy.special import cbrt + + return cbrt(x) + + assert cuberoot(27) == 3 diff --git a/tests/unit/sagemaker/remote_function/test_job.py b/tests/unit/sagemaker/remote_function/test_job.py index 991146eba3..f025276634 100644 --- a/tests/unit/sagemaker/remote_function/test_job.py +++ b/tests/unit/sagemaker/remote_function/test_job.py @@ -1108,7 +1108,7 @@ def test_wait(session, mock_stored_function, mock_logs_for_job, *args): job.wait(timeout=10) mock_logs_for_job.assert_called_with( - boto_session=ANY, job_name=job.job_name, wait=True, timeout=10 + sagemaker_session=ANY, job_name=job.job_name, wait=True, timeout=10 ) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index d6079a098e..336c7c887b 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -2497,9 +2497,7 @@ def sagemaker_session_full_lifecycle(boto_session_full_lifecycle): def test_logs_for_job_no_wait(cw, sagemaker_session_complete): ims = sagemaker_session_complete ims.logs_for_job(JOB_NAME) - ims.boto_session.client.return_value.describe_training_job.assert_called_once_with( - TrainingJobName=JOB_NAME - ) + ims.sagemaker_client.describe_training_job.assert_called_once_with(TrainingJobName=JOB_NAME) cw().assert_called_with(0, "hi there #1") @@ -2507,9 +2505,7 @@ def test_logs_for_job_no_wait(cw, sagemaker_session_complete): def test_logs_for_job_no_wait_stopped_job(cw, sagemaker_session_stopped): ims = sagemaker_session_stopped ims.logs_for_job(JOB_NAME) - ims.boto_session.client.return_value.describe_training_job.assert_called_once_with( - TrainingJobName=JOB_NAME - ) + ims.sagemaker_client.describe_training_job.assert_called_once_with(TrainingJobName=JOB_NAME) cw().assert_called_with(0, "hi there #1") @@ -2517,7 +2513,7 @@ def test_logs_for_job_no_wait_stopped_job(cw, sagemaker_session_stopped): def test_logs_for_job_wait_on_completed(cw, sagemaker_session_complete): ims = sagemaker_session_complete ims.logs_for_job(JOB_NAME, wait=True, poll=0) - assert ims.boto_session.client.return_value.describe_training_job.call_args_list == [ + assert ims.sagemaker_client.describe_training_job.call_args_list == [ call(TrainingJobName=JOB_NAME) ] cw().assert_called_with(0, "hi there #1") @@ -2527,7 +2523,7 @@ def test_logs_for_job_wait_on_completed(cw, sagemaker_session_complete): def test_logs_for_job_wait_on_stopped(cw, sagemaker_session_stopped): ims = sagemaker_session_stopped ims.logs_for_job(JOB_NAME, wait=True, poll=0) - assert ims.boto_session.client.return_value.describe_training_job.call_args_list == [ + assert ims.sagemaker_client.describe_training_job.call_args_list == [ call(TrainingJobName=JOB_NAME) ] cw().assert_called_with(0, "hi there #1") @@ -2537,7 +2533,7 @@ def test_logs_for_job_wait_on_stopped(cw, sagemaker_session_stopped): def test_logs_for_job_no_wait_on_running(cw, sagemaker_session_ready_lifecycle): ims = sagemaker_session_ready_lifecycle ims.logs_for_job(JOB_NAME) - assert ims.boto_session.client.return_value.describe_training_job.call_args_list == [ + assert ims.sagemaker_client.describe_training_job.call_args_list == [ call(TrainingJobName=JOB_NAME) ] cw().assert_called_with(0, "hi there #1") @@ -2549,7 +2545,7 @@ def test_logs_for_job_full_lifecycle(time, cw, sagemaker_session_full_lifecycle) ims = sagemaker_session_full_lifecycle ims.logs_for_job(JOB_NAME, wait=True, poll=0) assert ( - ims.boto_session.client.return_value.describe_training_job.call_args_list + ims.sagemaker_client.describe_training_job.call_args_list == [call(TrainingJobName=JOB_NAME)] * 3 ) assert cw().call_args_list == [