Skip to content

Commit 581316f

Browse files
authored
Merge branch 'master' into pt17_18
2 parents b79023d + e4b851d commit 581316f

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

src/sagemaker/feature_store/feature_group.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,15 @@ class IngestionManagerPandas:
163163
max_workers (int): number of threads to create.
164164
max_processes (int): number of processes to create. Each process spawns
165165
``max_workers`` threads.
166+
profile_name (str): the profile credential should be used for ``PutRecord``
167+
(default: None).
166168
"""
167169

168170
feature_group_name: str = attr.ib()
169171
sagemaker_fs_runtime_client_config: Config = attr.ib()
170172
max_workers: int = attr.ib(default=1)
171173
max_processes: int = attr.ib(default=1)
174+
profile_name: str = attr.ib(default=None)
172175
_async_result: AsyncResult = attr.ib(default=None)
173176
_processing_pool: ProcessingPool = attr.ib(default=None)
174177
_failed_indices: List[int] = attr.ib(factory=list)
@@ -180,6 +183,7 @@ def _ingest_single_batch(
180183
client_config: Config,
181184
start_index: int,
182185
end_index: int,
186+
profile_name: str = None,
183187
) -> List[int]:
184188
"""Ingest a single batch of DataFrame rows into FeatureStore.
185189
@@ -190,6 +194,8 @@ def _ingest_single_batch(
190194
client to perform boto calls.
191195
start_index (int): starting position to ingest in this batch.
192196
end_index (int): ending position to ingest in this batch.
197+
profile_name (str): the profile credential should be used for ``PutRecord``
198+
(default: None).
193199
194200
Returns:
195201
List of row indices that failed to be ingested.
@@ -198,7 +204,7 @@ def _ingest_single_batch(
198204
if "max_attempts" not in retry_config and "total_max_attempts" not in retry_config:
199205
client_config = copy.deepcopy(client_config)
200206
client_config.retries = {"max_attempts": 10, "mode": "standard"}
201-
sagemaker_featurestore_runtime_client = boto3.Session().client(
207+
sagemaker_featurestore_runtime_client = boto3.Session(profile_name=profile_name).client(
202208
service_name="sagemaker-featurestore-runtime", config=client_config
203209
)
204210

@@ -287,6 +293,7 @@ def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None):
287293
data_frame[start_index:end_index],
288294
start_index,
289295
timeout,
296+
self.profile_name,
290297
)
291298
]
292299

