Skip to content

Commit 2a6e479

Browse files
psnileshNilesh PS
and
Nilesh PS
authored
feat: add throughput management support for feature group (#4359)
* feat: add throughput management support for feature group * documentation: add doc for feature group throughput config --------- Co-authored-by: Nilesh PS <[email protected]>
1 parent dbc936e commit 2a6e479

File tree

7 files changed

+313
-13
lines changed

7 files changed

+313
-13
lines changed

doc/api/prep_data/feature_store.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ Inputs
7575
:members:
7676
:show-inheritance:
7777

78+
.. autoclass:: sagemaker.feature_store.inputs.ThroughputConfig
79+
:members:
80+
:show-inheritance:
81+
82+
.. autoclass:: sagemaker.feature_store.inputs.ThroughputConfigUpdate
83+
:members:
84+
:show-inheritance:
85+
7886
.. autoclass:: sagemaker.feature_store.inputs.OnlineStoreConfig
7987
:members:
8088
:show-inheritance:
@@ -99,6 +107,10 @@ Inputs
99107
:members:
100108
:show-inheritance:
101109

110+
.. autoclass:: sagemaker.feature_store.inputs.ThroughputModeEnum
111+
:members:
112+
:show-inheritance:
113+
102114
.. autoclass:: sagemaker.feature_store.inputs.ResourceEnum
103115
:members:
104116
:show-inheritance:

src/sagemaker/feature_store/feature_group.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
TtlDuration,
6565
OnlineStoreConfigUpdate,
6666
OnlineStoreStorageTypeEnum,
67+
ThroughputConfig,
68+
ThroughputConfigUpdate,
6769
)
6870
from sagemaker.utils import resolve_value_from_config, format_tags, Tags
6971

