Skip to content

Commit ccc017a

Browse files
author
Rui Wang Napieralski
committed
fix: return all failed row indices in feature_group.ingest
1 parent 3dd4373 commit ccc017a

File tree

3 files changed

+39
-26
lines changed

3 files changed

+39
-26
lines changed

src/sagemaker/feature_store/feature_group.py

+34-20
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def _ingest_single_batch(
168168
sagemaker_session: Session,
169169
start_index: int,
170170
end_index: int,
171-
):
171+
) -> List[int]:
172172
"""Ingest a single batch of DataFrame rows into FeatureStore.
173173
174174
Args:
@@ -177,50 +177,62 @@ def _ingest_single_batch(
177177
sagemaker_session (Session): session instance to perform boto calls.
178178
start_index (int): starting position to ingest in this batch.
179179
end_index (int): ending position to ingest in this batch.
180+
181+
Returns:
182+
List of row indices that failed to be ingested.
180183
"""
181184
logger.info("Started ingesting index %d to %d", start_index, end_index)
182-
for row in data_frame[start_index:end_index].itertuples(index=False):
185+
failed_rows = list()
186+
for row in data_frame[start_index:end_index].itertuples():
183187
record = [
184188
FeatureValue(
185-
feature_name=data_frame.columns[index], value_as_string=str(row[index])
189+
feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
186190
)
187-
for index in range(len(row))
191+
for index in range(1, len(row))
188192
if pd.notna(row[index])
189193
]
190-
sagemaker_session.put_record(
191-
feature_group_name=feature_group_name, record=[value.to_dict() for value in record]
192-
)
194+
try:
195+
sagemaker_session.put_record(
196+
feature_group_name=feature_group_name,
197+
record=[value.to_dict() for value in record],
198+
)
199+
except Exception as e: # pylint: disable=broad-except
200+
logger.error("Failed to ingest row %d: %s", row[0], e)
201+
failed_rows.append(row[0])
202+
return failed_rows
193203

194-
def wait(self, timeout=None):
204+
def wait(self, timeout=None) -> List[int]:
195205
"""Wait for the ingestion process to finish.
196206
197207
Args:
198208
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
199209
if timeout is reached.
210+
211+
Returns:
212+
List of row indices that failed to be ingested.
200213
"""
201-
failed = False
214+
failed = []
202215
for future in as_completed(self._futures, timeout=timeout):
203216
start, end = self._futures[future]
204-
try:
205-
future.result()
206-
except Exception as e: # pylint: disable=broad-except
207-
failed = True
208-
logger.error("Failed to ingest row %d to %d: %s", start, end, e)
217+
result = future.result()
218+
if result:
219+
logger.error("Failed to ingest row %d to %d", start, end)
209220
else:
210221
logger.info("Successfully ingested row %d to %d", start, end)
222+
failed += result
211223

212-
if failed:
213-
raise RuntimeError(
214-
f"Failed to ingest some data into FeatureGroup {self.feature_group_name}"
215-
)
224+
return failed
216225

217-
def run(self, wait=True, timeout=None):
226+
def run(self, wait=True, timeout=None) -> List[int]:
218227
"""Start the ingestion process.
219228
220229
Args:
221230
wait (bool): whether to wait for the ingestion to finish or not.
222231
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
223232
if timeout is reached.
233+
234+
Returns:
235+
List of row indices that failed to be ingested.
224236
"""
225237
executor = ThreadPoolExecutor(max_workers=self.max_workers)
226238
batch_size = math.ceil(self.data_frame.shape[0] / self.max_workers)
@@ -241,9 +253,11 @@ def run(self, wait=True, timeout=None):
241253
] = (start_index, end_index)
242254

243255
self._futures = futures
256+
failed = []
244257
if wait:
245-
self.wait(timeout=timeout)
258+
failed = self.wait(timeout=timeout)
246259
executor.shutdown(wait=False)
260+
return failed
247261

248262

249263
@attr.s

tests/integ/test_feature_store.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ def test_create_feature_store(
196196
ingestion_manager = feature_group.ingest(
197197
data_frame=pandas_data_frame, max_workers=3, wait=False
198198
)
199-
ingestion_manager.wait()
199+
failed = ingestion_manager.wait()
200+
assert 0 == len(failed)
200201

201202
# Query the integrated Glue table.
202203
athena_query = feature_group.athena_query()

tests/unit/sagemaker/feature_store/test_feature_store.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -274,19 +274,17 @@ def test_ingestion_manager_run_success():
274274

275275
@patch(
276276
"sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch",
277-
MagicMock(side_effect=Exception("Failed!")),
277+
MagicMock(return_value=[1]),
278278
)
279279
def test_ingestion_manager_run_failure():
280280
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
281281
manager = IngestionManagerPandas(
282282
feature_group_name="MyGroup",
283283
sagemaker_session=sagemaker_session_mock,
284284
data_frame=df,
285-
max_workers=10,
285+
max_workers=1,
286286
)
287-
with pytest.raises(RuntimeError) as error:
288-
manager.run()
289-
assert "Failed to ingest some data into FeatureGroup MyGroup" in str(error)
287+
assert manager.run() == [1]
290288

291289

292290
@pytest.fixture

0 commit comments

Comments
 (0)