Skip to content

fix: use FeatureGroup's Session in nonconcurrency ingestion #3617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 79 additions & 19 deletions src/sagemaker/feature_store/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class IngestionManagerPandas:
feature_group_name (str): name of the Feature Group.
sagemaker_fs_runtime_client_config (Config): instance of the Config class
for boto calls.
sagemaker_session (Session): session instance to perform boto calls.
data_frame (DataFrame): pandas DataFrame to be ingested to the given feature group.
max_workers (int): number of threads to create.
max_processes (int): number of processes to create. Each process spawns
Expand All @@ -174,7 +175,8 @@ class IngestionManagerPandas:
"""

feature_group_name: str = attr.ib()
sagemaker_fs_runtime_client_config: Config = attr.ib()
sagemaker_fs_runtime_client_config: Config = attr.ib(default=None)
sagemaker_session: Session = attr.ib(default=None)
max_workers: int = attr.ib(default=1)
max_processes: int = attr.ib(default=1)
profile_name: str = attr.ib(default=None)
Expand Down Expand Up @@ -210,29 +212,20 @@ def _ingest_single_batch(
if "max_attempts" not in retry_config and "total_max_attempts" not in retry_config:
client_config = copy.deepcopy(client_config)
client_config.retries = {"max_attempts": 10, "mode": "standard"}
sagemaker_featurestore_runtime_client = boto3.Session(profile_name=profile_name).client(
sagemaker_fs_runtime_client = boto3.Session(profile_name=profile_name).client(
service_name="sagemaker-featurestore-runtime", config=client_config
)

logger.info("Started ingesting index %d to %d", start_index, end_index)
failed_rows = list()
for row in data_frame[start_index:end_index].itertuples():
record = [
FeatureValue(
feature_name=data_frame.columns[index - 1],
value_as_string=str(row[index]),
)
for index in range(1, len(row))
if pd.notna(row[index])
]
try:
sagemaker_featurestore_runtime_client.put_record(
FeatureGroupName=feature_group_name,
Record=[value.to_dict() for value in record],
)
except Exception as e: # pylint: disable=broad-except
logger.error("Failed to ingest row %d: %s", row[0], e)
failed_rows.append(row[0])
IngestionManagerPandas._ingest_row(
data_frame=data_frame,
row=row,
feature_group_name=feature_group_name,
sagemaker_fs_runtime_client=sagemaker_fs_runtime_client,
failed_rows=failed_rows,
)
return failed_rows

@property
Expand Down Expand Up @@ -274,6 +267,69 @@ def wait(self, timeout=None):
f"Failed to ingest some data into FeatureGroup {self.feature_group_name}",
)

@staticmethod
def _ingest_row(
data_frame: DataFrame,
row: int,
feature_group_name: str,
sagemaker_fs_runtime_client: Session,
failed_rows: List[int],
):
"""Ingest a single Dataframe row into FeatureStore.

Args:
data_frame (DataFrame): source DataFrame to be ingested.
row (int): current row that is being ingested
feature_group_name (str): name of the Feature Group.
sagemaker_featurestore_runtime_client (Session): session instance to perform boto calls.
failed_rows (List[int]): list of indices from the data frame for which ingestion failed.


Returns:
int of row indices that failed to be ingested.
"""
record = [
FeatureValue(
feature_name=data_frame.columns[index - 1],
value_as_string=str(row[index]),
)
for index in range(1, len(row))
if pd.notna(row[index])
]
try:
sagemaker_fs_runtime_client.put_record(
FeatureGroupName=feature_group_name,
Record=[value.to_dict() for value in record],
)
except Exception as e: # pylint: disable=broad-except
logger.error("Failed to ingest row %d: %s", row[0], e)
failed_rows.append(row[0])

def _run_single_process_single_thread(self, data_frame: DataFrame):
"""Ingest a utilizing single process and single thread.

Args:
data_frame (DataFrame): source DataFrame to be ingested.
"""
logger.info("Started ingesting index %d to %d")
failed_rows = list()
sagemaker_fs_runtime_client = self.sagemaker_session.sagemaker_featurestore_runtime_client
for row in data_frame.itertuples():
IngestionManagerPandas._ingest_row(
data_frame=data_frame,
row=row,
feature_group_name=self.feature_group_name,
sagemaker_fs_runtime_client=sagemaker_fs_runtime_client,
failed_rows=failed_rows,
)
self._failed_indices = failed_rows

if len(self._failed_indices) > 0:
raise IngestionError(
self._failed_indices,
f"Failed to ingest some data into FeatureGroup {self.feature_group_name}",
)

def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None):
"""Start the ingestion process with the specified number of processes.

