Skip to content

Commit b5abd11

Browse files
authored
Merge branch 'master' into iss96247
2 parents 2c035d8 + 536ba56 commit b5abd11

File tree

10 files changed

+109
-27
lines changed

10 files changed

+109
-27
lines changed

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,35 @@
11
# Changelog
22

3+
## v2.25.0 (2021-02-19)
4+
5+
### Features
6+
7+
* Enable step caching
8+
* Add other Neo supported regions for Inferentia inference images
9+
10+
### Bug Fixes and Other Changes
11+
12+
* remove FailStep from pipelines
13+
* use sagemaker_session in workflow tests
14+
* use ECR public for multidatamodel tests
15+
* add the mapping from py3 to cuda11 images
16+
* Add 30s cap time for tag tests
17+
* add build spec for slow tests
18+
* mark top 10 slow tests
19+
* remove slow test_run_xxx_monitor_baseline tests
20+
* pin astroid to 2.4.2
21+
22+
### Testing and Release Infrastructure
23+
24+
* unmark more flaky integ tests
25+
* remove canary_quick pytest mark from flaky/unnecessary tests
26+
* remove python3.8 from buildspec
27+
* remove py38 tox env
28+
* fix release buildspec typo
29+
* unblock regional release builds
30+
* lower test TPS for experiment analytics
31+
* move package preparation and publishing to the deploy step
32+
333
## v2.24.5 (2021-02-12)
434

535
### Bug Fixes and Other Changes

VERSION

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +1 @@
1-
2.24.6.dev0
1+
2.25.1.dev0

src/sagemaker/workflow/steps.py

Lines changed: 59 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -92,6 +92,38 @@ def ref(self) -> Dict[str, str]:
9292
return {"Name": self.name}
9393

9494

95+
@attr.s
96+
class CacheConfig:
97+
"""Configuration class to enable caching in pipeline workflow.
98+
99+
If caching is enabled, the pipeline attempts to find a previous execution of a step
100+
that was called with the same arguments. Step caching only considers successful execution.
101+
If a successful previous execution is found, the pipeline propagates the values
102+
from previous execution rather than recomputing the step. When multiple successful executions
103+
exist within the timeout period, it uses the result of the most recent successful execution.
104+
105+
106+
Attributes:
107+
enable_caching (bool): To enable step caching. Defaults to `False`.
108+
expire_after (str): If step caching is enabled, a timeout also needs to be defined.
109+
It defines how old a previous execution can be to be considered for reuse.
110+
Value should be an ISO 8601 duration string. Defaults to `None`.
111+
"""
112+
113+
enable_caching: bool = attr.ib(default=False)
114+
expire_after = attr.ib(
115+
default=None, validator=attr.validators.optional(attr.validators.instance_of(str))
116+
)
117+
118+
@property
119+
def config(self):
120+
"""Configures caching in pipeline steps."""
121+
config = {"Enabled": self.enable_caching}
122+
if self.expire_after is not None:
123+
config["ExpireAfter"] = self.expire_after
124+
return {"CacheConfig": config}
125+
126+
95127
class TrainingStep(Step):
96128
"""Training step for workflow."""
97129

@@ -100,6 +132,7 @@ def __init__(
100132
name: str,
101133
estimator: EstimatorBase,
102134
inputs: TrainingInput = None,
135+
cache_config: CacheConfig = None,
103136
):
104137
"""Construct a TrainingStep, given an `EstimatorBase` instance.
105138
@@ -110,14 +143,15 @@ def __init__(
110143
name (str): The name of the training step.
111144
estimator (EstimatorBase): A `sagemaker.estimator.EstimatorBase` instance.
112145
inputs (TrainingInput): A `sagemaker.inputs.TrainingInput` instance. Defaults to `None`.
146+
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
113147
"""
114148
super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING)
115149
self.estimator = estimator
116150
self.inputs = inputs
117-
118151
self._properties = Properties(
119152
path=f"Steps.{name}", shape_name="DescribeTrainingJobResponse"
120153
)
154+
self.cache_config = cache_config
121155

122156
@property
123157
def arguments(self) -> RequestType:
@@ -144,6 +178,14 @@ def properties(self):
144178
"""A Properties object representing the DescribeTrainingJobResponse data model."""
145179
return self._properties
146180

181+
def to_request(self) -> RequestType:
182+
"""Updates the dictionary with cache configuration."""
183+
request_dict = super().to_request()
184+
if self.cache_config:
185+
request_dict.update(self.cache_config.config)
186+
187+
return request_dict
188+
147189

