Skip to content

Commit 3a1f0b3

Browse files
authored
fix: return all failed row indices in feature_group.ingest (#2193)
1 parent 95e790e commit 3a1f0b3

File tree

3 files changed

+37
-16
lines changed

3 files changed

+37
-16
lines changed

src/sagemaker/feature_store/feature_group.py

+33-14
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ class IngestionManagerPandas:
160160
data_frame: DataFrame = attr.ib()
161161
max_workers: int = attr.ib(default=1)
162162
_futures: Dict[Any, Any] = attr.ib(init=False, factory=dict)
163+
_failed_indices: List[int] = attr.ib(factory=list)
163164

164165
@staticmethod
165166
def _ingest_single_batch(
@@ -168,7 +169,7 @@ def _ingest_single_batch(
168169
sagemaker_session: Session,
169170
start_index: int,
170171
end_index: int,
171-
):
172+
) -> List[int]:
172173
"""Ingest a single batch of DataFrame rows into FeatureStore.
173174
174175
Args:
@@ -177,19 +178,38 @@ def _ingest_single_batch(
177178
sagemaker_session (Session): session instance to perform boto calls.
178179
start_index (int): starting position to ingest in this batch.
179180
end_index (int): ending position to ingest in this batch.
181+
182+
Returns:
183+
List of row indices that failed to be ingested.
180184
"""
181185
logger.info("Started ingesting index %d to %d", start_index, end_index)
182-
for row in data_frame[start_index:end_index].itertuples(index=False):
186+
failed_rows = list()
187+
for row in data_frame[start_index:end_index].itertuples():
183188
record = [
184189
FeatureValue(
185-
feature_name=data_frame.columns[index], value_as_string=str(row[index])
190+
feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
186191
)
187-
for index in range(len(row))
192+
for index in range(1, len(row))
188193
if pd.notna(row[index])
189194
]
190-
sagemaker_session.put_record(
191-
feature_group_name=feature_group_name, record=[value.to_dict() for value in record]
192-
)
195+
try:
196+
sagemaker_session.put_record(
197+
feature_group_name=feature_group_name,
198+
record=[value.to_dict() for value in record],
199+
)
200+
except Exception as e: # pylint: disable=broad-except
201+
logger.error("Failed to ingest row %d: %s", row[0], e)
202+
failed_rows.append(row[0])
203+
return failed_rows
204+
205+
@property
206+
def failed_rows(self) -> List[int]:
207+
"""Get rows that failed to ingest
208+
209+
Returns:
210+
List of row indices that failed to be ingested.
211+
"""
212+
return self._failed_indices
193213

194214
def wait(self, timeout=None):
195215
"""Wait for the ingestion process to finish.
@@ -198,18 +218,17 @@ def wait(self, timeout=None):
198218
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
199219
if timeout is reached.
200220
"""
201-
failed = False
221+
self._failed_indices = list()
202222
for future in as_completed(self._futures, timeout=timeout):
203223
start, end = self._futures[future]
204-
try:
205-
future.result()
206-
except Exception as e: # pylint: disable=broad-except
207-
failed = True
208-
logger.error("Failed to ingest row %d to %d: %s", start, end, e)
224+
result = future.result()
225+
if result:
226+
logger.error("Failed to ingest row %d to %d", start, end)
209227
else:
210228
logger.info("Successfully ingested row %d to %d", start, end)
229+
self._failed_indices += result
211230

212-
if failed:
231+
if len(self._failed_indices) > 0:
213232
raise RuntimeError(
214233
f"Failed to ingest some data into FeatureGroup {self.feature_group_name}"
215234
)

tests/integ/test_feature_store.py

+1
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ def test_create_feature_store(
197197
data_frame=pandas_data_frame, max_workers=3, wait=False
198198
)
199199
ingestion_manager.wait()
200+
assert 0 == len(ingestion_manager.failed_rows)
200201

201202
# Query the integrated Glue table.
202203
athena_query = feature_group.athena_query()

tests/unit/sagemaker/feature_store/test_feature_store.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -274,19 +274,20 @@ def test_ingestion_manager_run_success():
274274

275275
@patch(
276276
"sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch",
277-
MagicMock(side_effect=Exception("Failed!")),
277+
MagicMock(return_value=[1]),
278278
)
279279
def test_ingestion_manager_run_failure():
280280
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
281281
manager = IngestionManagerPandas(
282282
feature_group_name="MyGroup",
283283
sagemaker_session=sagemaker_session_mock,
284284
data_frame=df,
285-
max_workers=10,
285+
max_workers=1,
286286
)
287287
with pytest.raises(RuntimeError) as error:
288288
manager.run()
289289
assert "Failed to ingest some data into FeatureGroup MyGroup" in str(error)
290+
assert manager.failed_rows == [1]
290291

291292

292293
@pytest.fixture

0 commit comments

Comments
 (0)