(factor_analysis)=
:::{post} 19 Mar, 2022
:tags: factor analysis, matrix factorization, PCA
:category: advanced, how-to
:author: Chris Hartl, Christopher Krapu, Oriol Abril-Pla
:::
+++
Factor analysis is a widely used probabilistic model for identifying low-rank structure in multivariate data as encoded in latent variables. It is very closely related to principal components analysis, and differs only in the prior distributions assumed for these latent variables. It is also a good example of a linear Gaussian model as it can be described entirely as a linear transformation of underlying Gaussian variates. For a high-level view of how factor analysis relates to other models, you can check out this diagram originally published by Ghahramani and Roweis.
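Concretely, the standard formulation (a sketch using generic symbols, not quantities defined later in this notebook) writes each observed vector $\mathbf{x} \in \mathbb{R}^d$ as a linear function of a lower-dimensional Gaussian latent vector plus Gaussian noise:

$$
\mathbf{x} = W \mathbf{f} + \boldsymbol{\varepsilon}, \qquad
\mathbf{f} \sim \mathcal{N}(0, I_k), \qquad
\boldsymbol{\varepsilon} \sim \mathcal{N}(0, \Psi),
$$

with $\Psi$ diagonal; probabilistic PCA corresponds to the special case $\Psi = \sigma^2 I$.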
+++
:::{include} ../extra_installs.md
:::
import aesara.tensor as at
import arviz as az
import matplotlib
import numpy as np
import pymc as pm
import scipy as sp
import seaborn as sns
import xarray as xr
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
from numpy.random import default_rng
from xarray_einstats import linalg
from xarray_einstats.stats import XrContinuousRV
print(f"Running on PyMC3 v{pm.__version__}")
%config InlineBackend.figure_format = 'retina'
az.style.use("arviz-darkgrid")
np.set_printoptions(precision=3, suppress=True)
RANDOM_SEED = 31415
rng = default_rng(RANDOM_SEED)
+++
To work through a few examples, we'll first generate some data. The data will not follow the exact generative process assumed by the factor analysis model, as the latent variates will not be Gaussian. We'll assume that we have an observed data set of `n` data points, each with `d` observed dimensions, generated from `k_true` latent factors.
n = 250
k_true = 4
d = 10
The next code cell generates the data by creating a latent variable array `M` and a linear transformation `Q`. The matrix product `QM` is then perturbed with additive Gaussian noise whose standard deviation is controlled by `err_sd`.
err_sd = 2
M = rng.binomial(1, 0.25, size=(k_true, n))
Q = np.hstack([rng.exponential(2 * k_true - k, size=(d, 1)) for k in range(k_true)]) * rng.binomial(
1, 0.75, size=(d, k_true)
)
Y = np.round(1000 * np.dot(Q, M) + rng.standard_normal(size=(d, n)) * err_sd) / 1000
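As a quick sanity check on the generative code above (an added check, not part of the original analysis), the shapes line up as a matrix factorization of the data:

```python
# Q maps k_true latent factors to d observed dimensions; M holds one latent vector
# per sample; Y stacks the noisy observations column-wise.
print(Q.shape, M.shape, Y.shape)  # (10, 4) (4, 250) (10, 250)
```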
Because of the way we have generated the data, the correlations among the $d$ observed dimensions of $Y$ will show pronounced off-diagonal structure inherited from the low-rank matrix $Q$. We can inspect this with a heatmap of the correlation matrix.
plt.figure(figsize=(4, 3))
sns.heatmap(np.corrcoef(Y));
If you squint long enough, you may be able to glimpse a few places where distinct columns are likely linear functions of each other.
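To make that low-rank structure explicit, we can compare the empirical correlation matrix with the one implied by $Q$ alone, ignoring the comparatively tiny additive noise. This comparison is an added illustration rather than part of the original analysis.

```python
# Correlation implied by the loading matrix Q: corr_ij ~ (QQ^T)_ij / sqrt((QQ^T)_ii (QQ^T)_jj).
# A small epsilon guards against rows of Q that happen to be entirely zero.
QQ = Q @ Q.T
corr_implied = QQ / np.sqrt(np.outer(np.diag(QQ), np.diag(QQ)) + 1e-12)

fig, axes = plt.subplots(1, 2, figsize=(8, 3))
sns.heatmap(np.corrcoef(Y), ax=axes[0]).set_title("empirical")
sns.heatmap(corr_implied, ax=axes[1]).set_title("implied by $Q$");
```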
+++
Probabilistic PCA (PPCA) and factor analysis (FA) are a common source of questions on the PyMC Discourse, where several threads cover different aspects of the problem.
+++
+++
The model for factor analysis is the probabilistic matrix factorization

$$
X_{(d,n)} \mid W_{(d,k)}, F_{(k,n)} \sim \mathcal{N}(WF, \Psi)
$$

with $\Psi$ a diagonal matrix of per-dimension noise variances; the subscripts denote the shapes of the matrices. Probabilistic PCA is the variant with isotropic noise, $\Psi = \sigma^2 I$, which is what we implement below with the scalar parameter `psi`.
k = 2
coords = {"latent_columns": np.arange(k), "rows": np.arange(n), "observed_columns": np.arange(d)}
with pm.Model(coords=coords) as PPCA:
W = pm.Normal("W", dims=("observed_columns", "latent_columns"))
F = pm.Normal("F", dims=("latent_columns", "rows"))
psi = pm.HalfNormal("psi", 1.0)
X = pm.Normal("X", mu=at.dot(W, F), sigma=psi, observed=Y, dims=("observed_columns", "rows"))
trace = pm.sample(tune=2000, random_seed=RANDOM_SEED) # target_accept=0.9
At this point, there are already several warnings regarding divergent samples and failures of the convergence checks. We can see further problems in the trace plot below. This plot shows the path taken by each sampler chain for a single entry in the matrix $W$, together with each chain's sample mean drawn as a horizontal line.
for i in trace.posterior.chain.values:
samples = trace.posterior["W"].sel(chain=i, observed_columns=3, latent_columns=1)
plt.plot(samples, label="Chain {}".format(i + 1))
plt.axhline(samples.mean(), color=f"C{i}")
plt.legend(ncol=4, loc="upper center", fontsize=12, frameon=True), plt.xlabel("Sample");
Each chain appears to have a different sample mean, and we can also see a great deal of autocorrelation within each chain, manifested as long-range trends over the sampling iterations. Some of the chains may also have divergences, lending further evidence to the claim that using MCMC on this model as written is suboptimal.
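We can quantify those two symptoms directly. The snippet below (an added check, not in the original notebook) counts divergent transitions per chain and reports the worst split-$\hat{R}$ for the entries of $W$; values far above 1.01 confirm that the chains are not mixing.

```python
# Divergent transitions per chain and the maximum R-hat over all entries of W.
print("divergences per chain:", trace.sample_stats["diverging"].sum(dim="draw").values)
print("max R-hat for W:", float(az.rhat(trace, var_names=["W"])["W"].max()))
```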
One of the primary drawbacks of this model formulation is its lack of identifiability. With this representation, only the product $WF$ enters the likelihood: for any invertible $k \times k$ matrix $\Omega$, the pair $(W\Omega, \Omega^{-1}F)$ gives exactly the same likelihood as $(W, F)$. The loadings and factors can therefore be rotated, reflected, or permuted without penalty, and each chain (or even different stretches of the same chain) can wander among these equivalent parametrizations, producing the behavior seen above.
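A tiny numerical illustration makes the point (the `W_demo`, `F_demo`, and `R` names below exist only for this demo): rotating $W$ and counter-rotating $F$ leaves the product, and therefore the likelihood, untouched.

```python
# Any invertible k x k matrix can be absorbed into W and undone in F
# without changing the product W @ F. Here we use a 2D rotation.
theta = 0.5  # arbitrary rotation angle
R = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
W_demo = rng.standard_normal((d, 2))
F_demo = rng.standard_normal((2, n))
print(np.allclose(W_demo @ F_demo, (W_demo @ R) @ (R.T @ F_demo)))  # True
```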
This can be fixed by constraining the form of $W$ to be:
- Lower triangular
- Positive, with increasing values along the diagonal
We can adapt the packed-triangular approach to fill out a non-square matrix. The function below, `expand_packed_block_triangular`, mimics `pm.expand_packed_triangular`, but while the latter only works on packed versions of square matrices (i.e. $d = k$), our version also handles the rectangular case $d > k$.
def expand_packed_block_triangular(d, k, packed, diag=None, mtype="aesara"):
# like expand_packed_triangular, but with d > k.
assert mtype in {"aesara", "numpy"}
assert d >= k
def set_(M, i_, v_):
if mtype == "aesara":
return at.set_subtensor(M[i_], v_)
M[i_] = v_
return M
out = at.zeros((d, k), dtype=float) if mtype == "aesara" else np.zeros((d, k), dtype=float)
if diag is None:
idxs = np.tril_indices(d, m=k)
out = set_(out, idxs, packed)
else:
idxs = np.tril_indices(d, k=-1, m=k)
out = set_(out, idxs, packed)
idxs = (np.arange(k), np.arange(k))
out = set_(out, idxs, diag)
return out
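A quick check in NumPy mode (added here for illustration) shows what the packing convention looks like for a small $3 \times 2$ example:

```python
# The five packed values fill the lower triangle of a 3 x 2 matrix, row by row.
expand_packed_block_triangular(3, 2, np.arange(1.0, 6.0), mtype="numpy")
# array([[1., 0.],
#        [2., 3.],
#        [4., 5.]])
```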
We'll also define another function, `makeW`, which builds the constrained $W$: a lower-triangular block whose main diagonal is positive and increasing, the diagonal being obtained as the cumulative sum of half-normal draws.
def makeW(d, k, dim_names):
# make a W matrix adapted to the data shape
n_od = int(k * d - k * (k - 1) / 2 - k)
# trick: the cumulative sum of z will be positive increasing
z = pm.HalfNormal("W_z", 1.0, dims="latent_columns")
b = pm.HalfNormal("W_b", 1.0, shape=(n_od,), dims="packed_dim")
L = expand_packed_block_triangular(d, k, b, at.ones(k))
W = pm.Deterministic("W", at.dot(L, at.diag(at.extra_ops.cumsum(z))), dims=dim_names)
return W
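As a check on the bookkeeping in `makeW`: with the diagonal handled separately, the number of packed below-diagonal entries of a lower-triangular $d \times k$ block is

$$
n_{od} = kd - \frac{k(k-1)}{2} - k = kd - \frac{k(k+1)}{2},
\qquad d = 10,\ k = 2 \;\Rightarrow\; n_{od} = 20 - 3 = 17,
$$

which is the length of the `W_b` vector declared above.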
With these modifications, we remake the model and run the MCMC sampler again.
with pm.Model(coords=coords) as PPCA_identified:
W = makeW(d, k, ("observed_columns", "latent_columns"))
F = pm.Normal("F", dims=("latent_columns", "rows"))
psi = pm.HalfNormal("psi", 1.0)
X = pm.Normal("X", mu=at.dot(W, F), sigma=psi, observed=Y, dims=("observed_columns", "rows"))
trace = pm.sample(tune=2000) # target_accept=0.9
for i in range(4):
samples = trace.posterior["W"].sel(chain=i, observed_columns=3, latent_columns=1)
plt.plot(samples, label="Chain {}".format(i + 1))
plt.legend(ncol=4, loc="lower center", fontsize=8), plt.xlabel("Sample");
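With the constrained $W$ the chains should now agree with one another; a quick diagnostics summary (an added check, not part of the original notebook) makes this easy to verify:

```python
# R-hat close to 1 and reasonable effective sample sizes indicate the chains now
# target a single, identified posterior mode.
az.summary(trace, var_names=["W"], kind="diagnostics").head()
```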
Because the matrix $F$ has $k \times n$ entries, the number of parameters grows with the number of observations, and sampling becomes expensive for large $n$. In addition, the link between each observed data point and its own column of $F$ means we cannot fit the model on minibatches of data: every observation brings its own latent variables with it.

This scalability problem can be addressed analytically by integrating $F$ out of the model. Once $F$ is marginalized away, the observations are jointly Gaussian with a covariance determined by $W$ and $\psi$, which we can express with a matrix normal distribution.
If you are unfamiliar with the matrix normal distribution, you can consider it to be an extension of the multivariate Gaussian to matrix-valued random variates: the between-row correlations and the between-column correlations are handled by two separate covariance matrices passed as parameters. Here, it simplifies the notation for a model formulation in which $F$ has been marginalized out. Since the latent variables are gone, we are also free to train on minibatches of data using the `ADVI` and `FullRankADVI` approximations.
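To spell out the marginalization for a single column $\mathbf{y}$ of the data (a sketch of the standard derivation, using the same $\psi$ that appears in the model below):

$$
\mathbf{y} = W\mathbf{f} + \boldsymbol{\varepsilon}, \quad
\mathbf{f} \sim \mathcal{N}(0, I_k), \quad
\boldsymbol{\varepsilon} \sim \mathcal{N}(0, \psi I_d)
\;\Longrightarrow\;
\mathbf{y} \sim \mathcal{N}\!\left(0,\; WW^T + \psi I_d\right),
$$

and $WW^T + \psi I_d$ is exactly the covariance assembled as `cov` in the model below.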
coords["observed_columns2"] = coords["observed_columns"]
with pm.Model(coords=coords) as PPCA_scaling:
W = makeW(d, k, ("observed_columns", "latent_columns"))
Y_mb = pm.Minibatch(Y.T, 50) # MvNormal parametrizes covariance of columns, so transpose Y
psi = pm.HalfNormal("psi", 1.0)
E = pm.Deterministic(
"cov",
at.dot(W, at.transpose(W)) + psi * at.diag(at.ones(d)),
dims=("observed_columns", "observed_columns2"),
)
X = pm.MvNormal("X", 0.0, cov=E, observed=Y_mb)
trace_vi = pm.fit(n=50000, method="fullrank_advi", obj_n_mc=1).sample()
When we compare the posteriors computed with MCMC and VI, we find that (at least for this specific entry of $W$) the two distributions are close but differ in their means. The MCMC chains all agree with each other, and the ADVI estimate is not far off.
col_selection = dict(observed_columns=3, latent_columns=1)
ax = az.plot_kde(
trace_vi.posterior["W"].sel(**col_selection).squeeze().values,
label="FR-ADVI posterior",
plot_kwargs={"alpha": 0},
fill_kwargs={"alpha": 0.5, "color": "red"},
)
for i in trace.posterior.chain.values:
mcmc_samples = trace.posterior["W"].sel(chain=i, **col_selection).values
az.plot_kde(
mcmc_samples,
label="MCMC posterior for chain {}".format(i + 1),
plot_kwargs={"color": f"C{i}"},
)
ax.set_title(rf"PDFs of $W$ estimate at {col_selection}")
ax.legend(loc="upper right");
The matrix $F$ is typically of interest for factor analysis, and it is often used as a feature matrix for dimensionality reduction. However, $F$ was marginalized away to make fitting the model easier, so we now need to recover it. Conditional on $W$, $\psi$, and the observed data, the latent factors are Gaussian, so they can be sampled after the fact.
+++
Here, we draw many random variates from a standard normal distribution and transform them appropriately to represent draws from the posterior of $F$: each draw is shifted by a conditional mean and scaled by a Cholesky factor of the conditional covariance.
# configure xarray-einstats
def get_default_dims(dims, dims2):
proposed_dims = [dim for dim in dims if dim not in {"chain", "draw"}]
assert len(proposed_dims) == 2
if dims2 is None:
return proposed_dims
linalg.get_default_dims = get_default_dims
post = trace_vi.posterior
obs = trace.observed_data
WW = linalg.inv(
linalg.matmul(
post["W"], post["W"], dims=("latent_columns", "observed_columns", "latent_columns")
)
)
WW_W = linalg.matmul(
WW,
post["W"],
dims=(("latent_columns", "latent_columns2"), ("latent_columns", "observed_columns")),
)
F_mu = xr.dot(1 / np.sqrt(post["psi"]) * WW_W, obs["X"], dims="observed_columns")
WW_chol = linalg.cholesky(WW)
norm_dist = XrContinuousRV(sp.stats.norm, xr.zeros_like(F_mu)) # the zeros_like defines the shape
F_sampled = F_mu + linalg.matmul(
WW_chol,
norm_dist.rvs(),
dims=(("latent_columns", "latent_columns2"), ("latent_columns", "rows")),
)
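Before plotting, it is worth confirming what we have recovered (a small added check): `F_sampled` carries a full set of posterior draws of the latent factors, one value per draw, latent column, and data row.

```python
# Dimensions of the recovered latent factors.
print(F_sampled.dims)
```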
fig, ax = plt.subplots()
ls = ["-", "--"]
for i in range(2):
for j in range(5):
az.plot_kde(
F_sampled.sel(latent_columns=i, rows=j).squeeze().values,
plot_kwargs={"color": f"C{j}", "ls": ls[i]},
ax=ax,
)
legend = ax.legend(
handles=[Line2D([], [], color="k", ls=ls[i], label=f"{i}") for i in range(2)],
title="latent column",
loc="upper left",
)
ax.add_artist(legend)
ax.legend(
handles=[Line2D([], [], color=f"C{i}", label=f"{i}") for i in range(5)],
title="row",
loc="upper right",
);
- Authored by chartl on May 6, 2019
- Updated by Christopher Krapu on April 4, 2021
- Updated by Oriol Abril-Pla to use PyMC v4 and xarray-einstats in March 2022
+++
%load_ext watermark
%watermark -n -u -v -iv -w -p aeppl
:::{include} ../page_footer.md
:::