Skip to content

change: Add PipelineVariable annotation to Amazon estimators #3373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions src/sagemaker/amazon/amazon_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import logging
import tempfile
from typing import Union
from typing import Union, Optional, Dict

from six.moves.urllib.parse import urlparse

Expand All @@ -30,6 +30,7 @@
from sagemaker.utils import sagemaker_timestamp
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.workflow import is_pipeline_variable

logger = logging.getLogger(__name__)

Expand All @@ -40,18 +41,20 @@ class AmazonAlgorithmEstimatorBase(EstimatorBase):
This class isn't intended to be instantiated directly.
"""

feature_dim = hp("feature_dim", validation.gt(0), data_type=int)
mini_batch_size = hp("mini_batch_size", validation.gt(0), data_type=int)
repo_name = None
repo_version = None
feature_dim: hp = hp("feature_dim", validation.gt(0), data_type=int)
mini_batch_size: hp = hp("mini_batch_size", validation.gt(0), data_type=int)
repo_name: Optional[str] = None
repo_version: Optional[str] = None

DEFAULT_MINI_BATCH_SIZE: Optional[int] = None

def __init__(
self,
role,
instance_count=None,
instance_type=None,
data_location=None,
enable_network_isolation=False,
role: str,
instance_count: Optional[Union[int, PipelineVariable]] = None,
instance_type: Optional[Union[str, PipelineVariable]] = None,
data_location: Optional[str] = None,
enable_network_isolation: Union[bool, PipelineVariable] = False,
**kwargs
):
"""Initialize an AmazonAlgorithmEstimatorBase.
Expand All @@ -62,16 +65,16 @@ def __init__(
endpoints use this role to access training data and model
artifacts. After the endpoint is created, the inference code
might use the IAM role, if it needs to access an AWS resource.
instance_count (int): Number of Amazon EC2 instances to use
instance_count (int or PipelineVariable): Number of Amazon EC2 instances to use
for training. Required.
instance_type (str): Type of EC2 instance to use for training,
instance_type (str or PipelineVariable): Type of EC2 instance to use for training,
for example, 'ml.c4.xlarge'. Required.
data_location (str or None): The s3 prefix to upload RecordSet
objects to, expressed as an S3 url. For example
"s3://example-bucket/some-key-prefix/". Objects will be saved in
a unique sub-directory of the specified location. If None, a
default data location will be used.
enable_network_isolation (bool): Specifies whether container will
enable_network_isolation (bool or PipelineVariable): Specifies whether container will
run in network isolation mode. Network isolation mode restricts
the container access to outside networks (such as the internet).
Also known as internet-free mode (default: ``False``).
Expand Down Expand Up @@ -113,8 +116,14 @@ def data_location(self):
return self._data_location

@data_location.setter
def data_location(self, data_location):
def data_location(self, data_location: str):
"""Placeholder docstring"""
if is_pipeline_variable(data_location):
raise TypeError(
"Invalid input: data_location should be a plain string "
"rather than a pipeline variable - ({}).".format(type(data_location))
)

if not data_location.startswith("s3://"):
raise ValueError(
'Expecting an S3 URL beginning with "s3://". Got "{}"'.format(data_location)
Expand Down Expand Up @@ -198,12 +207,12 @@ def _prepare_for_training(self, records, mini_batch_size=None, job_name=None):
@runnable_by_pipeline
def fit(
self,
records,
mini_batch_size=None,
wait=True,
logs=True,
job_name=None,
experiment_config=None,
records: "RecordSet",
mini_batch_size: Optional[int] = None,
wait: bool = True,
logs: bool = True,
job_name: Optional[str] = None,
experiment_config: Optional[Dict[str, str]] = None,
):
"""Fit this Estimator on serialized Record objects, stored in S3.
Expand Down Expand Up @@ -301,6 +310,20 @@ def record_set(self, train, labels=None, channel="train", encrypt=False):
channel=channel,
)

def _get_default_mini_batch_size(self, num_records: int):
    """Generate the default mini_batch_size for a training job.

    Args:
        num_records (int): Total number of records in the training data set.

    Returns:
        int: The mini batch size to use. Capped at the algorithm's
        ``DEFAULT_MINI_BATCH_SIZE`` and at the number of records available
        per instance, but never less than 1.
    """
    if is_pipeline_variable(self.instance_count):
        # instance_count is only resolved at pipeline execution time, so the
        # records-per-instance bound can't be computed here; fall back to the
        # only value guaranteed to be valid.
        logger.warning(
            "mini_batch_size is not given in .fit() and instance_count is a "
            "pipeline variable (%s) which is only interpreted in pipeline execution time. "
            "Thus setting mini_batch_size to 1, since it can't be greater than "
            "number of records per instance_count, otherwise the training job fails.",
            type(self.instance_count),
        )
        return 1

    # NOTE(review): assumes the concrete subclass overrides
    # DEFAULT_MINI_BATCH_SIZE (the base-class default is None, which would
    # make min() raise a TypeError) — confirm against subclasses.
    # Floor division (//) is used instead of int(a / b): true division goes
    # through a float and can truncate incorrectly for very large record
    # counts (> 2**53).
    return min(self.DEFAULT_MINI_BATCH_SIZE, max(1, num_records // self.instance_count))


class RecordSet(object):
"""Placeholder docstring"""
Expand Down Expand Up @@ -461,7 +484,7 @@ def upload_numpy_to_s3_shards(
raise ex


def get_image_uri(region_name, repo_name, repo_version=1):
def get_image_uri(region_name, repo_name, repo_version="1"):
"""Deprecated method. Please use sagemaker.image_uris.retrieve().
Args:
Expand Down
116 changes: 58 additions & 58 deletions src/sagemaker/amazon/factorization_machines.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,83 +37,83 @@ class FactorizationMachines(AmazonAlgorithmEstimatorBase):
sparse datasets economically.
"""

repo_name = "factorization-machines"
repo_version = 1
repo_name: str = "factorization-machines"
repo_version: str = "1"

num_factors = hp("num_factors", gt(0), "An integer greater than zero", int)
predictor_type = hp(
num_factors: hp = hp("num_factors", gt(0), "An integer greater than zero", int)
predictor_type: hp = hp(
"predictor_type",
isin("binary_classifier", "regressor"),
'Value "binary_classifier" or "regressor"',
str,
)
epochs = hp("epochs", gt(0), "An integer greater than 0", int)
clip_gradient = hp("clip_gradient", (), "A float value", float)
eps = hp("eps", (), "A float value", float)
rescale_grad = hp("rescale_grad", (), "A float value", float)
bias_lr = hp("bias_lr", ge(0), "A non-negative float", float)
linear_lr = hp("linear_lr", ge(0), "A non-negative float", float)
factors_lr = hp("factors_lr", ge(0), "A non-negative float", float)
bias_wd = hp("bias_wd", ge(0), "A non-negative float", float)
linear_wd = hp("linear_wd", ge(0), "A non-negative float", float)
factors_wd = hp("factors_wd", ge(0), "A non-negative float", float)
bias_init_method = hp(
epochs: hp = hp("epochs", gt(0), "An integer greater than 0", int)
clip_gradient: hp = hp("clip_gradient", (), "A float value", float)
eps: hp = hp("eps", (), "A float value", float)
rescale_grad: hp = hp("rescale_grad", (), "A float value", float)
bias_lr: hp = hp("bias_lr", ge(0), "A non-negative float", float)
linear_lr: hp = hp("linear_lr", ge(0), "A non-negative float", float)
factors_lr: hp = hp("factors_lr", ge(0), "A non-negative float", float)
bias_wd: hp = hp("bias_wd", ge(0), "A non-negative float", float)
linear_wd: hp = hp("linear_wd", ge(0), "A non-negative float", float)
factors_wd: hp = hp("factors_wd", ge(0), "A non-negative float", float)
bias_init_method: hp = hp(
"bias_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
bias_init_scale = hp("bias_init_scale", ge(0), "A non-negative float", float)
bias_init_sigma = hp("bias_init_sigma", ge(0), "A non-negative float", float)
bias_init_value = hp("bias_init_value", (), "A float value", float)
linear_init_method = hp(
bias_init_scale: hp = hp("bias_init_scale", ge(0), "A non-negative float", float)
bias_init_sigma: hp = hp("bias_init_sigma", ge(0), "A non-negative float", float)
bias_init_value: hp = hp("bias_init_value", (), "A float value", float)
linear_init_method: hp = hp(
"linear_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
linear_init_scale = hp("linear_init_scale", ge(0), "A non-negative float", float)
linear_init_sigma = hp("linear_init_sigma", ge(0), "A non-negative float", float)
linear_init_value = hp("linear_init_value", (), "A float value", float)
factors_init_method = hp(
linear_init_scale: hp = hp("linear_init_scale", ge(0), "A non-negative float", float)
linear_init_sigma: hp = hp("linear_init_sigma", ge(0), "A non-negative float", float)
linear_init_value: hp = hp("linear_init_value", (), "A float value", float)
factors_init_method: hp = hp(
"factors_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
factors_init_scale = hp("factors_init_scale", ge(0), "A non-negative float", float)
factors_init_sigma = hp("factors_init_sigma", ge(0), "A non-negative float", float)
factors_init_value = hp("factors_init_value", (), "A float value", float)
factors_init_scale: hp = hp("factors_init_scale", ge(0), "A non-negative float", float)
factors_init_sigma: hp = hp("factors_init_sigma", ge(0), "A non-negative float", float)
factors_init_value: hp = hp("factors_init_value", (), "A float value", float)

def __init__(
self,
role,
instance_count=None,
instance_type=None,
num_factors=None,
predictor_type=None,
epochs=None,
clip_gradient=None,
eps=None,
rescale_grad=None,
bias_lr=None,
linear_lr=None,
factors_lr=None,
bias_wd=None,
linear_wd=None,
factors_wd=None,
bias_init_method=None,
bias_init_scale=None,
bias_init_sigma=None,
bias_init_value=None,
linear_init_method=None,
linear_init_scale=None,
linear_init_sigma=None,
linear_init_value=None,
factors_init_method=None,
factors_init_scale=None,
factors_init_sigma=None,
factors_init_value=None,
role: str,
instance_count: Optional[Union[int, PipelineVariable]] = None,
instance_type: Optional[Union[str, PipelineVariable]] = None,
num_factors: Optional[int] = None,
predictor_type: Optional[str] = None,
epochs: Optional[int] = None,
clip_gradient: Optional[float] = None,
eps: Optional[float] = None,
rescale_grad: Optional[float] = None,
bias_lr: Optional[float] = None,
linear_lr: Optional[float] = None,
factors_lr: Optional[float] = None,
bias_wd: Optional[float] = None,
linear_wd: Optional[float] = None,
factors_wd: Optional[float] = None,
bias_init_method: Optional[str] = None,
bias_init_scale: Optional[float] = None,
bias_init_sigma: Optional[float] = None,
bias_init_value: Optional[float] = None,
linear_init_method: Optional[str] = None,
linear_init_scale: Optional[float] = None,
linear_init_sigma: Optional[float] = None,
linear_init_value: Optional[float] = None,
factors_init_method: Optional[str] = None,
factors_init_scale: Optional[float] = None,
factors_init_sigma: Optional[float] = None,
factors_init_value: Optional[float] = None,
**kwargs
):
"""Factorization Machines is :class:`Estimator` for general-purpose supervised learning.
Expand Down Expand Up @@ -160,9 +160,9 @@ def __init__(
endpoints use this role to access training data and model
artifacts. After the endpoint is created, the inference code
might use the IAM role, if accessing AWS resource.
instance_count (int): Number of Amazon EC2 instances to use
instance_count (int or PipelineVariable): Number of Amazon EC2 instances to use
for training.
instance_type (str): Type of EC2 instance to use for training,
instance_type (str or PipelineVariable): Type of EC2 instance to use for training,
for example, 'ml.c4.xlarge'.
num_factors (int): Dimensionality of factorization.
predictor_type (str): Type of predictor 'binary_classifier' or
Expand All @@ -183,7 +183,7 @@ def __init__(
linear_wd (float): Non-negative weight decay for linear terms.
factors_wd (float): Non-negative weight decay for factorization
terms.
bias_init_method (string): Initialization method for the bias term:
bias_init_method (str): Initialization method for the bias term:
'normal', 'uniform' or 'constant'.
bias_init_scale (float): Non-negative range for initialization of
the bias term that takes effect when bias_init_method parameter
Expand All @@ -193,7 +193,7 @@ def __init__(
bias_init_method parameter is 'normal'.
bias_init_value (float): Initial value of the bias term that takes
effect when bias_init_method parameter is 'constant'.
linear_init_method (string): Initialization method for linear term:
linear_init_method (str): Initialization method for linear term:
'normal', 'uniform' or 'constant'.
linear_init_scale (float): Non-negative range for initialization of
linear terms that takes effect when linear_init_method parameter
Expand All @@ -203,7 +203,7 @@ def __init__(
linear_init_method parameter is 'normal'.
linear_init_value (float): Initial value of linear terms that takes
effect when linear_init_method parameter is 'constant'.
factors_init_method (string): Initialization method for
factors_init_method (str): Initialization method for
factorization term: 'normal', 'uniform' or 'constant'.
factors_init_scale (float): Non-negative range for initialization of
factorization terms that takes effect when factors_init_method
Expand Down
52 changes: 26 additions & 26 deletions src/sagemaker/amazon/ipinsights.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,45 @@ class IPInsights(AmazonAlgorithmEstimatorBase):
as user IDs or account numbers.
"""

repo_name = "ipinsights"
repo_version = 1
MINI_BATCH_SIZE = 10000
repo_name: str = "ipinsights"
repo_version: str = "1"
MINI_BATCH_SIZE: int = 10000

num_entity_vectors = hp(
num_entity_vectors: hp = hp(
"num_entity_vectors", (ge(1), le(250000000)), "An integer in [1, 250000000]", int
)
vector_dim = hp("vector_dim", (ge(4), le(4096)), "An integer in [4, 4096]", int)
vector_dim: hp = hp("vector_dim", (ge(4), le(4096)), "An integer in [4, 4096]", int)

batch_metrics_publish_interval = hp(
batch_metrics_publish_interval: hp = hp(
"batch_metrics_publish_interval", (ge(1)), "An integer greater than 0", int
)
epochs = hp("epochs", (ge(1)), "An integer greater than 0", int)
learning_rate = hp("learning_rate", (ge(1e-6), le(10.0)), "A float in [1e-6, 10.0]", float)
num_ip_encoder_layers = hp(
epochs: hp = hp("epochs", (ge(1)), "An integer greater than 0", int)
learning_rate: hp = hp("learning_rate", (ge(1e-6), le(10.0)), "A float in [1e-6, 10.0]", float)
num_ip_encoder_layers: hp = hp(
"num_ip_encoder_layers", (ge(0), le(100)), "An integer in [0, 100]", int
)
random_negative_sampling_rate = hp(
random_negative_sampling_rate: hp = hp(
"random_negative_sampling_rate", (ge(0), le(500)), "An integer in [0, 500]", int
)
shuffled_negative_sampling_rate = hp(
shuffled_negative_sampling_rate: hp = hp(
"shuffled_negative_sampling_rate", (ge(0), le(500)), "An integer in [0, 500]", int
)
weight_decay = hp("weight_decay", (ge(0.0), le(10.0)), "A float in [0.0, 10.0]", float)
weight_decay: hp = hp("weight_decay", (ge(0.0), le(10.0)), "A float in [0.0, 10.0]", float)

def __init__(
self,
role,
instance_count=None,
instance_type=None,
num_entity_vectors=None,
vector_dim=None,
batch_metrics_publish_interval=None,
epochs=None,
learning_rate=None,
num_ip_encoder_layers=None,
random_negative_sampling_rate=None,
shuffled_negative_sampling_rate=None,
weight_decay=None,
role: str,
instance_count: Optional[Union[int, PipelineVariable]] = None,
instance_type: Optional[Union[str, PipelineVariable]] = None,
num_entity_vectors: Optional[int] = None,
vector_dim: Optional[int] = None,
batch_metrics_publish_interval: Optional[int] = None,
epochs: Optional[int] = None,
learning_rate: Optional[float] = None,
num_ip_encoder_layers: Optional[int] = None,
random_negative_sampling_rate: Optional[int] = None,
shuffled_negative_sampling_rate: Optional[int] = None,
weight_decay: Optional[float] = None,
**kwargs
):
"""This estimator is for IP Insights.
Expand Down Expand Up @@ -106,9 +106,9 @@ def __init__(
endpoints use this role to access training data and model
artifacts. After the endpoint is created, the inference code
might use the IAM role, if accessing AWS resource.
instance_count (int): Number of Amazon EC2 instances to use
instance_count (int or PipelineVariable): Number of Amazon EC2 instances to use
for training.
instance_type (str): Type of EC2 instance to use for training,
instance_type (str or PipelineVariable): Type of EC2 instance to use for training,
for example, 'ml.m5.xlarge'.
num_entity_vectors (int): Required. The number of embeddings to
train for entities accessing online resources. We recommend 2x
Expand Down
Loading