148190
class CreateModelStep(Step):
149191
"""CreateModel step for workflow."""
@@ -207,6 +249,7 @@ def __init__(
207249
name: str,
208250
transformer: Transformer,
209251
inputs: TransformInput,
252+
cache_config: CacheConfig = None,
210253
):
211254
"""Constructs a TransformStep, given an `Transformer` instance.
212255
@@ -217,11 +260,12 @@ def __init__(
217260
name (str): The name of the transform step.
218261
transformer (Transformer): A `sagemaker.transformer.Transformer` instance.
219262
inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance.
263+
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
220264
"""
221265
super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM)
222266
self.transformer = transformer
223267
self.inputs = inputs
224-
268+
self.cache_config = cache_config
225269
self._properties = Properties(
226270
path=f"Steps.{name}", shape_name="DescribeTransformJobResponse"
227271
)
@@ -257,6 +301,14 @@ def properties(self):
257301
"""A Properties object representing the DescribeTransformJobResponse data model."""
258302
return self._properties
259303

304+
def to_request(self) -> RequestType:
305+
"""Updates the dictionary with cache configuration."""
306+
request_dict = super().to_request()
307+
if self.cache_config:
308+
request_dict.update(self.cache_config.config)
309+
310+
return request_dict
311+
260312

261313
class ProcessingStep(Step):
262314
"""Processing step for workflow."""
@@ -270,6 +322,7 @@ def __init__(
270322
job_arguments: List[str] = None,
271323
code: str = None,
272324
property_files: List[PropertyFile] = None,
325+
cache_config: CacheConfig = None,
273326
):
274327
"""Construct a ProcessingStep, given a `Processor` instance.
275328
@@ -289,6 +342,7 @@ def __init__(
289342
script to run. Defaults to `None`.
290343
property_files (List[PropertyFile]): A list of property files that workflow looks
291344
for and resolves from the configured processing output list.
345+
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
292346
"""
293347
super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING)
294348
self.processor = processor
@@ -305,6 +359,7 @@ def __init__(
305359
self._properties = Properties(
306360
path=f"Steps.{name}", shape_name="DescribeProcessingJobResponse"
307361
)
362+
self.cache_config = cache_config
308363

309364
@property
310365
def arguments(self) -> RequestType:
@@ -335,6 +390,8 @@ def properties(self):
335390
def to_request(self) -> RequestType:
336391
"""Get the request structure for workflow service calls."""
337392
request_dict = super(ProcessingStep, self).to_request()
393+
if self.cache_config:
394+
request_dict.update(self.cache_config.config)
338395
if self.property_files:
339396
request_dict["PropertyFiles"] = [
340397
property_file.expr for property_file in self.property_files

tests/integ/test_airflow_config.py

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,6 @@
6060
SINGLE_INSTANCE_COUNT = 1
6161

6262

63-
@pytest.mark.canary_quick
6463
def test_byo_airflow_config_uploads_data_source_to_s3_when_inputs_provided(
6564
sagemaker_session, cpu_instance_type
6665
):
@@ -92,7 +91,6 @@ def test_byo_airflow_config_uploads_data_source_to_s3_when_inputs_provided(
9291
)
9392

9493

95-
@pytest.mark.canary_quick
9694
def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
9795
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
9896
kmeans = KMeans(
@@ -153,7 +151,6 @@ def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_inst
153151
)
154152

155153

156-
@pytest.mark.canary_quick
157154
def test_ipinsights_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
158155
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
159156
data_path = os.path.join(DATA_DIR, "ipinsights")
@@ -214,7 +211,6 @@ def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
214211
tests.integ.test_region() in tests.integ.NO_LDA_REGIONS,
215212
reason="LDA image is not supported in certain regions",
216213
)
217-
@pytest.mark.canary_quick
218214
def test_lda_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
219215
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
220216
data_path = os.path.join(DATA_DIR, "lda")
@@ -247,7 +243,6 @@ def test_lda_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
247243
)
248244

249245

250-
@pytest.mark.canary_quick
251246
def test_linearlearner_airflow_config_uploads_data_source_to_s3(
252247
sagemaker_session, cpu_instance_type
253248
):
@@ -312,7 +307,6 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
312307
)
313308

314309

315-
@pytest.mark.canary_quick
316310
def test_ntm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
317311
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
318312
data_path = os.path.join(DATA_DIR, "ntm")
@@ -346,7 +340,6 @@ def test_ntm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
346340
)
347341

348342

349-
@pytest.mark.canary_quick
350343
def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
351344
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
352345
pca = PCA(
@@ -373,7 +366,6 @@ def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
373366
)
374367

375368

376-
@pytest.mark.canary_quick
377369
def test_rcf_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
378370
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
379371
# Generate a thousand 14-dimensional datapoints.
@@ -402,7 +394,6 @@ def test_rcf_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
402394
)
403395

404396

405-
@pytest.mark.canary_quick
406397
def test_chainer_airflow_config_uploads_data_source_to_s3(
407398
sagemaker_local_session, cpu_instance_type, chainer_latest_version, chainer_latest_py_version
408399
):
@@ -440,7 +431,6 @@ def test_chainer_airflow_config_uploads_data_source_to_s3(
440431
)
441432