@@ -311,6 +318,7 @@ def _run_multi_threaded(
311318
data_frame: DataFrame,
312319
row_offset=0,
313320
timeout=None,
321+
profile_name=None,
314322
) -> List[int]:
315323
"""Start the ingestion process.
316324
@@ -321,6 +329,8 @@ def _run_multi_threaded(
321329
wait (bool): whether to wait for the ingestion to finish or not.
322330
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
323331
if timeout is reached.
332+
profile_name (str): the profile credential should be used for ``PutRecord``
333+
(default: None).
324334
325335
Returns:
326336
List of row indices that failed to be ingested.
@@ -342,6 +352,7 @@ def _run_multi_threaded(
342352
start_index=start_index,
343353
end_index=end_index,
344354
client_config=sagemaker_fs_runtime_client_config,
355+
profile_name=profile_name,
345356
)
346357
] = (start_index + row_offset, end_index + row_offset)
347358

@@ -581,6 +592,7 @@ def ingest(
581592
max_processes: int = 1,
582593
wait: bool = True,
583594
timeout: Union[int, float] = None,
595+
profile_name: str = None,
584596
) -> IngestionManagerPandas:
585597
"""Ingest the content of a pandas DataFrame to feature store.
586598
@@ -599,6 +611,11 @@ def ingest(
599611
They can also be found from the IngestionManagerPandas' ``failed_rows`` function after
600612
the exception is thrown.
601613
614+
`profile_name` argument is an optional one. It will use the default credential if None is
615+
passed. This `profile_name` is used in the sagemaker_featurestore_runtime client only. See
616+
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more
617+
about the default credential.
618+
602619
Args:
603620
data_frame (DataFrame): data_frame to be ingested to feature store.
604621
max_workers (int): number of threads to be created.
@@ -607,6 +624,8 @@ def ingest(
607624
wait (bool): whether to wait for the ingestion to finish or not.
608625
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
609626
if timeout is reached.
627+
profile_name (str): the profile credential should be used for ``PutRecord``
628+
(default: None).
610629
611630
Returns:
612631
An instance of IngestionManagerPandas.
@@ -622,6 +641,7 @@ def ingest(
622641
sagemaker_fs_runtime_client_config=self.sagemaker_session.sagemaker_featurestore_runtime_client.meta.config,
623642
max_workers=max_workers,
624643
max_processes=max_processes,
644+
profile_name=profile_name,
625645
)
626646

627647
manager.run(data_frame=data_frame, wait=wait, timeout=timeout)

tests/integ/sagemaker/lineage/conftest.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
from tests.integ.sagemaker.lineage.helpers import name, names
3737

3838
SLEEP_TIME_SECONDS = 1
39-
STATIC_PIPELINE_NAME = "SdkIntegTestStaticPipeline13"
40-
STATIC_ENDPOINT_NAME = "SdkIntegTestStaticEndpoint13"
39+
STATIC_PIPELINE_NAME = "SdkIntegTestStaticPipeline14"
40+
STATIC_ENDPOINT_NAME = "SdkIntegTestStaticEndpoint14"
4141

4242

4343
@pytest.fixture

tests/unit/sagemaker/feature_store/test_feature_store.py

+48
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import pandas as pd
1818
import pytest
1919
from mock import Mock, patch, MagicMock
20+
from botocore.exceptions import ProfileNotFound
2021

2122
from sagemaker.feature_store.feature_definition import (
2223
FractionalFeatureDefinition,
@@ -227,6 +228,34 @@ def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_clien
227228
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
228229
max_workers=10,
229230
max_processes=1,
231+
profile_name=None,
232+
)
233+
mock_ingestion_manager_instance.run.assert_called_once_with(
234+
data_frame=df, wait=True, timeout=None
235+
)
236+
237+
238+
@patch("sagemaker.feature_store.feature_group.IngestionManagerPandas")
239+
def test_ingest_with_profile_name(
240+
ingestion_manager_init, sagemaker_session_mock, fs_runtime_client_config_mock
241+
):
242+
sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = (
243+
fs_runtime_client_config_mock
244+
)
245+
246+
feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock)
247+
df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300)))
248+
249+
mock_ingestion_manager_instance = Mock()
250+
ingestion_manager_init.return_value = mock_ingestion_manager_instance
251+
feature_group.ingest(data_frame=df, max_workers=10, profile_name="profile_name")
252+
253+
ingestion_manager_init.assert_called_once_with(
254+
feature_group_name="MyGroup",
255+
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
256+
max_workers=10,
257+
max_processes=1,
258+
profile_name="profile_name",
230259
)
231260
mock_ingestion_manager_instance.run.assert_called_once_with(
232261
data_frame=df, wait=True, timeout=None
@@ -340,6 +369,25 @@ def test_ingestion_manager_run_failure():
340369
assert manager.failed_rows == [1]
341370

342371

372+
@patch(
373+
"sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch",
374+
MagicMock(side_effect=ProfileNotFound(profile="non_exist")),
375+
)
376+
def test_ingestion_manager_with_profile_name_run_failure():
377+
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
378+
manager = IngestionManagerPandas(
379+
feature_group_name="MyGroup",
380+
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
381+
max_workers=1,
382+
profile_name="non_exist",
383+
)
384+
385+
try:
386+
manager.run(df)
387+
except Exception as e:
388+
assert "The config profile (non_exist) could not be found" in str(e)
389+
390+
343391
@patch(
344392
"sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch",
345393
PicklableMock(return_value=[1]),

0 commit comments

Comments
 (0)