
fix: multiprocess issue in feature_group.py #2573


Merged 3 commits on Aug 17, 2021

Changes from all commits
42 changes: 32 additions & 10 deletions src/sagemaker/feature_store/feature_group.py
@@ -207,7 +207,8 @@ def _ingest_single_batch(
for row in data_frame[start_index:end_index].itertuples():
record = [
FeatureValue(
feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
feature_name=data_frame.columns[index - 1],
value_as_string=str(row[index]),
)
for index in range(1, len(row))
if pd.notna(row[index])
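A note on the indexing above, not part of the diff: `itertuples()` yields the row label at position 0 and the feature values from position 1 onward, which is why the loop pairs `data_frame.columns[index - 1]` with `row[index]`. A minimal pandas sketch with illustrative data:

```python
import pandas as pd

df = pd.DataFrame({"age": [34], "amount": [12.5]})
for row in df.itertuples():
    # row == Pandas(Index=0, age=34, amount=12.5); row[0] is the index label,
    # so column i lines up with row[i + 1]
    pairs = [
        (df.columns[index - 1], str(row[index]))
        for index in range(1, len(row))
        if pd.notna(row[index])
    ]
    print(pairs)  # [('age', '34'), ('amount', '12.5')]
```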
@@ -270,13 +271,24 @@ def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None):
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
if timeout is reached.
"""
# pylint: disable=I1101
batch_size = math.ceil(data_frame.shape[0] / self.max_processes)
# pylint: enable=I1101

args = []
for i in range(self.max_processes):
start_index = min(i * batch_size, data_frame.shape[0])
end_index = min(i * batch_size + batch_size, data_frame.shape[0])
args += [(data_frame[start_index:end_index], start_index, timeout)]
args += [
(
self.max_workers,
self.feature_group_name,
self.sagemaker_fs_runtime_client_config,
data_frame[start_index:end_index],
start_index,
timeout,
)
]

def init_worker():
# ignore keyboard interrupts in child processes.
@@ -285,13 +297,21 @@ def init_worker():
self._processing_pool = ProcessingPool(self.max_processes, init_worker)
self._processing_pool.restart(force=True)

f = lambda x: self._run_multi_threaded(*x) # noqa: E731
f = lambda x: IngestionManagerPandas._run_multi_threaded(*x) # noqa: E731
self._async_result = self._processing_pool.amap(f, args)

if wait:
self.wait(timeout=timeout)

def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None) -> List[int]:
@staticmethod
Member:

@verayu43 have you tested this change on py38 and py39?

Contributor Author:

I used the fraud detection example script to reproduce the failure in py39, and after this fix the example script runs correctly in py39.

Contributor Author:

Hi Ahsan, the code change was tested successfully by running "tox -e py38 tests/unit" and "tox -e py39 tests/unit". Thanks!
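Background on why the @staticmethod conversion helps (my summary of the mechanism, not stated in the thread): Python 3.8 made spawn the default multiprocessing start method on macOS, and under spawn every argument shipped to a worker process must be picklable. Pickling a bound method pickles the whole `self`, which here carries boto client state that does not pickle; a @staticmethod taking explicit, picklable arguments avoids that. A minimal sketch with illustrative names, not the SDK code:

```python
import multiprocessing as mp

class Ingestor:
    def __init__(self, client):
        self.client = client  # stand-in for an unpicklable boto client

    @staticmethod
    def work(name, rows):
        # No self here: only the explicitly passed, picklable state crosses
        # the process boundary, mirroring the new _run_multi_threaded
        return f"{name}: ingested {len(rows)} rows"

    def run(self, rows):
        # spawn is the py3.8+ default on macOS; it pickles all pool arguments
        with mp.get_context("spawn").Pool(2) as pool:
            args = [("batch-0", rows[:2]), ("batch-1", rows[2:])]
            return pool.starmap(Ingestor.work, args)

if __name__ == "__main__":
    print(Ingestor(client=object()).run([1, 2, 3, 4]))
```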

def _run_multi_threaded(
max_workers: int,
feature_group_name: str,
sagemaker_fs_runtime_client_config: Config,
data_frame: DataFrame,
row_offset=0,
timeout=None,
) -> List[int]:
"""Start the ingestion process.

Args:
@@ -305,21 +325,23 @@ def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None)
Returns:
List of row indices that failed to be ingested.
"""
executor = ThreadPoolExecutor(max_workers=self.max_workers)
batch_size = math.ceil(data_frame.shape[0] / self.max_workers)
executor = ThreadPoolExecutor(max_workers=max_workers)
# pylint: disable=I1101
batch_size = math.ceil(data_frame.shape[0] / max_workers)
# pylint: enable=I1101

futures = {}
for i in range(self.max_workers):
for i in range(max_workers):
start_index = min(i * batch_size, data_frame.shape[0])
end_index = min(i * batch_size + batch_size, data_frame.shape[0])
futures[
executor.submit(
self._ingest_single_batch,
feature_group_name=self.feature_group_name,
IngestionManagerPandas._ingest_single_batch,
feature_group_name=feature_group_name,
data_frame=data_frame,
start_index=start_index,
end_index=end_index,
client_config=self.sagemaker_fs_runtime_client_config,
client_config=sagemaker_fs_runtime_client_config,
)
] = (start_index + row_offset, end_index + row_offset)

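A standalone sketch of the futures bookkeeping shown above, with a hypothetical worker in place of `_ingest_single_batch`: keying each future by its absolute row range lets the caller translate per-batch failure offsets back into frame-level row indices.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def ingest_batch(rows):
    # Hypothetical stand-in for _ingest_single_batch: report the offsets of
    # rows that failed to ingest (here, rows that are None)
    return [i for i, row in enumerate(rows) if row is None]

data = [1, 2, None, 4, None, 6]
futures = {}
with ThreadPoolExecutor(max_workers=2) as executor:
    for start, end in [(0, 3), (3, 6)]:
        futures[executor.submit(ingest_batch, data[start:end])] = (start, end)

failed = []
for future in as_completed(futures):
    start, _ = futures[future]  # recover the batch's absolute offset
    failed += [start + offset for offset in future.result()]
print(sorted(failed))  # [2, 4]
```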