diff --git a/pandas/tests/window/moments/test_moments_consistency_expanding.py b/pandas/tests/window/moments/test_moments_consistency_expanding.py index 25a897545ce58..17f76bf824a5d 100644 --- a/pandas/tests/window/moments/test_moments_consistency_expanding.py +++ b/pandas/tests/window/moments/test_moments_consistency_expanding.py @@ -1,19 +1,8 @@ -import warnings - import numpy as np import pytest from pandas import DataFrame, Index, MultiIndex, Series, isna, notna import pandas._testing as tm -from pandas.tests.window.common import ( - moments_consistency_cov_data, - moments_consistency_is_constant, - moments_consistency_mock_mean, - moments_consistency_series_data, - moments_consistency_std_data, - moments_consistency_var_data, - moments_consistency_var_debiasing_factors, -) def test_expanding_corr(series): @@ -171,143 +160,173 @@ def test_expanding_min_periods_apply(engine_and_raw): @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def test_expanding_apply_consistency( - consistency_data, base_functions, no_nan_functions, min_periods -): +@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum]) +def test_expanding_apply_consistency_sum_nans(consistency_data, min_periods, f): x, is_constant, no_nans = consistency_data - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning + if f is np.nansum and min_periods == 0: + pass + else: + expanding_f_result = x.expanding(min_periods=min_periods).sum() + expanding_apply_f_result = x.expanding(min_periods=min_periods).apply( + func=f, raw=True ) - # test consistency between expanding_xyz() and either (a) - # expanding_apply of Series.xyz(), or (b) expanding_apply of - # np.nanxyz() - functions = base_functions - - # GH 8269 - if no_nans: - functions = base_functions + no_nan_functions - for (f, require_min_periods, name) in functions: - expanding_f = getattr(x.expanding(min_periods=min_periods), name) - - if ( - 
require_min_periods - and (min_periods is not None) - and (min_periods < require_min_periods) - ): - continue - - if name == "count": - expanding_f_result = expanding_f() - expanding_apply_f_result = x.expanding(min_periods=0).apply( - func=f, raw=True - ) - else: - if name in ["cov", "corr"]: - expanding_f_result = expanding_f(pairwise=False) - else: - expanding_f_result = expanding_f() - expanding_apply_f_result = x.expanding(min_periods=min_periods).apply( - func=f, raw=True - ) - - # GH 9422 - if name in ["sum", "prod"]: - tm.assert_equal(expanding_f_result, expanding_apply_f_result) + tm.assert_equal(expanding_f_result, expanding_apply_f_result) @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def test_moments_consistency_var(consistency_data, min_periods): +@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum]) +def test_expanding_apply_consistency_sum_no_nans(consistency_data, min_periods, f): + x, is_constant, no_nans = consistency_data - moments_consistency_var_data( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: x.expanding(min_periods=min_periods).count(), - mean=lambda x: x.expanding(min_periods=min_periods).mean(), - var_unbiased=lambda x: x.expanding(min_periods=min_periods).var(), - var_biased=lambda x: x.expanding(min_periods=min_periods).var(ddof=0), - ) + + if no_nans: + if f is np.nansum and min_periods == 0: + pass + else: + expanding_f_result = x.expanding(min_periods=min_periods).sum() + expanding_apply_f_result = x.expanding(min_periods=min_periods).apply( + func=f, raw=True + ) + tm.assert_equal(expanding_f_result, expanding_apply_f_result) @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def test_expanding_consistency_std(consistency_data, min_periods): +@pytest.mark.parametrize("ddof", [0, 1]) +def test_moments_consistency_var(consistency_data, min_periods, ddof): x, is_constant, no_nans = consistency_data - moments_consistency_std_data( - x=x, - var_unbiased=lambda 
x: x.expanding(min_periods=min_periods).var(), - std_unbiased=lambda x: x.expanding(min_periods=min_periods).std(), - var_biased=lambda x: x.expanding(min_periods=min_periods).var(ddof=0), - std_biased=lambda x: x.expanding(min_periods=min_periods).std(ddof=0), - ) + + mean_x = x.expanding(min_periods=min_periods).mean() + var_x = x.expanding(min_periods=min_periods).var(ddof=ddof) + assert not (var_x < 0).any().any() + + if ddof == 0: + # check that biased var(x) == mean(x^2) - mean(x)^2 + mean_x2 = (x * x).expanding(min_periods=min_periods).mean() + tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x)) @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def test_expanding_consistency_cov(consistency_data, min_periods): +@pytest.mark.parametrize("ddof", [0, 1]) +def test_moments_consistency_var_constant(consistency_data, min_periods, ddof): x, is_constant, no_nans = consistency_data - moments_consistency_cov_data( - x=x, - var_unbiased=lambda x: x.expanding(min_periods=min_periods).var(), - cov_unbiased=lambda x, y: x.expanding(min_periods=min_periods).cov(y), - var_biased=lambda x: x.expanding(min_periods=min_periods).var(ddof=0), - cov_biased=lambda x, y: x.expanding(min_periods=min_periods).cov(y, ddof=0), - ) + + if is_constant: + count_x = x.expanding(min_periods=min_periods).count() + var_x = x.expanding(min_periods=min_periods).var(ddof=ddof) + + # check that variance of constant series is identically 0 + assert not (var_x > 0).any().any() + expected = x * np.nan + expected[count_x >= max(min_periods, 1)] = 0.0 + if ddof == 1: + expected[count_x < 2] = np.nan + tm.assert_equal(var_x, expected) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("ddof", [0, 1]) +def test_expanding_consistency_std(consistency_data, min_periods, ddof): + x, is_constant, no_nans = consistency_data + + var_x = x.expanding(min_periods=min_periods).var(ddof=ddof) + std_x = x.expanding(min_periods=min_periods).std(ddof=ddof) + assert not 
(var_x < 0).any().any() + assert not (std_x < 0).any().any() + + # check that var(x) == std(x)^2 + tm.assert_equal(var_x, std_x * std_x) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("ddof", [0, 1]) +def test_expanding_consistency_cov(consistency_data, min_periods, ddof): + x, is_constant, no_nans = consistency_data + var_x = x.expanding(min_periods=min_periods).var(ddof=ddof) + assert not (var_x < 0).any().any() + + cov_x_x = x.expanding(min_periods=min_periods).cov(x, ddof=ddof) + assert not (cov_x_x < 0).any().any() + + # check that var(x) == cov(x, x) + tm.assert_equal(var_x, cov_x_x) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("ddof", [0, 1]) +def test_expanding_consistency_series_cov_corr(consistency_data, min_periods, ddof): + x, is_constant, no_nans = consistency_data + + if isinstance(x, Series): + var_x_plus_y = (x + x).expanding(min_periods=min_periods).var(ddof=ddof) + var_x = x.expanding(min_periods=min_periods).var(ddof=ddof) + var_y = x.expanding(min_periods=min_periods).var(ddof=ddof) + cov_x_y = x.expanding(min_periods=min_periods).cov(x, ddof=ddof) + # check that cov(x, y) == (var(x+y) - var(x) - var(y)) / 2, + # where y is x itself in this test + tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y)) + + # check that corr(x, y) == cov(x, y) / (std(x) * std(y)), + # again with y taken to be x + corr_x_y = x.expanding(min_periods=min_periods).corr(x) + std_x = x.expanding(min_periods=min_periods).std(ddof=ddof) + std_y = x.expanding(min_periods=min_periods).std(ddof=ddof) + tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y)) + + if ddof == 0: + # check that biased cov(x, y) == mean(x*y) - mean(x)*mean(y), + # again with y taken to be x + mean_x = x.expanding(min_periods=min_periods).mean() + mean_y = x.expanding(min_periods=min_periods).mean() + mean_x_times_y = (x * x).expanding(min_periods=min_periods).mean() + tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y)) @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def 
test_expanding_consistency_series(consistency_data, min_periods): +def test_expanding_consistency_mean(consistency_data, min_periods): x, is_constant, no_nans = consistency_data - moments_consistency_series_data( - x=x, - mean=lambda x: x.expanding(min_periods=min_periods).mean(), - corr=lambda x, y: x.expanding(min_periods=min_periods).corr(y), - var_unbiased=lambda x: x.expanding(min_periods=min_periods).var(), - std_unbiased=lambda x: x.expanding(min_periods=min_periods).std(), - cov_unbiased=lambda x, y: x.expanding(min_periods=min_periods).cov(y), - var_biased=lambda x: x.expanding(min_periods=min_periods).var(ddof=0), - std_biased=lambda x: x.expanding(min_periods=min_periods).std(ddof=0), - cov_biased=lambda x, y: x.expanding(min_periods=min_periods).cov(y, ddof=0), + + result = x.expanding(min_periods=min_periods).mean() + expected = ( + x.expanding(min_periods=min_periods).sum() + / x.expanding(min_periods=min_periods).count() ) + tm.assert_equal(result, expected.astype("float64")) -@pytest.mark.slow @pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -def test_expanding_consistency(consistency_data, min_periods): +def test_expanding_consistency_constant(consistency_data, min_periods): x, is_constant, no_nans = consistency_data - # suppress warnings about empty slices, as we are deliberately testing - # with empty/0-length Series/DataFrames - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning - ) - # test consistency between different expanding_* moments - moments_consistency_mock_mean( - x=x, - mean=lambda x: x.expanding(min_periods=min_periods).mean(), - mock_mean=lambda x: x.expanding(min_periods=min_periods).sum() - / x.expanding().count(), - ) + if is_constant: + count_x = x.expanding().count() + mean_x = x.expanding(min_periods=min_periods).mean() + # correlation of the constant series with itself (asserted all-NaN below) + corr_x_x = 
x.expanding(min_periods=min_periods).corr(x) - moments_consistency_is_constant( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: x.expanding().count(), - mean=lambda x: x.expanding(min_periods=min_periods).mean(), - corr=lambda x, y: x.expanding(min_periods=min_periods).corr(y), - ) + exp = x.max() if isinstance(x, Series) else x.max().max() - moments_consistency_var_debiasing_factors( - x=x, - var_unbiased=lambda x: x.expanding(min_periods=min_periods).var(), - var_biased=lambda x: x.expanding(min_periods=min_periods).var(ddof=0), - var_debiasing_factors=lambda x: ( - x.expanding().count() - / (x.expanding().count() - 1.0).replace(0.0, np.nan) - ), - ) + # check mean of constant series + expected = x * np.nan + expected[count_x >= max(min_periods, 1)] = exp + tm.assert_equal(mean_x, expected) + + # check correlation of constant series with itself is NaN + expected[:] = np.nan + tm.assert_equal(corr_x_x, expected) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +def test_expanding_consistency_var_debiasing_factors(consistency_data, min_periods): + x, is_constant, no_nans = consistency_data + + # check that unbiased var == biased var * count / (count - 1) + var_unbiased_x = x.expanding(min_periods=min_periods).var() + var_biased_x = x.expanding(min_periods=min_periods).var(ddof=0) + var_debiasing_factors_x = x.expanding().count() / ( + x.expanding().count() - 1.0 + ).replace(0.0, np.nan) + tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x) @pytest.mark.parametrize(