442433

443-
@pytest.mark.canary_quick
444434
def test_mxnet_airflow_config_uploads_data_source_to_s3(
445435
sagemaker_session,
446436
cpu_instance_type,
@@ -476,7 +466,6 @@ def test_mxnet_airflow_config_uploads_data_source_to_s3(
476466
)
477467

478468

479-
@pytest.mark.canary_quick
480469
def test_sklearn_airflow_config_uploads_data_source_to_s3(
481470
sagemaker_session,
482471
cpu_instance_type,
@@ -516,7 +505,6 @@ def test_sklearn_airflow_config_uploads_data_source_to_s3(
516505
)
517506

518507

519-
@pytest.mark.canary_quick
520508
def test_tf_airflow_config_uploads_data_source_to_s3(
521509
sagemaker_session,
522510
cpu_instance_type,
@@ -550,7 +538,6 @@ def test_tf_airflow_config_uploads_data_source_to_s3(
550538
)
551539

552540

553-
@pytest.mark.canary_quick
554541
def test_xgboost_airflow_config_uploads_data_source_to_s3(
555542
sagemaker_session, cpu_instance_type, xgboost_latest_version
556543
):
@@ -576,7 +563,6 @@ def test_xgboost_airflow_config_uploads_data_source_to_s3(
576563
)
577564

578565

579-
@pytest.mark.canary_quick
580566
def test_pytorch_airflow_config_uploads_data_source_to_s3_when_inputs_not_provided(
581567
sagemaker_session,
582568
cpu_instance_type,

tests/integ/test_chainer.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,6 @@ def test_training_with_additional_hyperparameters(
7070
chainer.fit({"train": train_input, "test": test_input})
7171

7272

73-
@pytest.mark.canary_quick
7473
def test_attach_deploy(
7574
sagemaker_session, chainer_latest_version, chainer_latest_py_version, cpu_instance_type
7675
):

tests/integ/test_debugger.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,6 @@ def test_mxnet_with_tensorboard_output_config(
528528
_wait_and_assert_that_no_rule_jobs_errored(training_job=mx.latest_training_job)
529529

530530

531-
@pytest.mark.canary_quick
532531
def test_mxnet_with_all_rules_and_configs(
533532
sagemaker_session,
534533
mxnet_training_latest_version,

tests/integ/test_mxnet.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -305,7 +305,6 @@ def test_deploy_model_and_update_endpoint(
305305
assert new_config["ProductionVariants"][0]["InitialInstanceCount"] == 1
306306

307307

308-
@pytest.mark.canary_quick
309308
@pytest.mark.skipif(
310309
tests.integ.test_region() not in tests.integ.EI_SUPPORTED_REGIONS,
311310
reason="EI isn't supported in that specific region.",

tests/integ/test_tuner_multi_algo.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,6 @@ def estimator_knn(sagemaker_session, cpu_instance_type):
9797
return estimator
9898

9999

100-
@pytest.mark.canary_quick
101100
def test_multi_estimator_tuning(
102101
sagemaker_session, estimator_fm, estimator_knn, data_set, cpu_instance_type
103102
):

tests/integ/test_workflow.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@
4646
CreateModelStep,
4747
ProcessingStep,
4848
TrainingStep,
49+
CacheConfig,
4950
)
5051
from sagemaker.workflow.step_collections import RegisterModel
5152
from sagemaker.workflow.pipeline import Pipeline
@@ -274,6 +275,8 @@ def test_one_step_sklearn_processing_pipeline(
274275
ProcessingInput(dataset_definition=athena_dataset_definition),
275276
]
276277

278+
cache_config = CacheConfig(enable_caching=True, expire_after="T30m")
279+
277280
sklearn_processor = SKLearnProcessor(
278281
framework_version=sklearn_latest_version,
279282
role=role,
@@ -289,6 +292,7 @@ def test_one_step_sklearn_processing_pipeline(
289292
processor=sklearn_processor,
290293
inputs=inputs,
291294
code=script_path,
295+
cache_config=cache_config,
292296
)
293297
pipeline = Pipeline(
294298
name=pipeline_name,
@@ -328,6 +332,11 @@ def test_one_step_sklearn_processing_pipeline(
328332
response = execution.describe()
329333
assert response["PipelineArn"] == create_arn
330334

335+
# Check CacheConfig
336+
response = json.loads(pipeline.describe()["PipelineDefinition"])["Steps"][0]["CacheConfig"]
337+
assert response["Enabled"] == cache_config.enable_caching
338+
assert response["ExpireAfter"] == cache_config.expire_after
339+
331340
try:
332341
execution.wait(delay=30, max_attempts=3)
333342
except WaiterError:

0 commit comments

Comments
 (0)