@@ -541,6 +543,7 @@ def create(
541543
tags: Optional[Tags] = None,
542544
table_format: TableFormatEnum = None,
543545
online_store_storage_type: OnlineStoreStorageTypeEnum = None,
546+
throughput_config: ThroughputConfig = None,
544547
) -> Dict[str, Any]:
545548
"""Create a SageMaker FeatureStore FeatureGroup.
546549
@@ -570,6 +573,8 @@ def create(
570573
table_format (TableFormatEnum): format of the offline store table (default: None).
571574
online_store_storage_type (OnlineStoreStorageTypeEnum): storage type for the
572575
online store (default: None).
576+
throughput_config (ThroughputConfig): throughput configuration of the
577+
feature group (default: None).
573578
574579
Returns:
575580
Response dict from service.
@@ -618,6 +623,9 @@ def create(
618623
)
619624
create_feature_store_args.update({"online_store_config": online_store_config.to_dict()})
620625

626+
if throughput_config:
627+
create_feature_store_args.update({"throughput_config": throughput_config.to_dict()})
628+
621629
# offline store configuration
622630
if s3_uri:
623631
s3_storage_config = S3StorageConfig(s3_uri=s3_uri)
@@ -656,17 +664,17 @@ def update(
656664
self,
657665
feature_additions: Sequence[FeatureDefinition] = None,
658666
online_store_config: OnlineStoreConfigUpdate = None,
667+
throughput_config: ThroughputConfigUpdate = None,
659668
) -> Dict[str, Any]:
660669
"""Update a FeatureGroup and add new features from the given feature definitions.
661670
662671
Args:
663672
feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated.
664673
online_store_config (OnlineStoreConfigUpdate): online store config to be updated.
665-
674+
throughput_config (ThroughputConfigUpdate): target throughput configuration
666675
Returns:
667676
Response dict from service.
668677
"""
669-
670678
if feature_additions is None:
671679
feature_additions_parameter = None
672680
else:
@@ -679,10 +687,15 @@ def update(
679687
else:
680688
online_store_config_parameter = online_store_config.to_dict()
681689

690+
throughput_config_parameter = (
691+
None if throughput_config is None else throughput_config.to_dict()
692+
)
693+
682694
return self.sagemaker_session.update_feature_group(
683695
feature_group_name=self.name,
684696
feature_additions=feature_additions_parameter,
685697
online_store_config=online_store_config_parameter,
698+
throughput_config=throughput_config_parameter,
686699
)
687700

688701
def update_feature_metadata(

src/sagemaker/feature_store/inputs.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,3 +453,79 @@ class ExpirationTimeResponseEnum(Enum):
453453

454454
DISABLED = "Disabled"
455455
ENABLED = "Enabled"
456+
457+
458+
class ThroughputModeEnum(Enum):
459+
"""Enum of throughput modes supported by feature group.
460+
461+
Throughput mode of feature group can be ON_DEMAND or PROVISIONED.
462+
"""
463+
464+
ON_DEMAND = "OnDemand"
465+
PROVISIONED = "Provisioned"
466+
467+
468+
@attr.s
469+
class ThroughputConfig(Config):
470+
"""Throughput configuration of the feature group.
471+
472+
Throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
473+
read and write capacity units. ON_DEMAND works best for less predictable traffic,
474+
while PROVISIONED works best for consistent and predictable traffic.
475+
476+
Attributes:
477+
mode (ThroughputModeEnum): Throughput mode
478+
provisioned_read_capacity_units (int): For provisioned feature groups, this indicates
479+
the read throughput you are billed for and can consume without throttling.
480+
provisioned_write_capacity_units (int): For provisioned feature groups, this indicates
481+
the write throughput you are billed for and can consume without throttling.
482+
"""
483+
484+
mode: ThroughputModeEnum = attr.ib(default=None)
485+
provisioned_read_capacity_units: int = attr.ib(default=None)
486+
provisioned_write_capacity_units: int = attr.ib(default=None)
487+
488+
def to_dict(self) -> Dict[str, Any]:
489+
"""Construct a dictionary based on the attributes provided.
490+
491+
Returns:
492+
dict represents the attributes.
493+
"""
494+
return Config.construct_dict(
495+
ThroughputMode=self.mode.value if self.mode else None,
496+
ProvisionedReadCapacityUnits=self.provisioned_read_capacity_units,
497+
ProvisionedWriteCapacityUnits=self.provisioned_write_capacity_units,
498+
)
499+
500+
501+
@attr.s
502+
class ThroughputConfigUpdate(Config):
503+
"""Target throughput configuration for the feature group.
504+
505+
Target throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for
506+
read and write capacity units. ON_DEMAND works best for less predictable traffic,
507+
while PROVISIONED works best for consistent and predictable traffic.
508+
509+
Attributes:
510+
mode (ThroughputModeEnum): Target throughput mode
511+
provisioned_read_capacity_units (int): For provisioned feature groups, this indicates
512+
the read throughput you are billed for and can consume without throttling.
513+
provisioned_write_capacity_units (int): For provisioned feature groups, this indicates
514+
the write throughput you are billed for and can consume without throttling.
515+
"""
516+
517+
mode: ThroughputModeEnum = attr.ib(default=None)
518+
provisioned_read_capacity_units: int = attr.ib(default=None)
519+
provisioned_write_capacity_units: int = attr.ib(default=None)
520+
521+
def to_dict(self) -> Dict[str, Any]:
522+
"""Construct a dictionary based on the attributes provided.
523+
524+
Returns:
525+
dict represents the attributes.
526+
"""
527+
return Config.construct_dict(
528+
ThroughputMode=self.mode.value if self.mode else None,
529+
ProvisionedReadCapacityUnits=self.provisioned_read_capacity_units,
530+
ProvisionedWriteCapacityUnits=self.provisioned_write_capacity_units,
531+
)

src/sagemaker/session.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5679,6 +5679,7 @@ def create_feature_group(
56795679
role_arn: str = None,
56805680
online_store_config: Dict[str, str] = None,
56815681
offline_store_config: Dict[str, str] = None,
5682+
throughput_config: Dict[str, Any] = None,
56825683
description: str = None,
56835684
tags: Optional[Tags] = None,
56845685
) -> Dict[str, Any]:
@@ -5694,6 +5695,8 @@ def create_feature_group(
56945695
feature online store.
56955696
offline_store_config (Dict[str, str]): dict contains configuration of the
56965697
feature offline store.
5698+
throughput_config (Dict[str, str]): dict contains throughput configuration
5699+
for the feature group.
56975700
description (str): description of the FeatureGroup.
56985701
tags (Optional[Tags]): tags for labeling a FeatureGroup.
56995702
@@ -5729,6 +5732,7 @@ def create_feature_group(
57295732
kwargs,
57305733
OnlineStoreConfig=inferred_online_store_from_config,
57315734
OfflineStoreConfig=inferred_offline_store_from_config,
5735+
ThroughputConfig=throughput_config,
57325736
Description=description,
57335737
Tags=tags,
57345738
)
@@ -5757,28 +5761,32 @@ def update_feature_group(
57575761
feature_group_name: str,
57585762
feature_additions: Sequence[Dict[str, str]] = None,
57595763
online_store_config: Dict[str, any] = None,
5764+
throughput_config: Dict[str, Any] = None,
57605765
) -> Dict[str, Any]:
57615766
"""Update a FeatureGroup
57625767
5763-
either adding new features from the given feature definitions
5764-
or updating online store config
5768+
Supports modifications like adding new features from the given feature definitions,
5769+
updating online store and throughput configurations.
57655770
57665771
Args:
57675772
feature_group_name (str): name of the FeatureGroup to update.
57685773
feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated.
5774+
online_store_config (Dict[str, Any]): updates to online store config
5775+
throughput_config (Dict[str, Any]): target throughput configuration of the feature group
57695776
Returns:
57705777
Response dict from service.
57715778
"""
5779+
update_req = {"FeatureGroupName": feature_group_name}
5780+
if online_store_config is not None:
5781+
update_req["OnlineStoreConfig"] = online_store_config
57725782

5773-
if feature_additions is None:
5774-
return self.sagemaker_client.update_feature_group(
5775-
FeatureGroupName=feature_group_name,
5776-
OnlineStoreConfig=online_store_config,
5777-
)
5783+
if throughput_config is not None:
5784+
update_req["ThroughputConfig"] = throughput_config
57785785

5779-
return self.sagemaker_client.update_feature_group(
5780-
FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions
5781-
)
5786+
if feature_additions is not None:
5787+
update_req["FeatureAdditions"] = feature_additions
5788+
5789+
return self.sagemaker_client.update_feature_group(**update_req)
57825790

57835791
def list_feature_groups(
57845792
self,

tests/integ/test_feature_store.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
TtlDuration,
4444
OnlineStoreConfigUpdate,
4545
OnlineStoreStorageTypeEnum,
46+
ThroughputConfig,
47+
ThroughputModeEnum,
48+
ThroughputConfigUpdate,
4649
)
4750
from sagemaker.feature_store.dataset_builder import (
4851
JoinTypeEnum,
@@ -410,6 +413,78 @@ def test_create_feature_group_standard_storage_type(
410413
assert storage_type == "Standard"
411414

412415

416+
def test_throughput_create_as_provisioned_and_update_to_ondemand(
417+
feature_store_session,
418+
role,
419+
feature_group_name,
420+
pandas_data_frame,
421+
):
422+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
423+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
424+
with cleanup_feature_group(feature_group):
425+
feature_group.create(
426+
s3_uri=False,
427+
record_identifier_name="feature1",
428+
event_time_feature_name="feature3",
429+
role_arn=role,
430+
enable_online_store=True,
431+
throughput_config=ThroughputConfig(ThroughputModeEnum.PROVISIONED, 4000, 4000),
432+
)
433+
_wait_for_feature_group_create(feature_group)
434+
435+
tp_config = feature_group.describe().get("ThroughputConfig")
436+
mode = tp_config.get("ThroughputMode")
437+
rcu = tp_config.get("ProvisionedReadCapacityUnits")
438+
wcu = tp_config.get("ProvisionedWriteCapacityUnits")
439+
assert mode == ThroughputModeEnum.PROVISIONED.value
440+
assert rcu == 4000
441+
assert wcu == 4000
442+
443+
feature_group.update(throughput_config=ThroughputConfigUpdate(ThroughputModeEnum.ON_DEMAND))
444+
_wait_for_feature_group_update(feature_group)
445+
446+
tp_config = feature_group.describe().get("ThroughputConfig")
447+
mode = tp_config.get("ThroughputMode")
448+
assert mode == ThroughputModeEnum.ON_DEMAND.value
449+
450+
451+
def test_throughput_create_as_ondemand_and_update_to_provisioned(
452+
feature_store_session,
453+
role,
454+
feature_group_name,
455+
pandas_data_frame,
456+
):
457+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
458+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
459+
with cleanup_feature_group(feature_group):
460+
feature_group.create(
461+
s3_uri=False,
462+
record_identifier_name="feature1",
463+
event_time_feature_name="feature3",
464+
role_arn=role,
465+
enable_online_store=True,
466+
throughput_config=ThroughputConfig(ThroughputModeEnum.ON_DEMAND),
467+
)
468+
_wait_for_feature_group_create(feature_group)
469+
470+
tp_config = feature_group.describe().get("ThroughputConfig")
471+
mode = tp_config.get("ThroughputMode")
472+
assert mode == ThroughputModeEnum.ON_DEMAND.value
473+
474+
feature_group.update(
475+
throughput_config=ThroughputConfigUpdate(ThroughputModeEnum.PROVISIONED, 100, 200)
476+
)
477+
_wait_for_feature_group_update(feature_group)
478+
479+
tp_config = feature_group.describe().get("ThroughputConfig")
480+
mode = tp_config.get("ThroughputMode")
481+
rcu = tp_config.get("ProvisionedReadCapacityUnits")
482+
wcu = tp_config.get("ProvisionedWriteCapacityUnits")
483+
assert mode == ThroughputModeEnum.PROVISIONED.value
484+
assert rcu == 100
485+
assert wcu == 200
486+
487+
413488
def test_ttl_duration(
414489
feature_store_session,
415490
role,

0 commit comments

Comments
 (0)