Skip to content

Commit f9f6c0d

Browse files
WeichenXu123 authored and HyukjinKwon committed
[SPARK-36425][PYSPARK][ML] Support CrossValidatorModel get standard deviation of metrics for each paramMap
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

### What changes were proposed in this pull request?
Support getting the standard deviation of metrics for each paramMap from CrossValidatorModel.

### Why are the changes needed?
So that in mlflow autologging we can log the standard deviation of metrics, which is useful.

### Does this PR introduce _any_ user-facing change?
Yes. `CrossValidatorModel` adds a public attribute `stdMetrics`, which holds the standard deviation of metrics for each paramMap.

### How was this patch tested?
Unit test.

Closes #33652 from WeichenXu123/add_std_metric.

Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 4624e59 commit f9f6c0d

File tree

2 files changed

+63
-7
lines changed

2 files changed

+63
-7
lines changed

python/pyspark/ml/tests/test_tuning.py

+29
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,18 @@ def assert_param_maps_equal(self, paramMaps1, paramMaps2):
194194

195195
class CrossValidatorTests(SparkSessionTestCase, ValidatorTestUtilsMixin):
196196

197+
def test_gen_avg_and_std_metrics(self):
198+
metrics_all = [
199+
[1.0, 3.0, 2.0, 4.0],
200+
[3.0, 2.0, 2.0, 4.0],
201+
[3.0, 2.5, 2.1, 8.0],
202+
]
203+
avg_metrics, std_metrics = CrossValidator._gen_avg_and_std_metrics(metrics_all)
204+
assert np.allclose(avg_metrics, [2.33333333, 2.5, 2.03333333, 5.33333333])
205+
assert np.allclose(std_metrics, [0.94280904, 0.40824829, 0.04714045, 1.88561808])
206+
assert isinstance(avg_metrics, list)
207+
assert isinstance(std_metrics, list)
208+
197209
def test_copy(self):
198210
dataset = self.spark.createDataFrame([
199211
(10, 10.0),
@@ -232,6 +244,7 @@ def test_copy(self):
232244
for index in range(len(cvModel.avgMetrics)):
233245
self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index])
234246
< 0.0001)
247+
self.assertTrue(np.allclose(cvModel.stdMetrics, cvModelCopied.stdMetrics))
235248
# SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params
236249
for param in [
237250
lambda x: x.getNumFolds(),
@@ -246,6 +259,12 @@ def test_copy(self):
246259
'foo',
247260
"Changing the original avgMetrics should not affect the copied model"
248261
)
262+
cvModel.stdMetrics[0] = 'foo'
263+
self.assertNotEqual(
264+
cvModelCopied.stdMetrics[0],
265+
'foo',
266+
"Changing the original stdMetrics should not affect the copied model"
267+
)
249268
cvModel.subModels[0][0].getInducedError = lambda: 'foo'
250269
self.assertNotEqual(
251270
cvModelCopied.subModels[0][0].getInducedError(),
@@ -353,6 +372,15 @@ def _run_test_save_load_trained_model(self, LogisticRegressionCls, LogisticRegre
353372
loadedCvModel.isSet(param) for param in loadedCvModel.params
354373
))
355374

375+
# mimic old version CrossValidatorModel (without stdMetrics attribute)
376+
# test loading model backwards compatibility
377+
cvModel2 = cvModel.copy()
378+
cvModel2.stdMetrics = []
379+
cvModelPath2 = temp_path + "/cvModel2"
380+
cvModel2.save(cvModelPath2)
381+
loadedCvModel2 = CrossValidatorModel.load(cvModelPath2)
382+
assert loadedCvModel2.stdMetrics == []
383+
356384
def test_save_load_trained_model(self):
357385
self._run_test_save_load_trained_model(LogisticRegression, LogisticRegressionModel)
358386
self._run_test_save_load_trained_model(DummyLogisticRegression,
@@ -414,6 +442,7 @@ def test_parallel_evaluation(self):
414442
cv.setParallelism(2)
415443
cvParallelModel = cv.fit(dataset)
416444
self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)
445+
self.assertEqual(cvSerialModel.stdMetrics, cvParallelModel.stdMetrics)
417446

418447
def test_expose_sub_models(self):
419448
temp_path = tempfile.mkdtemp()

python/pyspark/ml/tuning.py

+34-7
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,10 @@ def load(self, path):
499499
bestModelPath = os.path.join(path, 'bestModel')
500500
bestModel = DefaultParamsReader.loadParamsInstance(bestModelPath, self.sc)
501501
avgMetrics = metadata['avgMetrics']
502+
if 'stdMetrics' in metadata:
503+
stdMetrics = metadata['stdMetrics']
504+
else:
505+
stdMetrics = None
502506
persistSubModels = ('persistSubModels' in metadata) and metadata['persistSubModels']
503507

504508
if persistSubModels:
@@ -512,7 +516,9 @@ def load(self, path):
512516
else:
513517
subModels = None
514518

