
Commit 2c1b887

jiapinw, Alex Tang, can-sun, alex-tang, suryans-commit authored

feature: add SageMaker FeatureStore feature processing (#3944)

Co-authored-by: Alex Tang <[email protected]>
Co-authored-by: cansun <[email protected]>
Co-authored-by: Alex Tang <[email protected]>
Co-authored-by: Can Sun <[email protected]>
Co-authored-by: jiapinw <[email protected]>
Co-authored-by: suryans-commit <[email protected]>
1 parent 7860f16 commit 2c1b887

79 files changed: +13754 additions, -137 deletions

Note: this is a large commit, so only a subset of the 79 changed files is reproduced below.
requirements/extras/feature-processor_requirements.txt (new file)

Lines changed: 2 additions & 0 deletions

pyspark==3.3.1
sagemaker-feature-store-pyspark-3.3

requirements/extras/test_requirements.txt

Lines changed: 2 additions & 0 deletions

@@ -27,3 +27,5 @@ scipy==1.7.3
 urllib3>=1.26.8,<1.26.15
 docker>=5.0.2,<7.0.0
 PyYAML==6.0
+pyspark==3.3.1
+sagemaker-feature-store-pyspark-3.3

setup.py

Lines changed: 3 additions & 0 deletions

@@ -71,6 +71,9 @@ def read_requirements(filename):
 extras = {
     "local": read_requirements("requirements/extras/local_requirements.txt"),
     "scipy": read_requirements("requirements/extras/scipy_requirements.txt"),
+    "feature-processor": read_requirements(
+        "requirements/extras/feature-processor_requirements.txt"
+    ),
 }
 # Meta dependency groups
 extras["all"] = [item for group in extras.values() for item in group]

src/sagemaker/estimator.py

Lines changed: 36 additions & 0 deletions

@@ -170,6 +170,8 @@ def __init__(
         instance_groups: Optional[List[InstanceGroup]] = None,
         training_repository_access_mode: Optional[Union[str, PipelineVariable]] = None,
         training_repository_credentials_provider_arn: Optional[Union[str, PipelineVariable]] = None,
+        container_entry_point: Optional[List[str]] = None,
+        container_arguments: Optional[List[str]] = None,
         **kwargs,
     ):
         """Initialize an ``EstimatorBase`` instance.

@@ -523,6 +525,11 @@ def __init__(
                 private Docker registry where your training image is hosted (default: None).
                 When it's set to None, SageMaker will not do authentication before pulling the image
                 in the private Docker registry.
+            container_entry_point (List[str]): Optional. The entrypoint script for a Docker
+                container used to run a training job. This script takes precedence over
+                the default train processing instructions.
+            container_arguments (List[str]): Optional. The arguments for a container used to run
+                a training job.
         """

@@ -647,6 +654,10 @@ def __init__(
             training_repository_credentials_provider_arn
         )

+        # container entry point / arguments configs
+        self.container_entry_point = container_entry_point
+        self.container_arguments = container_arguments
+
         self.encrypt_inter_container_traffic = resolve_value_from_config(
             direct_input=encrypt_inter_container_traffic,
             config_path=TRAINING_JOB_INTER_CONTAINER_ENCRYPTION_PATH,

@@ -1786,6 +1797,16 @@ def _prepare_init_params_from_job_description(cls, job_details, model_channel_name
                 "MetricsDefinition"
             ]

+        if "ContainerEntrypoint" in job_details["AlgorithmSpecification"]:
+            init_params["container_entry_point"] = job_details["AlgorithmSpecification"][
+                "ContainerEntrypoint"
+            ]
+
+        if "ContainerArguments" in job_details["AlgorithmSpecification"]:
+            init_params["container_arguments"] = job_details["AlgorithmSpecification"][
+                "ContainerArguments"
+            ]
+
         if "EnableInterContainerTrafficEncryption" in job_details:
             init_params["encrypt_inter_container_traffic"] = job_details[
                 "EnableInterContainerTrafficEncryption"

@@ -2266,6 +2287,12 @@ def _get_train_args(cls, estimator, inputs, experiment_config):
             ] = estimator.training_repository_credentials_provider_arn
             train_args["training_image_config"] = training_image_config

+        if estimator.container_entry_point is not None:
+            train_args["container_entry_point"] = estimator.container_entry_point
+
+        if estimator.container_arguments is not None:
+            train_args["container_arguments"] = estimator.container_arguments
+
         # encrypt_inter_container_traffic may be a pipeline variable place holder object
         # which is parsed in execution time
         # This does not check config because the EstimatorBase constuctor already did that check

@@ -2472,6 +2499,8 @@ def __init__(
         instance_groups: Optional[List[InstanceGroup]] = None,
         training_repository_access_mode: Optional[Union[str, PipelineVariable]] = None,
         training_repository_credentials_provider_arn: Optional[Union[str, PipelineVariable]] = None,
+        container_entry_point: Optional[List[str]] = None,
+        container_arguments: Optional[List[str]] = None,
         **kwargs,
     ):
         """Initialize an ``Estimator`` instance.

@@ -2824,6 +2853,11 @@ def __init__(
                 private Docker registry where your training image is hosted (default: None).
                 When it's set to None, SageMaker will not do authentication before pulling the image
                 in the private Docker registry.
+            container_entry_point (List[str]): Optional. The entrypoint script for a Docker
+                container used to run a training job. This script takes precedence over
+                the default train processing instructions.
+            container_arguments (List[str]): Optional. The arguments for a container used to run
+                a training job.
         """
         self.image_uri = image_uri
         self._hyperparameters = hyperparameters.copy() if hyperparameters else {}

@@ -2871,6 +2905,8 @@ def __init__(
             instance_groups=instance_groups,
             training_repository_access_mode=training_repository_access_mode,
             training_repository_credentials_provider_arn=training_repository_credentials_provider_arn,  # noqa: E501 # pylint: disable=line-too-long
+            container_entry_point=container_entry_point,
+            container_arguments=container_arguments,
             **kwargs,
         )
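The new arguments map to the ContainerEntrypoint and ContainerArguments fields of the training job's AlgorithmSpecification, as the _prepare_init_params_from_job_description and _get_train_args changes above show. A minimal sketch of passing them through the public API; the image URI, role, and paths below are hypothetical:

    from sagemaker.estimator import Estimator

    estimator = Estimator(
        image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/my-training-image:latest",
        role="arn:aws:iam::123456789012:role/MySageMakerRole",
        instance_count=1,
        instance_type="ml.m5.xlarge",
        # New in this commit: override the image's default train instructions.
        container_entry_point=["/opt/program/entrypoint.sh"],
        container_arguments=["--mode", "feature-processing"],
    )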

src/sagemaker/feature_store/feature_processor/__init__.py (new file)

Lines changed: 35 additions & 0 deletions

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Exported classes for the sagemaker.feature_store.feature_processor module."""
from __future__ import absolute_import

from sagemaker.feature_store.feature_processor._data_source import (  # noqa: F401
    CSVDataSource,
    FeatureGroupDataSource,
    ParquetDataSource,
)
from sagemaker.feature_store.feature_processor._exceptions import (  # noqa: F401
    IngestionError,
)
from sagemaker.feature_store.feature_processor.feature_processor import (  # noqa: F401
    feature_processor,
)
from sagemaker.feature_store.feature_processor.feature_scheduler import (  # noqa: F401
    to_pipeline,
    schedule,
    describe,
    delete_schedule,
    list_pipelines,
    execute,
    TransformationCode,
)
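Taken together, the exports outline the intended workflow: decorate a transformation function, promote it to a SageMaker Pipeline, and put that pipeline on a schedule. A hedged sketch follows; the decorator and scheduler signatures live in feature_processor.py and feature_scheduler.py, which are not part of this excerpt, so the keyword arguments, ARN, and resource names below are assumptions:

    from sagemaker.feature_store.feature_processor import (
        CSVDataSource,
        FeatureGroupDataSource,
        feature_processor,
        schedule,
        to_pipeline,
    )


    # Assumed decorator signature: inputs (data sources) and output (feature group ARN).
    @feature_processor(
        inputs=[
            FeatureGroupDataSource("my-input-feature-group"),
            CSVDataSource("s3://my-bucket/raw-data/"),
        ],
        output="arn:aws:sagemaker:us-west-2:123456789012:feature-group/my-output-feature-group",
    )
    def transform(input_fg_df, raw_df):
        # Inputs are assumed to arrive as Spark DataFrames (the new extras install pyspark).
        return input_fg_df.join(raw_df, "record_id")


    # Assumed scheduler call signatures: promote the function to a pipeline, then schedule it.
    pipeline_arn = to_pipeline(pipeline_name="my-fp-pipeline", step=transform)
    schedule(pipeline_name="my-fp-pipeline", schedule_expression="rate(24 hours)")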
src/sagemaker/feature_store/feature_processor/_config_uploader.py (new file)

Lines changed: 180 additions & 0 deletions

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Contains classes for preparing and uploading configs for a scheduled feature processor."""
from __future__ import absolute_import
from typing import Callable, Dict, Tuple, List

import attr

from sagemaker import Session
from sagemaker.feature_store.feature_processor._constants import (
    SPARK_JAR_FILES_PATH,
    SPARK_PY_FILES_PATH,
    SPARK_FILES_PATH,
    S3_DATA_DISTRIBUTION_TYPE,
)
from sagemaker.inputs import TrainingInput
from sagemaker.remote_function.core.stored_function import StoredFunction
from sagemaker.remote_function.job import (
    _prepare_and_upload_dependencies,
    _prepare_and_upload_runtime_scripts,
    _JobSettings,
    RUNTIME_SCRIPTS_CHANNEL_NAME,
    REMOTE_FUNCTION_WORKSPACE,
    SPARK_CONF_WORKSPACE,
    _prepare_and_upload_spark_dependent_files,
)
from sagemaker.remote_function.runtime_environment.runtime_environment_manager import (
    RuntimeEnvironmentManager,
)
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.s3 import s3_path_join


@attr.s
class ConfigUploader:
    """Prepares and uploads customer provided configs to S3"""

    remote_decorator_config: _JobSettings = attr.ib()
    runtime_env_manager: RuntimeEnvironmentManager = attr.ib()

    def prepare_step_input_channel_for_spark_mode(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> Tuple[Dict, Dict]:
        """Prepares input channels for SageMaker Pipeline Step."""
        self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session)
        bootstrap_scripts_s3uri = self._prepare_and_upload_runtime_scripts(
            self.remote_decorator_config.spark_config,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
        )
        dependencies_list_path = self.runtime_env_manager.snapshot(
            self.remote_decorator_config.dependencies
        )
        user_dependencies_s3uri = self._prepare_and_upload_dependencies(
            dependencies_list_path,
            self.remote_decorator_config.include_local_workdir,
            self.remote_decorator_config.pre_execution_commands,
            self.remote_decorator_config.pre_execution_script,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
        )

        (
            submit_jars_s3_paths,
            submit_py_files_s3_paths,
            submit_files_s3_path,
            config_file_s3_uri,
        ) = self._prepare_and_upload_spark_dependent_files(
            self.remote_decorator_config.spark_config,
            s3_base_uri,
            self.remote_decorator_config.s3_kms_key,
            sagemaker_session,
        )

        input_data_config = {
            RUNTIME_SCRIPTS_CHANNEL_NAME: TrainingInput(
                s3_data=bootstrap_scripts_s3uri,
                s3_data_type="S3Prefix",
                distribution=S3_DATA_DISTRIBUTION_TYPE,
            )
        }
        if user_dependencies_s3uri:
            input_data_config[REMOTE_FUNCTION_WORKSPACE] = TrainingInput(
                s3_data=s3_path_join(s3_base_uri, REMOTE_FUNCTION_WORKSPACE),
                s3_data_type="S3Prefix",
                distribution=S3_DATA_DISTRIBUTION_TYPE,
            )

        if config_file_s3_uri:
            input_data_config[SPARK_CONF_WORKSPACE] = TrainingInput(
                s3_data=config_file_s3_uri,
                s3_data_type="S3Prefix",
                distribution=S3_DATA_DISTRIBUTION_TYPE,
            )

        return input_data_config, {
            SPARK_JAR_FILES_PATH: submit_jars_s3_paths,
            SPARK_PY_FILES_PATH: submit_py_files_s3_paths,
            SPARK_FILES_PATH: submit_files_s3_path,
        }

    def _prepare_and_upload_callable(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> None:
        """Prepares and uploads callable to S3"""
        stored_function = StoredFunction(
            sagemaker_session=sagemaker_session,
            s3_base_uri=s3_base_uri,
            hmac_key=self.remote_decorator_config.environment_variables[
                "REMOTE_FUNCTION_SECRET_KEY"
            ],
            s3_kms_key=self.remote_decorator_config.s3_kms_key,
        )
        stored_function.save(func)

    def _prepare_and_upload_dependencies(
        self,
        local_dependencies_path: str,
        include_local_workdir: bool,
        pre_execution_commands: List[str],
        pre_execution_script_local_path: str,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> str:
        """Upload the training step dependencies to S3 if present"""
        return _prepare_and_upload_dependencies(
            local_dependencies_path=local_dependencies_path,
            include_local_workdir=include_local_workdir,
            pre_execution_commands=pre_execution_commands,
            pre_execution_script_local_path=pre_execution_script_local_path,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )

    def _prepare_and_upload_runtime_scripts(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> str:
        """Copy runtime scripts to a folder and upload to S3"""
        return _prepare_and_upload_runtime_scripts(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )

    def _prepare_and_upload_spark_dependent_files(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> Tuple:
        """Upload the spark dependencies to S3 if present"""
        if not spark_config:
            return None, None, None, None

        return _prepare_and_upload_spark_dependent_files(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )
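For orientation, a small sketch of the shapes of the two values prepare_step_input_channel_for_spark_mode returns, built from the constants the code above uses; the S3 URIs are hypothetical:

    from sagemaker.inputs import TrainingInput
    from sagemaker.remote_function.job import RUNTIME_SCRIPTS_CHANNEL_NAME
    from sagemaker.feature_store.feature_processor._constants import (
        S3_DATA_DISTRIBUTION_TYPE,
        SPARK_FILES_PATH,
        SPARK_JAR_FILES_PATH,
        SPARK_PY_FILES_PATH,
    )

    # First return value: training input channels keyed by channel name.
    input_data_config = {
        RUNTIME_SCRIPTS_CHANNEL_NAME: TrainingInput(
            s3_data="s3://my-bucket/base/bootstrap-scripts",  # hypothetical URI
            s3_data_type="S3Prefix",
            distribution=S3_DATA_DISTRIBUTION_TYPE,
        ),
        # REMOTE_FUNCTION_WORKSPACE and SPARK_CONF_WORKSPACE channels are added
        # only when user dependencies or a Spark config file were uploaded.
    }

    # Second return value: S3 locations of the Spark dependencies
    # (all None when no spark_config was supplied).
    spark_dependency_paths = {
        SPARK_JAR_FILES_PATH: "s3://my-bucket/base/spark/jars",  # hypothetical
        SPARK_PY_FILES_PATH: "s3://my-bucket/base/spark/py-files",  # hypothetical
        SPARK_FILES_PATH: "s3://my-bucket/base/spark/files",  # hypothetical
    }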
src/sagemaker/feature_store/feature_processor/_constants.py (new file)

Lines changed: 42 additions & 0 deletions

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Module containing constants for feature_processor and feature_scheduler module."""
from __future__ import absolute_import

from sagemaker.workflow.parameters import Parameter, ParameterTypeEnum

DEFAULT_INSTANCE_TYPE = "ml.m5.xlarge"
DEFAULT_SCHEDULE_STATE = "ENABLED"
UNDERSCORE = "_"
RESOURCE_NOT_FOUND_EXCEPTION = "ResourceNotFoundException"
RESOURCE_NOT_FOUND = "ResourceNotFound"
EXECUTION_TIME_PIPELINE_PARAMETER = "scheduled_time"
VALIDATION_EXCEPTION = "ValidationException"
EVENT_BRIDGE_INVOCATION_TIME = "<aws.scheduler.scheduled-time>"
SCHEDULED_TIME_PIPELINE_PARAMETER = Parameter(
    name=EXECUTION_TIME_PIPELINE_PARAMETER, parameter_type=ParameterTypeEnum.STRING
)
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # 2023-01-01T07:00:00Z
NO_FLEXIBLE_TIME_WINDOW = dict(Mode="OFF")
PIPELINE_NAME_MAXIMUM_LENGTH = 80
PIPELINE_CONTEXT_TYPE = "FeatureEngineeringPipeline"
SPARK_JAR_FILES_PATH = "submit_jars_s3_paths"
SPARK_PY_FILES_PATH = "submit_py_files_s3_paths"
SPARK_FILES_PATH = "submit_files_s3_path"
FEATURE_PROCESSOR_TAG_KEY = "sm-fs-fe:created-from"
FEATURE_PROCESSOR_TAG_VALUE = "fp-to-pipeline"
FEATURE_GROUP_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):feature-group/(.*?)$"
SAGEMAKER_WHL_FILE_S3_PATH = "s3://ada-private-beta/sagemaker-2.151.1.dev0-py2.py3-none-any.whl"
S3_DATA_DISTRIBUTION_TYPE = "FullyReplicated"
PIPELINE_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-context-name"
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"