Skip to content

fix: Improve Pipeline integ tests and fix resource leak #3577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions tests/integ/sagemaker/workflow/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

from botocore.exceptions import WaiterError

from sagemaker.workflow.pipeline import _PipelineExecution


def wait_pipeline_execution(execution: _PipelineExecution, delay: int = 30, max_attempts: int = 60):
try:
execution.wait(delay=delay, max_attempts=max_attempts)
except WaiterError:
pass
7 changes: 2 additions & 5 deletions tests/integ/sagemaker/workflow/test_automl_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import re

import pytest
from botocore.exceptions import WaiterError

from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker.workflow import ParameterString
from sagemaker.workflow.automl_step import AutoMLStep
from sagemaker.automl.automl import AutoML, AutoMLInput
Expand Down Expand Up @@ -133,10 +133,7 @@ def test_automl_step(pipeline_session, role, pipeline_name):
try:
_ = pipeline.create(role)
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)

execution_steps = execution.list_steps()
has_automl_job = False
Expand Down
12 changes: 3 additions & 9 deletions tests/integ/sagemaker/workflow/test_clarify_check_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import os

import pytest
from botocore.exceptions import WaiterError

from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker.clarify import (
BiasConfig,
DataConfig,
Expand Down Expand Up @@ -182,10 +182,7 @@ def test_one_step_data_bias_pipeline_happycase(

assert response["PipelineArn"] == create_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 1
Expand Down Expand Up @@ -272,10 +269,7 @@ def test_one_step_data_bias_pipeline_constraint_violation(

assert response["PipelineArn"] == create_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 1
Expand Down
12 changes: 3 additions & 9 deletions tests/integ/sagemaker/workflow/test_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pytest

from botocore.exceptions import WaiterError
from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker.processing import ProcessingInput
from sagemaker.session import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
Expand Down Expand Up @@ -120,10 +120,7 @@ def test_pipeline_execution_with_default_experiment_config(
pipeline.create(role)
execution = pipeline.start(parameters={})

try:
execution.wait(delay=30, max_attempts=3)
except WaiterError:
pass
wait_pipeline_execution(execution=execution, max_attempts=3)
execution_steps = execution.list_steps()
assert len(execution_steps) == 1
assert execution_steps[0]["StepName"] == "sklearn-process"
Expand Down Expand Up @@ -195,10 +192,7 @@ def test_pipeline_execution_with_custom_experiment_config(
pipeline.create(role)
execution = pipeline.start(parameters={})

try:
execution.wait(delay=30, max_attempts=3)
except WaiterError:
pass
wait_pipeline_execution(execution=execution, max_attempts=3)
execution_steps = execution.list_steps()
assert len(execution_steps) == 1
assert execution_steps[0]["StepName"] == "sklearn-process"
Expand Down
22 changes: 5 additions & 17 deletions tests/integ/sagemaker/workflow/test_fail_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from __future__ import absolute_import

import pytest
from botocore.exceptions import WaiterError

from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker import get_execution_role, utils
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionEquals
Expand Down Expand Up @@ -62,10 +62,7 @@ def test_two_step_fail_pipeline_with_str_err_msg(sagemaker_session, role, pipeli
response = execution.describe()
assert response["PipelineArn"] == pipeline_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 2
Expand Down Expand Up @@ -130,10 +127,7 @@ def test_two_step_fail_pipeline_with_parameter_err_msg(sagemaker_session, role,
response = execution.describe()
assert response["PipelineArn"] == pipeline_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 2
Expand Down Expand Up @@ -196,10 +190,7 @@ def test_two_step_fail_pipeline_with_join_fn(sagemaker_session, role, pipeline_n
response = execution.describe()
assert response["PipelineArn"] == pipeline_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 2
Expand Down Expand Up @@ -257,10 +248,7 @@ def test_two_step_fail_pipeline_with_no_err_msg(sagemaker_session, role, pipelin
response = execution.describe()
assert response["PipelineArn"] == pipeline_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import re

import pytest
from botocore.exceptions import WaiterError

import tests
from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker.tensorflow import TensorFlow, TensorFlowModel
from tests.integ.retry import retries
from sagemaker.drift_check_baselines import DriftCheckBaselines
Expand Down Expand Up @@ -180,12 +180,14 @@ def test_conditional_pytorch_training_model_registration(
)

execution = pipeline.start(parameters={})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
)

execution = pipeline.start(parameters={"GoodEnoughInput": 0})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
Expand Down Expand Up @@ -259,12 +261,14 @@ def test_mxnet_model_registration(
)

execution = pipeline.start(parameters={})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
)

execution = pipeline.start()
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
Expand Down Expand Up @@ -470,12 +474,14 @@ def test_sklearn_xgboost_sip_model_registration(
)

execution = pipeline.start(parameters={})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
)

execution = pipeline.start()
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
Expand Down Expand Up @@ -656,10 +662,7 @@ def test_model_registration_with_drift_check_baselines(

assert response["PipelineArn"] == create_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 1
Expand Down Expand Up @@ -797,12 +800,14 @@ def test_model_registration_with_model_repack(
)

execution = pipeline.start(parameters={})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
)

execution = pipeline.start(parameters={"GoodEnoughInput": 0})
wait_pipeline_execution(execution=execution)
assert re.match(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
Expand Down Expand Up @@ -889,10 +894,7 @@ def test_model_registration_with_tensorflow_model_with_pipeline_model(
rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
execution.arn,
)
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

for step in execution_steps:
Expand Down
37 changes: 8 additions & 29 deletions tests/integ/sagemaker/workflow/test_model_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import os

import pytest
from botocore.exceptions import WaiterError

from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from tests.integ.timeout import timeout_and_delete_endpoint_by_name
Expand Down Expand Up @@ -155,10 +155,7 @@ def test_pytorch_training_model_registration_and_creation_without_custom_inferen
seconds_to_sleep=10,
):
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()
is_execution_fail = False
for step in execution_steps:
Expand Down Expand Up @@ -284,10 +281,7 @@ def test_pytorch_training_model_registration_and_creation_with_custom_inference(
seconds_to_sleep=10,
):
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()
is_execution_fail = False
for step in execution_steps:
Expand Down Expand Up @@ -372,10 +366,7 @@ def test_mxnet_model_registration_with_custom_inference(
seconds_to_sleep=10,
):
execution = pipeline.start()
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 1
Expand Down Expand Up @@ -550,10 +541,7 @@ def test_model_registration_with_drift_check_baselines_and_model_metrics(

assert response["PipelineArn"] == create_arn

try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()

assert len(execution_steps) == 1
Expand Down Expand Up @@ -667,10 +655,7 @@ def test_model_registration_with_tensorflow_model_with_pipeline_model(
seconds_to_sleep=10,
):
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)
execution_steps = execution.list_steps()
is_execution_fail = False
for step in execution_steps:
Expand Down Expand Up @@ -750,10 +735,7 @@ def test_xgboost_model_register_and_deploy_with_runtime_repack(
seconds_to_sleep=10,
):
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)

# Verify the pipeline execution succeeded
step_register_model = None
Expand Down Expand Up @@ -866,10 +848,7 @@ def test_tensorflow_model_register_and_deploy_with_runtime_repack(
seconds_to_sleep=10,
):
execution = pipeline.start(parameters={})
try:
execution.wait(delay=30, max_attempts=60)
except WaiterError:
pass
wait_pipeline_execution(execution=execution)

# Verify the pipeline execution succeeded
step_register_model = None
Expand Down
Loading