@@ -240,10 +240,8 @@ def wait(self, timeout=None):
240
240
"""
241
241
results = None
242
242
try :
243
- self ._async_result .wait (timeout = timeout )
244
- if self ._async_result .ready () and self ._async_result ._success :
245
- results = self ._async_result ._value
246
- except KeyboardInterrupt as i :
243
+ results = self ._async_result .get (timeout = timeout )
244
+ except KeyboardInterrupt or NotImplementedError as i :
247
245
# terminate workers abruptly on keyboard interrupt.
248
246
self ._processing_pool .terminate ()
249
247
self ._processing_pool .close ()
@@ -254,20 +252,20 @@ def wait(self, timeout=None):
254
252
self ._processing_pool .close ()
255
253
self ._processing_pool .clear ()
256
254
257
- if not results or results == NotImplementedError :
255
+ if results == None :
258
256
return
259
-
260
- self ._failed_indices = [
261
- failed_index for failed_indices in results for failed_index in failed_indices
262
- ]
257
+ else :
258
+ self ._failed_indices = [
259
+ failed_index for failed_indices in results for failed_index in failed_indices
260
+ ]
263
261
264
262
if len (self ._failed_indices ) > 0 :
265
263
raise IngestionError (
266
264
self ._failed_indices ,
267
265
f"Failed to ingest some data into FeatureGroup { self .feature_group_name } " ,
268
266
)
269
267
270
- def _run_multi_process (self , data_frame : DataFrame , wait = True , timeout = None ):
268
+ def _run_multi_process (self , data_frame : DataFrame , wait = True , timeout = None ): # Not in use
271
269
"""Start the ingestion process with the specified number of processes.
272
270
273
271
Args:
@@ -313,7 +311,6 @@ def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None)
313
311
"""
314
312
executor = ThreadPoolExecutor (max_workers = self .max_workers )
315
313
batch_size = math .ceil (data_frame .shape [0 ] / self .max_workers )
316
-
317
314
futures = {}
318
315
for i in range (self .max_workers ):
319
316
start_index = min (i * batch_size , data_frame .shape [0 ])
@@ -338,7 +335,6 @@ def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None)
338
335
else :
339
336
logger .info ("Successfully ingested row %d to %d" , start , end )
340
337
failed_indices += result
341
-
342
338
executor .shutdown (wait = False )
343
339
344
340
return failed_indices
@@ -352,7 +348,8 @@ def run(self, data_frame: DataFrame, wait=True, timeout=None):
352
348
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
353
349
if timeout is reached.
354
350
"""
355
- self ._run_multi_process (data_frame = data_frame , wait = wait , timeout = timeout )
351
+ #self._run_multi_process(data_frame=data_frame, wait=wait, timeout=timeout)
352
+ self ._run_multi_threaded (data_frame = data_frame , timeout = timeout )
356
353
357
354
358
355
class IngestionError (Exception ):
0 commit comments