515-
cvModel = CrossValidatorModel(bestModel, avgMetrics=avgMetrics, subModels=subModels)
519+
cvModel = CrossValidatorModel(
520+
bestModel, avgMetrics=avgMetrics, subModels=subModels, stdMetrics=stdMetrics
521+
)
516522
cvModel = cvModel._resetUid(metadata['uid'])
517523
cvModel.set(cvModel.estimator, estimator)
518524
cvModel.set(cvModel.estimatorParamMaps, estimatorParamMaps)
@@ -536,6 +542,9 @@ def saveImpl(self, path):
536542
.getValidatorModelWriterPersistSubModelsParam(self)
537543
extraMetadata = {'avgMetrics': instance.avgMetrics,
538544
'persistSubModels': persistSubModels}
545+
if instance.stdMetrics:
546+
extraMetadata['stdMetrics'] = instance.stdMetrics
547+
539548
_ValidatorSharedReadWrite.saveImpl(path, instance, self.sc, extraMetadata=extraMetadata)
540549
bestModelPath = os.path.join(path, 'bestModel')
541550
instance.bestModel.save(bestModelPath)
@@ -710,13 +719,19 @@ def setCollectSubModels(self, value):
710719
"""
711720
return self._set(collectSubModels=value)
712721

722+
@staticmethod
723+
def _gen_avg_and_std_metrics(metrics_all):
    """
    Compute the mean and standard deviation of fold metrics per param map.

    Parameters
    ----------
    metrics_all : list of list of float
        One row per fold and one column per param map, i.e.
        ``metrics_all[i][j]`` is the metric of param map ``j`` on fold ``i``.

    Returns
    -------
    tuple of (list, list)
        ``(avg_metrics, std_metrics)``; each is a plain Python list holding
        one native ``float`` per param map, in column order.
    """
    # Convert once so that the mean and the std share the same array instead
    # of each re-parsing the nested list.
    metrics_arr = np.asarray(metrics_all)
    avg_metrics = metrics_arr.mean(axis=0)
    # NumPy's default ddof=0 gives the population standard deviation over
    # the folds.
    std_metrics = metrics_arr.std(axis=0)
    # tolist() yields native Python floats rather than np.float64 scalars,
    # so the values stored on the model and written to metadata are plain
    # floats.
    return avg_metrics.tolist(), std_metrics.tolist()
727+
713728
def _fit(self, dataset):
714729
est = self.getOrDefault(self.estimator)
715730
epm = self.getOrDefault(self.estimatorParamMaps)
716731
numModels = len(epm)
717732
eva = self.getOrDefault(self.evaluator)
718733
nFolds = self.getOrDefault(self.numFolds)
719-
metrics = [0.0] * numModels
734+
metrics_all = [[0.0] * numModels for i in range(nFolds)]
720735

721736
pool = ThreadPool(processes=min(self.getParallelism(), numModels))
722737
subModels = None
@@ -733,19 +748,21 @@ def _fit(self, dataset):
733748
inheritable_thread_target,
734749
_parallelFitTasks(est, train, eva, validation, epm, collectSubModelsParam))
735750
for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
736-
metrics[j] += (metric / nFolds)
751+
metrics_all[i][j] = metric
737752
if collectSubModelsParam:
738753
subModels[i][j] = subModel
739754

740755
validation.unpersist()
741756
train.unpersist()
742757

758+
metrics, std_metrics = CrossValidator._gen_avg_and_std_metrics(metrics_all)
759+
743760
if eva.isLargerBetter():
744761
bestIndex = np.argmax(metrics)
745762
else:
746763
bestIndex = np.argmin(metrics)
747764
bestModel = est.fit(dataset, epm[bestIndex])
748-
return self._copyValues(CrossValidatorModel(bestModel, metrics, subModels))
765+
return self._copyValues(CrossValidatorModel(bestModel, metrics, subModels, std_metrics))
749766

750767
def _kFold(self, dataset):
751768
nFolds = self.getOrDefault(self.numFolds)
@@ -875,15 +892,20 @@ def _to_java(self):
875892

876893
class CrossValidatorModel(Model, _CrossValidatorParams, MLReadable, MLWritable):
877894
"""
878-
879895
CrossValidatorModel contains the model with the highest average cross-validation
880896
metric across folds and uses this model to transform input data. CrossValidatorModel
881897
also tracks the metrics for each param map evaluated.
882898
883899
.. versionadded:: 1.4.0
900+
901+
Notes
902+
-----
903+
Since version 3.3.0, CrossValidatorModel contains a new attribute "stdMetrics",
904+
which represent standard deviation of metrics for each paramMap in
905+
CrossValidator.estimatorParamMaps.
884906
"""
885907

886-
def __init__(self, bestModel, avgMetrics=None, subModels=None):
908+
def __init__(self, bestModel, avgMetrics=None, subModels=None, stdMetrics=None):
887909
super(CrossValidatorModel, self).__init__()
888910
#: best model from cross validation
889911
self.bestModel = bestModel
@@ -892,6 +914,9 @@ def __init__(self, bestModel, avgMetrics=None, subModels=None):
892914
self.avgMetrics = avgMetrics or []
893915
#: sub model list from cross validation
894916
self.subModels = subModels
917+
#: standard deviation of metrics for each paramMap in
918+
#: CrossValidator.estimatorParamMaps, in the corresponding order.
919+
self.stdMetrics = stdMetrics or []
895920

896921
def _transform(self, dataset):
897922
return self.bestModel.transform(dataset)
@@ -924,7 +949,9 @@ def copy(self, extra=None):
924949
[sub_model.copy() for sub_model in fold_sub_models]
925950
for fold_sub_models in self.subModels
926951
]
927-
return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra)
952+
stdMetrics = list(self.stdMetrics)
953+
return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels, stdMetrics),
954+
extra=extra)
928955

929956
@since("2.3.0")
930957
def write(self):

0 commit comments

Comments
 (0)