Expand Down Expand Up @@ -385,7 +441,10 @@ def run(self, data_frame: DataFrame, wait=True, timeout=None):
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
if timeout is reached.
"""
self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout)
if self.max_workers == 1 and self.max_processes == 1 and self.profile_name is None:
self._run_single_process_single_thread(data_frame=data_frame)
else:
self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout)


class IngestionError(Exception):
Expand Down Expand Up @@ -744,6 +803,7 @@ def ingest(

manager = IngestionManagerPandas(
feature_group_name=self.name,
sagemaker_session=self.sagemaker_session,
sagemaker_fs_runtime_client_config=self.sagemaker_session.sagemaker_featurestore_runtime_client.meta.config,
max_workers=max_workers,
max_processes=max_processes,
Expand Down
39 changes: 36 additions & 3 deletions tests/unit/sagemaker/feature_store/test_feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_clien

ingestion_manager_init.assert_called_once_with(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=10,
max_processes=1,
Expand All @@ -317,6 +318,32 @@ def test_ingest(ingestion_manager_init, sagemaker_session_mock, fs_runtime_clien
)


@patch("sagemaker.feature_store.feature_group.IngestionManagerPandas")
def test_ingest_default(ingestion_manager_init, sagemaker_session_mock):
sagemaker_session_mock.sagemaker_featurestore_runtime_client.meta.config = (
fs_runtime_client_config_mock
)

feature_group = FeatureGroup(name="MyGroup", sagemaker_session=sagemaker_session_mock)
df = pd.DataFrame(dict((f"float{i}", pd.Series([2.0], dtype="float64")) for i in range(300)))

mock_ingestion_manager_instance = Mock()
ingestion_manager_init.return_value = mock_ingestion_manager_instance
feature_group.ingest(data_frame=df)

ingestion_manager_init.assert_called_once_with(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=1,
max_processes=1,
profile_name=None,
)
mock_ingestion_manager_instance.run.assert_called_once_with(
data_frame=df, wait=True, timeout=None
)


@patch("sagemaker.feature_store.feature_group.IngestionManagerPandas")
def test_ingest_with_profile_name(
ingestion_manager_init, sagemaker_session_mock, fs_runtime_client_config_mock
Expand All @@ -334,6 +361,7 @@ def test_ingest_with_profile_name(

ingestion_manager_init.assert_called_once_with(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=10,
max_processes=1,
Expand Down Expand Up @@ -403,6 +431,7 @@ def test_ingestion_manager_run_success():
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
manager = IngestionManagerPandas(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=10,
)
Expand All @@ -421,6 +450,7 @@ def test_ingestion_manager_run_multi_process_with_multi_thread_success(
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
manager = IngestionManagerPandas(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=2,
max_processes=2,
Expand All @@ -436,16 +466,17 @@ def test_ingestion_manager_run_failure():
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
manager = IngestionManagerPandas(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=1,
max_workers=2,
)

with pytest.raises(IngestionError) as error:
manager.run(df)

assert "Failed to ingest some data into FeatureGroup MyGroup" in str(error)
assert error.value.failed_rows == [1]
assert manager.failed_rows == [1]
assert error.value.failed_rows == [1, 1]
assert manager.failed_rows == [1, 1]


@patch(
Expand All @@ -456,6 +487,7 @@ def test_ingestion_manager_with_profile_name_run_failure():
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
manager = IngestionManagerPandas(
feature_group_name="MyGroup",
sagemaker_session=sagemaker_session_mock,
sagemaker_fs_runtime_client_config=fs_runtime_client_config_mock,
max_workers=1,
profile_name="non_exist",
Expand All @@ -475,6 +507,7 @@ def test_ingestion_manager_run_multi_process_failure():
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
manager = IngestionManagerPandas(
feature_group_name="MyGroup",
sagemaker_session=None,
sagemaker_fs_runtime_client_config=None,
max_workers=2,
max_processes=2,
Expand Down