
Commit 37ad9e4

Merge branch 'master' into feat/jumpstart-instance-types
2 parents c28134a + 849ed29

16 files changed: +841 -76 lines

CHANGELOG.md (+20)

@@ -1,5 +1,25 @@
 # Changelog
 
+## v2.135.1.post0 (2023-03-02)
+
+### Documentation Changes
+
+ * update feature store dataset builder docs
+
+## v2.135.1 (2023-03-01)
+
+### Bug Fixes and Other Changes
+
+ * Revert back to stable apache-airflow-providers-amazon from 7.2.1 to 4.0.0.
+ * Typo in graviton algos
+ * build(deps): bump apache-airflow-providers-amazon from 4.0.0 to 7.2.1 in /requirements/extras
+ * Support cloning private repo using ssh key
+ * Create a default SageMaker Session inside FeatureGroup class
+
+### Documentation Changes
+
+ * fix typo in README
+
 ## v2.135.0 (2023-02-23)
 
 ### Features

README.rst (+1 -1)

@@ -126,7 +126,7 @@ To run the unit tests with tox, run:
 
     tox tests/unit
 
-**Integrations tests**
+**Integration tests**
 
 To run the integration tests, the following prerequisites must be met
 
VERSION (+1 -1)

@@ -1 +1 @@
-2.135.1.dev0
+2.135.2.dev0

doc/amazon_sagemaker_featurestore.rst (+97)

@@ -380,6 +380,102 @@ location for the data set to be saved there.
 From here you can train a model using this data set and then perform
 inference.
 
+.. rubric:: Using the Offline Store SDK: Getting Started
+   :name: bCe9CA61b79
+
+The Feature Store Offline SDK provides the ability to quickly and easily
+build ML-ready datasets for use by ML model training or pre-processing.
+The SDK makes it easy to build datasets from SQL join, point-in-time accurate
+join, and event range time frames, all without the need to write any SQL code.
+This functionality is accessed via the DatasetBuilder class which is the
+primary entry point for the SDK functionality.
+
+.. code:: python
+
+   from sagemaker.feature_store.feature_store import FeatureStore
+
+   feature_store = FeatureStore(sagemaker_session=feature_store_session)
+
+.. code:: python
+
+   base_feature_group = identity_feature_group
+   target_feature_group = transaction_feature_group
+
+You can create a dataset using the `create_dataset` method of the feature store API.
+`base` can either be a feature group or a pandas dataframe.
+
+.. code:: python
+
+   result_df, query = feature_store.create_dataset(
+       base=base_feature_group,
+       output_path=f"s3://{s3_bucket_name}"
+   ).to_dataframe()
+
+If you want to join another feature group, you can specify it using the
+`with_feature_group` method.
+
+.. code:: python
+
+   dataset_builder = feature_store.create_dataset(
+       base=base_feature_group,
+       output_path=f"s3://{s3_bucket_name}"
+   ).with_feature_group(target_feature_group, record_identifier_name)
+
+   result_df, query = dataset_builder.to_dataframe()
+
+.. rubric:: Using the Offline Store SDK: Configuring the DatasetBuilder
+   :name: bCe9CA61b80
+
+How the DatasetBuilder produces the resulting dataframe can be configured
+in various ways.
+
+By default, the Python SDK will exclude all deleted and duplicate records.
+However, if you need either of them in the returned dataset, you can call
+`include_duplicated_records` or `include_deleted_records` when creating the
+dataset builder.
+
+.. code:: python
+
+   dataset_builder.include_duplicated_records()
+   dataset_builder.include_deleted_records()
+
+The DatasetBuilder provides `with_number_of_records_from_query_results` and
+`with_number_of_recent_records_by_record_identifier` methods to limit the
+number of records returned for the offline snapshot.
+
+`with_number_of_records_from_query_results` limits the total number of records
+in the output. For example, when N = 100, only 100 records are returned in
+either the CSV or dataframe.
+
+.. code:: python
+
+   dataset_builder.with_number_of_records_from_query_results(number_of_records=N)
+
+On the other hand, `with_number_of_recent_records_by_record_identifier` is
+used to deal with records which have the same identifier. They are sorted
+according to `event_time`, and at most the N most recent records per
+identifier are returned in the output.
+
+.. code:: python
+
+   dataset_builder.with_number_of_recent_records_by_record_identifier(number_of_recent_records=N)
+
+Since these functions return the dataset builder, they can be chained.
+
+.. code:: python
+
+   result_df, query = (
+       dataset_builder
+       .with_number_of_records_from_query_results(number_of_records=N)
+       .include_duplicated_records()
+       .with_number_of_recent_records_by_record_identifier(number_of_recent_records=N)
+       .to_dataframe()
+   )
+
+There are additional configurations that can be made for various use cases,
+such as time travel and point-in-time join. These are outlined in the
+Feature Store `DatasetBuilder API Reference
+<https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#dataset-builder>`__.
+
 .. rubric:: Delete a feature group
    :name: bCe9CA61b78
 
@@ -395,3 +491,4 @@ The following code example is from the fraud detection example.
 
    identity_feature_group.delete()
    transaction_feature_group.delete()
+

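As a quick illustration of the point-in-time options mentioned at the end of the documentation above, the sketch below chains the builder methods shown in this diff on a pandas entity dataframe. It is only a sketch: the `record_identifier_feature_name` and `event_time_identifier_feature_name` keyword names, the column names, and the `feature_store_session`, `s3_bucket_name`, and `transaction_feature_group` variables are assumptions carried over from the surrounding examples, not part of this commit.

    import pandas as pd
    from sagemaker.feature_store.feature_store import FeatureStore

    feature_store = FeatureStore(sagemaker_session=feature_store_session)  # assumed session

    # Hypothetical entity dataframe: one row per identifier with the event time of interest.
    events_df = pd.DataFrame(
        {
            "customer_id": ["C1", "C2"],
            "event_time": ["2023-02-01T00:00:00Z", "2023-02-15T00:00:00Z"],
        }
    )

    result_df, query = (
        feature_store.create_dataset(
            base=events_df,
            record_identifier_feature_name="customer_id",      # assumed keyword name
            event_time_identifier_feature_name="event_time",   # assumed keyword name
            output_path=f"s3://{s3_bucket_name}",
        )
        .with_feature_group(transaction_feature_group, "customer_id")
        .point_in_time_accurate_join()
        .to_dataframe()
    )
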
src/sagemaker/feature_store/dataset_builder.py (+27 -18)

@@ -171,24 +171,33 @@ class DatasetBuilder:
     _event_time_identifier_feature_name (str): A string representing the event time identifier
         feature if base is a DataFrame (default: None).
     _included_feature_names (List[str]): A list of strings representing features to be
-        included in the output (default: None).
-    _kms_key_id (str): An KMS key id. If set, will be used to encrypt the result file
+        included in the output. If not set, all features will be included in the output.
         (default: None).
-    _point_in_time_accurate_join (bool): A boolean representing whether using point in time join
-        or not (default: False).
-    _include_duplicated_records (bool): A boolean representing whether including duplicated
-        records or not (default: False).
-    _include_deleted_records (bool): A boolean representing whether including deleted records or
-        not (default: False).
-    _number_of_recent_records (int): An int that how many records will be returned for each
-        record identifier (default: 1).
-    _number_of_records (int): An int that how many records will be returned (default: None).
-    _write_time_ending_timestamp (datetime.datetime): A datetime that all records' write time in
-        dataset will be before it (default: None).
-    _event_time_starting_timestamp (datetime.datetime): A datetime that all records' event time
-        in dataset will be after it (default: None).
-    _event_time_ending_timestamp (datetime.datetime): A datetime that all records' event time in
-        dataset will be before it (default: None).
+    _kms_key_id (str): A KMS key id. If set, will be used to encrypt the result file
+        (default: None).
+    _point_in_time_accurate_join (bool): A boolean representing if point-in-time join
+        is applied to the resulting dataframe when calling "to_dataframe".
+        When set to True, users can retrieve data using “row-level time travel”
+        according to the event times provided to the DatasetBuilder. This requires that the
+        entity dataframe with event times is submitted as the base in the constructor
+        (default: False).
+    _include_duplicated_records (bool): A boolean representing whether the resulting dataframe
+        when calling "to_dataframe" should include duplicated records (default: False).
+    _include_deleted_records (bool): A boolean representing whether the resulting
+        dataframe when calling "to_dataframe" should include deleted records (default: False).
+    _number_of_recent_records (int): An integer representing how many records will be
+        returned for each record identifier (default: 1).
+    _number_of_records (int): An integer representing the number of records that should be
+        returned in the resulting dataframe when calling "to_dataframe" (default: None).
+    _write_time_ending_timestamp (datetime.datetime): A datetime that represents the latest
+        write time for a record to be included in the resulting dataset. Records with a
+        newer write time will be omitted from the resulting dataset. (default: None).
+    _event_time_starting_timestamp (datetime.datetime): A datetime that represents the earliest
+        event time for a record to be included in the resulting dataset. Records
+        with an older event time will be omitted from the resulting dataset. (default: None).
+    _event_time_ending_timestamp (datetime.datetime): A datetime that represents the latest
+        event time for a record to be included in the resulting dataset. Records
+        with a newer event time will be omitted from the resulting dataset. (default: None).
     _feature_groups_to_be_merged (List[FeatureGroupToBeMerged]): A list of
         FeatureGroupToBeMerged which will be joined to base (default: []).
     _event_time_identifier_feature_type (FeatureTypeEnum): A FeatureTypeEnum representing the

@@ -247,7 +256,7 @@ def with_feature_group(
         return self
 
     def point_in_time_accurate_join(self):
-        """Set join type as point in time accurate join.
+        """Enable point-in-time accurate join.
 
         Returns:
             This DatasetBuilder object.

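To make the point-in-time semantics described in the docstring above concrete, here is a minimal pandas-only sketch of the same idea (it does not use the SageMaker SDK at all): for each base row, keep the most recent feature record whose event time is not later than the base event time, so no future information leaks into the dataset. The column names and values are made up for illustration.

    import pandas as pd

    # Entity (base) rows: identifier plus the event time we want features "as of".
    base = pd.DataFrame(
        {"customer_id": [1, 2], "event_time": pd.to_datetime(["2023-02-10", "2023-02-20"])}
    )

    # Offline-store rows for the joined feature group, possibly several per identifier.
    features = pd.DataFrame(
        {
            "customer_id": [1, 1, 2],
            "event_time": pd.to_datetime(["2023-02-01", "2023-02-15", "2023-02-05"]),
            "balance": [10.0, 12.5, 7.0],
        }
    )

    # Point-in-time join: for each base row, take the latest feature record whose
    # event_time is <= the base event_time (direction="backward" in merge_asof).
    joined = pd.merge_asof(
        base.sort_values("event_time"),
        features.sort_values("event_time"),
        on="event_time",
        by="customer_id",
        direction="backward",
    )
    # The row for customer 1 picks the 2023-02-01 record; customer 2 picks 2023-02-05.
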
src/sagemaker/fw_utils.py (+66 -20)

@@ -148,11 +148,17 @@
 ]
 
 
-TORCH_DISTRIBUTED_SUPPORTED_FRAMEWORK_VERSIONS = ["1.11", "1.11.0"]
-
+TORCH_DISTRIBUTED_GPU_SUPPORTED_FRAMEWORK_VERSIONS = ["1.13.1"]
 
 TRAINIUM_SUPPORTED_DISTRIBUTION_STRATEGIES = ["torch_distributed"]
-
+TRAINIUM_SUPPORTED_TORCH_DISTRIBUTED_FRAMEWORK_VERSIONS = [
+    "1.11",
+    "1.11.0",
+    "1.12",
+    "1.12.0",
+    "1.12.1",
+    "1.13.1",
+]
 
 SMDISTRIBUTED_SUPPORTED_STRATEGIES = ["dataparallel", "modelparallel"]
 
@@ -1055,9 +1061,8 @@ def validate_torch_distributed_distribution(
     Raises:
         ValueError: if
             `py_version` is not python3 or
-            `framework_version` is not in TORCH_DISTRIBUTED_SUPPORTED_FRAMEWORK_VERSIONS
+            `framework_version` is not compatible with instance types
     """
-
     torch_distributed_enabled = False
     if "torch_distributed" in distribution:
         torch_distributed_enabled = distribution.get("torch_distributed").get("enabled", False)
@@ -1066,30 +1071,36 @@ def validate_torch_distributed_distribution(
         return
 
     err_msg = ""
+
     if not image_uri:
         # ignore framework_version and py_version if image_uri is set
         # in case image_uri is not set, then both are mandatory
-        if framework_version not in TORCH_DISTRIBUTED_SUPPORTED_FRAMEWORK_VERSIONS:
-            err_msg += (
-                f"Provided framework_version {framework_version} is not supported by"
-                " torch_distributed.\n"
-                "Please specify one of the supported framework versions:"
-                f" {TORCH_DISTRIBUTED_SUPPORTED_FRAMEWORK_VERSIONS} \n"
-            )
         if "py3" not in py_version:
             err_msg += (
                 f"Provided py_version {py_version} is not supported by torch_distributed.\n"
-                "Please specify py_version>=py3"
+                "Please specify py_version>=py3\n"
             )
 
-    # Check instance compatibility
-    match = re.match(r"^ml[\._]([a-z\d]+)\.?\w*$", instance_type)
-    if match:
-        if not match[1].startswith("trn"):
+    # Check instance and framework_version compatibility
+    if _is_gpu_instance(instance_type):
+        if framework_version not in TORCH_DISTRIBUTED_GPU_SUPPORTED_FRAMEWORK_VERSIONS:
+            err_msg += (
+                f"Provided framework_version {framework_version} is not supported by"
+                f" torch_distributed for instance {instance_type}.\n"
+                "Please specify one of the supported framework versions:"
+                f"{TORCH_DISTRIBUTED_GPU_SUPPORTED_FRAMEWORK_VERSIONS} \n"
+            )
+    elif _is_trainium_instance(instance_type):
+        if framework_version not in TRAINIUM_SUPPORTED_TORCH_DISTRIBUTED_FRAMEWORK_VERSIONS:
+            err_msg += (
+                f"Provided framework_version {framework_version} is not supported by"
+                f" torch_distributed for instance {instance_type}.\n"
+                "Please specify one of the supported framework versions:"
+                f"{TRAINIUM_SUPPORTED_TORCH_DISTRIBUTED_FRAMEWORK_VERSIONS} \n"
+            )
+    else:
         err_msg += (
-            "torch_distributed is currently supported only for trainium instances.\n"
-            " Please refer https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#distributed-pytorch-training \n"  # noqa E501 # pylint: disable=c0301
-            "for information regarding distributed training on non-trainium instances"
+            "Currently torch_distributed is supported only for GPU and Trainium instances.\n"
         )
 
     # Check entry point type
@@ -1103,6 +1114,41 @@ def validate_torch_distributed_distribution(
         raise ValueError(err_msg)
 
 
+def _is_gpu_instance(instance_type):
+    """Returns bool indicating whether instance_type supports GPU
+
+    Args:
+        instance_type (str): Name of the instance_type to check against.
+
+    Returns:
+        bool: Whether or not the instance_type supports GPU
+    """
+    if isinstance(instance_type, str):
+        match = re.match(r"^ml[\._]([a-z\d]+)\.?\w*$", instance_type)
+        if match:
+            if match[1].startswith("p") or match[1].startswith("g"):
+                return True
+        if instance_type == "local_gpu":
+            return True
+    return False
+
+
+def _is_trainium_instance(instance_type):
+    """Returns bool indicating whether instance_type is a Trainium instance
+
+    Args:
+        instance_type (str): Name of the instance_type to check against.
+
+    Returns:
+        bool: Whether or not the instance_type is a Trainium instance
+    """
+    if isinstance(instance_type, str):
+        match = re.match(r"^ml[\._]([a-z\d]+)\.?\w*$", instance_type)
+        if match and match[1].startswith("trn"):
+            return True
+    return False
+
+
 def python_deprecation_warning(framework, latest_supported_version):
     """Placeholder docstring"""
     return PYTHON_2_DEPRECATION_WARNING.format(

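For context, the user-facing effect of the validation above shows up when an estimator is configured with the `torch_distributed` distribution. Below is a minimal sketch, assuming a hypothetical training script name and a placeholder role ARN; the supported framework versions referenced in the comment come from the constants introduced in this diff.

    from sagemaker.pytorch import PyTorch

    # On GPU instances (ml.p*/ml.g*/local_gpu), torch_distributed requires
    # framework_version 1.13.1; on Trainium (ml.trn*) instances, 1.11 through
    # 1.13.1 pass the validation above.
    estimator = PyTorch(
        entry_point="train.py",  # hypothetical training script
        role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder role ARN
        framework_version="1.13.1",
        py_version="py39",
        instance_type="ml.p4d.24xlarge",
        instance_count=1,
        distribution={"torch_distributed": {"enabled": True}},
    )
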
src/sagemaker/git_utils.py (+11 -7)

@@ -174,7 +174,7 @@ def _clone_command_for_github_like(git_config, dest_dir):
         CalledProcessError: If failed to clone git repo.
     """
     is_https = git_config["repo"].startswith("https://")
-    is_ssh = git_config["repo"].startswith("git@")
+    is_ssh = git_config["repo"].startswith("git@") or git_config["repo"].startswith("ssh://")
     if not is_https and not is_ssh:
         raise ValueError("Invalid Git url provided.")
     if is_ssh:
@@ -277,12 +277,16 @@ def _run_clone_command(repo_url, dest_dir):
     if repo_url.startswith("https://"):
         my_env["GIT_TERMINAL_PROMPT"] = "0"
         subprocess.check_call(["git", "clone", repo_url, dest_dir], env=my_env)
-    elif repo_url.startswith("git@"):
-        with tempfile.NamedTemporaryFile() as sshnoprompt:
-            with open(sshnoprompt.name, "w") as write_pipe:
-                write_pipe.write("ssh -oBatchMode=yes $@")
-            os.chmod(sshnoprompt.name, 0o511)
-            my_env["GIT_SSH"] = sshnoprompt.name
+    elif repo_url.startswith("git@") or repo_url.startswith("ssh://"):
+        try:
+            with tempfile.NamedTemporaryFile() as sshnoprompt:
+                with open(sshnoprompt.name, "w") as write_pipe:
+                    write_pipe.write("ssh -oBatchMode=yes $@")
+                os.chmod(sshnoprompt.name, 0o511)
+                my_env["GIT_SSH"] = sshnoprompt.name
+                subprocess.check_call(["git", "clone", repo_url, dest_dir], env=my_env)
+        except subprocess.CalledProcessError:
+            del my_env["GIT_SSH"]
             subprocess.check_call(["git", "clone", repo_url, dest_dir], env=my_env)
 

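The clone changes above are exercised through the `git_config` argument accepted by SageMaker estimators. A minimal sketch follows, assuming a placeholder ssh:// repository URL, branch, script path, and role ARN; with SSH URLs, authentication relies on the SSH key available on the machine running the SDK.

    from sagemaker.pytorch import PyTorch

    git_config = {
        "repo": "ssh://git@github.com/example-org/example-repo.git",  # placeholder repository
        "branch": "main",                                             # placeholder branch
    }

    estimator = PyTorch(
        entry_point="train.py",  # path relative to the cloned repository
        source_dir="src",        # hypothetical source directory inside the repo
        git_config=git_config,
        role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder role ARN
        framework_version="1.13.1",
        py_version="py39",
        instance_type="ml.m5.xlarge",
        instance_count=1,
    )
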