diff --git a/pymc_experimental/__init__.py b/pymc_experimental/__init__.py index 55393fcbf..58a488df5 100644 --- a/pymc_experimental/__init__.py +++ b/pymc_experimental/__init__.py @@ -14,4 +14,5 @@ from pymc_experimental.bart import * from pymc_experimental import distributions from pymc_experimental import gp +from pymc_experimental import step_methods from pymc_experimental import utils diff --git a/pymc_experimental/step_methods/__init__.py b/pymc_experimental/step_methods/__init__.py new file mode 100644 index 000000000..0b52ff2f8 --- /dev/null +++ b/pymc_experimental/step_methods/__init__.py @@ -0,0 +1,6 @@ +from pymc_experimental.step_methods.mlda import ( + DEMetropolisZMLDA, + MetropolisMLDA, + MLDA, + RecursiveDAProposal, +) diff --git a/pymc_experimental/step_methods/mlda.py b/pymc_experimental/step_methods/mlda.py new file mode 100644 index 000000000..e210ec51d --- /dev/null +++ b/pymc_experimental/step_methods/mlda.py @@ -0,0 +1,1094 @@ +# Copyright 2022 The PyMC Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import warnings +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +import arviz as az +import numpy as np +import pymc as pm +from aesara.tensor.sharedvar import TensorSharedVariable +from pymc.blocking import DictToArrayBijection, RaveledVars +from pymc.model import Model, Point +from pymc.step_methods.arraystep import ArrayStepShared, Competence, metrop_select +from pymc.step_methods.compound import CompoundStep +from pymc.step_methods.metropolis import DEMetropolisZ, Metropolis, Proposal, delta_logp + +__all__ = [ + "MetropolisMLDA", + "DEMetropolisZMLDA", + "RecursiveDAProposal", + "MLDA", + "extract_Q_estimate", +] + + +class MetropolisMLDA(Metropolis): + """ + Metropolis-Hastings sampling step tailored for use as base sampler in MLDA. + """ + + name = "metropolis_mlda" + + def __init__(self, *args, **kwargs): + """ + Initialise MetropolisMLDA. This is a mix of the parent's class' initialisation + and some extra code specific for MLDA. + """ + model = pm.modelcontext(kwargs.get("model", None)) + initial_values = model.initial_point() + + # flag to that variance reduction is activated - forces MetropolisMLDA + # to store quantities of interest in a register if True + self.mlda_variance_reduction = kwargs.pop("mlda_variance_reduction", False) + if self.mlda_variance_reduction: + # Subsampling rate of MLDA sampler one level up + self.mlda_subsampling_rate_above = kwargs.pop("mlda_subsampling_rate_above") + self.sub_counter = 0 + self.Q_last = np.nan + self.Q_reg = [np.nan] * self.mlda_subsampling_rate_above + + # call parent class __init__ + super().__init__(*args, **kwargs) + + # modify the delta function and point to model if VR is used + if self.mlda_variance_reduction: + self.model = model + self.delta_logp_factory = self.delta_logp + self.delta_logp = lambda q, q0: -self.delta_logp_factory(q0, q) + + def reset_tuning(self): + """ + Does not reset sampler parameters. 
Allows continuation with + the same settings when MetropolisMLDA steps are done in chunks + under MLDA. + """ + return + + def astep(self, q0): + + q_new, stats = super().astep(q0) + + # Add variance reduction functionality. + if self.mlda_variance_reduction: + if stats[0]["accepted"]: + self.Q_last = self.model.Q.get_value() + if self.sub_counter == self.mlda_subsampling_rate_above: + self.sub_counter = 0 + self.Q_reg[self.sub_counter] = self.Q_last + self.sub_counter += 1 + + return q_new, stats + + +class DEMetropolisZMLDA(DEMetropolisZ): + """ + DEMetropolisZ sampling step tailored for use as base sampler in MLDA + """ + + name = "DEMetropolisZ_mlda" + + def __init__(self, *args, **kwargs): + """ + Initialise DEMetropolisZMLDA, uses parent class __init__ + and extra code specific for use within MLDA. + """ + + # flag used for signaling the end of tuning + self.tuning_end_trigger = False + + model = pm.modelcontext(kwargs.get("model", None)) + initial_values = model.initial_point() + + # flag to that variance reduction is activated - forces DEMetropolisZMLDA + # to store quantities of interest in a register if True + self.mlda_variance_reduction = kwargs.pop("mlda_variance_reduction", False) + if self.mlda_variance_reduction: + # Subsampling rate of MLDA sampler one level up + self.mlda_subsampling_rate_above = kwargs.pop("mlda_subsampling_rate_above") + self.sub_counter = 0 + self.Q_last = np.nan + self.Q_reg = [np.nan] * self.mlda_subsampling_rate_above + + # call parent class __init__ + super().__init__(*args, **kwargs) + + # modify the delta function and point to model if VR is used + if self.mlda_variance_reduction: + self.model = model + self.delta_logp_factory = self.delta_logp + self.delta_logp = lambda q, q0: -self.delta_logp_factory(q0, q) + + def reset_tuning(self): + """Skips resetting of tuned sampler parameters + and history to their initial values. Allows + continuation with the same settings when + DEMetropolisZMLDA steps are done in chunks + under MLDA.""" + return + + def astep(self, q0): + + q_new, stats = super().astep(q0) + + # Add variance reduction functionality. + if self.mlda_variance_reduction: + if stats[0]["accepted"]: + self.Q_last = self.model.Q.get_value() + if self.sub_counter == self.mlda_subsampling_rate_above: + self.sub_counter = 0 + self.Q_reg[self.sub_counter] = self.Q_last + self.sub_counter += 1 + + return q_new, stats + + def stop_tuning(self): + """At the end of the tuning phase, this method + removes the first x% of the history so future + proposals are not informed by unconverged tuning + iterations. Runs only once after the end of tuning, + when the self.tuning_end_trigger flag is set to True. + """ + if self.tuning_end_trigger: + self.tuning_end_trigger = False + return super().stop_tuning() + else: + return + + +class MLDA(ArrayStepShared): + """ + Multi-Level Delayed Acceptance (MLDA) sampling step that uses coarse + approximations of a fine model to construct proposals in multiple levels. + + MLDA creates a hierarchy of MCMC chains. Chains sample from different + posteriors that ideally should be approximations of the fine (top-level) + posterior and require less computational effort to evaluate their likelihood. + + Each chain runs for a fixed number of iterations (up to subsampling_rate) and + then the last sample generated is used as a proposal for the chain in the level + above (excluding when variance reduction is used, where a random sample from + the generated sequence is used). 
The bottom-level chain is a MetropolisMLDA + or DEMetropolisZMLDA sampler. + + The algorithm achieves higher acceptance rate and effective sample sizes + than other samplers if the coarse models are sufficiently good approximations + of the fine one. + + Parameters + ---------- + coarse_models : list + List of coarse (multi-level) models, where the first model + is the coarsest one (level=0) and the last model is the + second finest one (level=L-1 where L is the number of levels). + Note this list excludes the model passed to the model + argument above, which is the finest available. + vars : list + List of value variables for sampler + base_sampler : string + Sampler used in the base (coarsest) chain. Can be 'Metropolis' or + 'DEMetropolisZ'. Defaults to 'DEMetropolisZ'. + base_S : standard deviation of base proposal covariance matrix + Some measure of variance to parameterize base proposal distribution + base_proposal_dist : function + Function that returns zero-mean deviates when parameterized with + S (and n). Defaults to normal. This is the proposal used in the + coarsest (base) chain, i.e. level=0. + base_scaling : scalar or array + Initial scale factor for base proposal. Defaults to 1 if base_sampler + is 'Metropolis' and to 0.001 if base_sampler is 'DEMetropolisZ'. + tune : bool + Flag for tuning in the base proposal. If base_sampler is 'Metropolis' it + should be True or False and defaults to True. Note that + this is overidden by the tune parameter in sample(). For example when calling + step=MLDA(tune=False, ...) and then sample(step=step, tune=200, ...), + tuning will be activated for the first 200 steps. If base_sampler is + 'DEMetropolisZ', it should be True. For 'DEMetropolisZ', there is a separate + argument base_tune_target which allows modifying the type of tuning. + base_tune_target: string + Defines the type of tuning that is performed when base_sampler is + 'DEMetropolisZ'. Allowable values are 'lambda, 'scaling' or None and + it defaults to 'lambda'. + base_tune_interval : int + The frequency of tuning for the base proposal. Defaults to 100 + iterations. + base_lamb : float + Lambda parameter of the base level DE proposal mechanism. Only applicable when + base_sampler is 'DEMetropolisZ'. Defaults to 2.38 / sqrt(2 * ndim) + base_tune_drop_fraction : float + Fraction of tuning steps that will be removed from the base level samplers + history when the tuning ends. Only applicable when base_sampler is + 'DEMetropolisZ'. Defaults to 0.9 - keeping the last 10% of tuning steps + for good mixing while removing 90% of potentially unconverged tuning positions. + model : PyMC Model + Optional model for sampling step. Defaults to None + (taken from context). This model should be the finest of all + multilevel models. + mode : string or `Mode` instance. + Compilation mode passed to Aesara functions + subsampling_rates : integer or list of integers + One interger for all levels or a list with one number for each level + (excluding the finest level). + This is the number of samples generated in level l-1 to + propose a sample for level l - applies to all levels excluding the + finest level). The length of the list needs to be the same as the + length of coarse_models. + base_blocked : bool + Flag to choose whether base sampler (level=0) is a + Compound MetropolisMLDA step (base_blocked=False) + or a blocked MetropolisMLDA step (base_blocked=True). + Only applicable when base_sampler='Metropolis'. 
+ variance_reduction: bool + Calculate and store quantities of interest and quantity of interest + differences between levels to enable computing a variance-reduced + sum of the quantity of interest after sampling. In order to use + variance reduction, the user needs to do the following when defining + the PyMC model (also demonstrated in the example notebook): + - Include a `pm.MutableData()` variable with the name `Q` in the + model description of all levels. + - Use an Aesara Op to calculate the forward model (or the + combination of a forward model and a likelihood). This Op + should have a `perform()` method which (in addition to all + the other calculations), calculates the quantity of interest + and stores it to the variable `Q` of the PyMC model, + using the `set_value()` function. + When variance_reduction=True, all subchains run for a fixed number + of iterations (equal to subsampling_rates) and a random sample is + selected from the generated sequence (instead of the last sample + which is selected when variance_reduction=False). + store_Q_fine: bool + Store the values of the quantity of interest from the fine chain. + adaptive_error_model : bool + When True, MLDA will use the adaptive error model method + proposed in [Cui2012]. The method requires the likelihood of + the model to be adaptive and a forward model to be defined and + fed to the sampler. Thus, it only works when the user does + the following (also demonstrated in the example notebook): + - Include in the model definition at all levels, + the extra variable model_output, which will capture + the forward model outputs. Also include in the model + definition at all levels except the finest one, the + extra variables mu_B and Sigma_B, which will capture + the bias between different levels. All these variables + should be instantiated using the pm.MutableData method. + - Use an Aesara Op to define the forward model (and + optionally the likelihood) for all levels. The Op needs + to store the result of each forward model calculation + to the variable model_output of the PyMC model, + using the `set_value()` function. + - Define a Multivariate Normal likelihood (either using + the standard PyMC API or within an Op) which has mean + equal to the forward model output plus mu_B and covariance + equal to the model error plus Sigma_B. + Given the above, MLDA will capture and iteratively update the + bias terms internally for all level pairs and will correct + each level so that all levels' forward models aim to estimate + the finest level's forward model. + + Examples + ---------- + .. code:: ipython + >>> import pymc as pm + >>> import pymc_experimental as pmx + >>> datum = 1 + + >>> with pm.Model() as coarse_model: + ... x = pm.Normal("x", mu=0, sigma=10) + ... y = pm.Normal("y", mu=x, sigma=1, observed=datum - 0.1) + + >>> with pm.Model(): + ... x = pm.Normal("x", mu=0, sigma=10) + ... y = pm.Normal("y", mu=x, sigma=1, observed=datum) + ... step_method = pmx.step_methods.MLDA( + ... coarse_models=[coarse_model], + ... subsampling_rates=5, + ... ) + ... idata = pm.sample( + ... tune=100, draws=500, chains=2, + ... step=step_method, + ... random_seed=123, + ... ) + + >>> az.summary(idata, kind="stats") + >>> # mean sd hdi_3% hdi_97% + >>> #x 0.99 0.987 -0.734 2.992 + + References + ---------- + .. [Dodwell2019] Dodwell, Tim & Ketelsen, Chris & Scheichl, + Robert & Teckentrup, Aretha. (2019). + Multilevel Markov Chain Monte Carlo. + SIAM Review. 61. 509-545. + `link `__ + .. 
[Cui2012] Cui, Tiangang & Fox, Colin & O'Sullivan, Michael. + (2012). Adaptive Error Modelling in MCMC Sampling for Large + Scale Inverse Problems. + """ + + name = "mlda" + + # All levels use block sampling, + # except level 0 where the user can choose + default_blocked = True + generates_stats = True + + def __init__( + self, + coarse_models: List[Model], + vars: Optional[list] = None, + base_sampler="DEMetropolisZ", + base_S: Optional = None, + base_proposal_dist: Optional[Type[Proposal]] = None, + base_scaling: Optional = None, + tune: bool = True, + base_tune_target: str = "lambda", + base_tune_interval: int = 100, + base_lamb: Optional = None, + base_tune_drop_fraction: float = 0.9, + model: Optional[Model] = None, + mode: Optional = None, + subsampling_rates: List[int] = 5, + base_blocked: bool = False, + variance_reduction: bool = False, + store_Q_fine: bool = False, + adaptive_error_model: bool = False, + **kwargs, + ) -> None: + # this variable is used to identify MLDA objects which are + # not in the finest level (i.e. child MLDA objects) + self.is_child = kwargs.get("is_child", False) + + if not isinstance(coarse_models, list): + raise ValueError("MLDA step method cannot use coarse_models if it is not a list") + if len(coarse_models) == 0: + raise ValueError( + "MLDA step method was given an empty " + "list of coarse models. Give at least " + "one coarse model." + ) + + # assign internal state + model = pm.modelcontext(model) + initial_values = model.initial_point() + self.model = model + self.coarse_models = coarse_models + self.model_below = self.coarse_models[-1] + self.num_levels = len(self.coarse_models) + 1 + + # set up variance reduction. + self.variance_reduction = variance_reduction + self.store_Q_fine = store_Q_fine + + # check that certain requirements hold + # for the variance reduction feature to work + if self.variance_reduction or self.store_Q_fine: + if not hasattr(self.model, "Q"): + raise AttributeError( + "Model given to MLDA does not contain" + "variable 'Q'. You need to include" + "the variable in the model definition" + "for variance reduction to work or" + "for storing the fine Q." + "Use pm.MutableData() to define it." + ) + if not isinstance(self.model.Q, TensorSharedVariable): + raise TypeError( + "The variable 'Q' in the model definition is not of type " + "'TensorSharedVariable'. Use pm.MutableData() to define the" + "variable." + ) + + if self.is_child and self.variance_reduction: + # this is the subsampling rate applied to the current level + # it is stored in the level above and transferred here + self.subsampling_rate_above = kwargs.pop("subsampling_rate_above", None) + + # set up adaptive error model + self.adaptive_error_model = adaptive_error_model + + # check that certain requirements hold + # for the adaptive error model feature to work + if self.adaptive_error_model: + if not hasattr(self.model_below, "mu_B"): + raise AttributeError( + "Model below in hierarchy does not contain" + "variable 'mu_B'. You need to include" + "the variable in the model definition" + "for adaptive error model to work." + "Use pm.MutableData() to define it." + ) + if not hasattr(self.model_below, "Sigma_B"): + raise AttributeError( + "Model below in hierarchy does not contain" + "variable 'Sigma_B'. You need to include" + "the variable in the model definition" + "for adaptive error model to work." + "Use pm.MutableData() to define it." 
+ ) + if not ( + isinstance(self.model_below.mu_B, TensorSharedVariable) + and isinstance(self.model_below.Sigma_B, TensorSharedVariable) + ): + raise TypeError( + "At least one of the variables 'mu_B' and 'Sigma_B' " + "in the definition of the below model is not of type " + "'TensorSharedVariable'. Use pm.MutableData() to define those " + "variables." + ) + + # this object is used to recursively update the mean and + # variance of the bias correction given new differences + # between levels + self.bias = RecursiveSampleMoments( + self.model_below.mu_B.get_value(), self.model_below.Sigma_B.get_value() + ) + + # this list holds the bias objects from all levels + # it is gradually constructed when MLDA objects are + # created and then shared between all levels + self.bias_all = kwargs.pop("bias_all", None) + if self.bias_all is None: + self.bias_all = [self.bias] + else: + self.bias_all.append(self.bias) + + # variables used for adaptive error model + self.last_synced_output_diff = None + self.adaptation_started = False + + # set up subsampling rates. + if isinstance(subsampling_rates, int): + self.subsampling_rates = [subsampling_rates] * len(self.coarse_models) + else: + if len(subsampling_rates) != len(self.coarse_models): + raise ValueError( + f"List of subsampling rates needs to have the same " + f"length as list of coarse models but the lengths " + f"were {len(subsampling_rates)}, {len(self.coarse_models)}" + ) + self.subsampling_rates = subsampling_rates + + self.subsampling_rate = self.subsampling_rates[-1] + self.subchain_selection = None + + # set up base sampling + self.base_sampler = base_sampler + + # VR is not compatible with compound base samplers so an automatic conversion + # to a block sampler happens here if + if self.variance_reduction and self.base_sampler == "Metropolis" and not base_blocked: + warnings.warn( + "Variance reduction is not compatible with non-blocked (compound) samplers." + "Automatically switching to a blocked Metropolis sampler." + ) + self.base_blocked = True + else: + self.base_blocked = base_blocked + + self.base_S = base_S + self.base_proposal_dist = base_proposal_dist + + if base_scaling is None: + if self.base_sampler == "Metropolis": + self.base_scaling = 1.0 + else: + self.base_scaling = 0.001 + else: + self.base_scaling = float(base_scaling) + + self.tune = tune + if not self.tune and self.base_sampler == "DEMetropolisZ": + raise ValueError( + f"The argument tune was set to False while using" + f" a 'DEMetropolisZ' base sampler. 'DEMetropolisZ' " + f" tune needs to be True." 
+ ) + + self.base_tune_target = base_tune_target + self.base_tune_interval = base_tune_interval + self.base_lamb = base_lamb + self.base_tune_drop_fraction = float(base_tune_drop_fraction) + self.base_tuning_stats = None + + self.mode = mode + + # Process model variables + if vars is None: + vars = model.value_vars + else: + vars = [model.rvs_to_values.get(var, var) for var in vars] + vars = pm.inputvars(vars) + self.vars = vars + self.var_names = [var.name for var in self.vars] + + self.accepted = 0 + + # Construct Aesara function for current-level model likelihood + # (for use in acceptance) + shared = pm.make_shared_replacements(initial_values, vars, model) + self.delta_logp = delta_logp(initial_values, model.logp(), vars, shared) + + # Construct Aesara function for below-level model likelihood + # (for use in acceptance) + model_below = pm.modelcontext(self.model_below) + vars_below = [var for var in model_below.value_vars if var.name in self.var_names] + vars_below = pm.inputvars(vars_below) + shared_below = pm.make_shared_replacements(initial_values, vars_below, model_below) + self.delta_logp_below = delta_logp( + initial_values, model_below.logp(), vars_below, shared_below + ) + + super().__init__(vars, shared) + + # initialise complete step method hierarchy + if self.num_levels == 2: + with self.model_below: + # make sure the correct variables are selected from model_below + vars_below = [ + var for var in self.model_below.value_vars if var.name in self.var_names + ] + + # create kwargs + if self.variance_reduction: + base_kwargs = { + "mlda_subsampling_rate_above": self.subsampling_rate, + "mlda_variance_reduction": True, + } + else: + base_kwargs = {} + + if self.base_sampler == "Metropolis": + # MetropolisMLDA sampler in base level (level=0), targeting self.model_below + self.step_method_below = MetropolisMLDA( + vars=vars_below, + proposal_dist=self.base_proposal_dist, + S=self.base_S, + scaling=self.base_scaling, + tune=self.tune, + tune_interval=self.base_tune_interval, + model=None, + mode=self.mode, + blocked=self.base_blocked, + **base_kwargs, + ) + else: + # DEMetropolisZMLDA sampler in base level (level=0), targeting self.model_below + self.step_method_below = DEMetropolisZMLDA( + vars=vars_below, + S=self.base_S, + proposal_dist=self.base_proposal_dist, + lamb=self.base_lamb, + scaling=self.base_scaling, + tune=self.base_tune_target, + tune_interval=self.base_tune_interval, + tune_drop_fraction=self.base_tune_drop_fraction, + model=None, + mode=self.mode, + **base_kwargs, + ) + else: + # drop the last coarse model + coarse_models_below = self.coarse_models[:-1] + subsampling_rates_below = self.subsampling_rates[:-1] + + with self.model_below: + # make sure the correct variables are selected from model_below + vars_below = [ + var for var in self.model_below.value_vars if var.name in self.var_names + ] + + # create kwargs + if self.variance_reduction: + mlda_kwargs = { + "is_child": True, + "subsampling_rate_above": self.subsampling_rate, + } + else: + mlda_kwargs = {"is_child": True} + if self.adaptive_error_model: + mlda_kwargs = {**mlda_kwargs, **{"bias_all": self.bias_all}} + + # MLDA sampler in some intermediate level, targeting self.model_below + self.step_method_below = MLDA( + vars=vars_below, + base_S=self.base_S, + base_sampler=self.base_sampler, + base_proposal_dist=self.base_proposal_dist, + base_scaling=self.base_scaling, + tune=self.tune, + base_tune_target=self.base_tune_target, + base_tune_interval=self.base_tune_interval, + base_lamb=self.base_lamb, + 
base_tune_drop_fraction=self.base_tune_drop_fraction, + model=None, + mode=self.mode, + subsampling_rates=subsampling_rates_below, + coarse_models=coarse_models_below, + base_blocked=self.base_blocked, + variance_reduction=self.variance_reduction, + store_Q_fine=False, + adaptive_error_model=self.adaptive_error_model, + **mlda_kwargs, + ) + + # instantiate the recursive DA proposal. + # this is the main proposal used for + # all levels (Recursive Delayed Acceptance) + # (except for level 0 where the step method is MetropolisMLDA + # or DEMetropolisZMLDA - not MLDA) + self.proposal_dist = RecursiveDAProposal( + self.step_method_below, self.model_below, self.tune, self.subsampling_rate + ) + + # set up data types of stats. + if isinstance(self.step_method_below, MLDA): + # get the stat types from the level below if that level is MLDA + self.stats_dtypes = self.step_method_below.stats_dtypes + + else: + # otherwise, set it up from scratch. + self.stats_dtypes = [{"accept": np.float64, "accepted": bool, "tune": bool}] + + if isinstance(self.step_method_below, MetropolisMLDA): + self.stats_dtypes.append({"base_scaling": np.float64}) + elif isinstance(self.step_method_below, DEMetropolisZMLDA): + self.stats_dtypes.append({"base_scaling": np.float64, "base_lambda": np.float64}) + elif isinstance(self.step_method_below, CompoundStep): + for method in self.step_method_below.methods: + if isinstance(method, MetropolisMLDA): + self.stats_dtypes.append({"base_scaling": np.float64}) + elif isinstance(method, DEMetropolisZMLDA): + self.stats_dtypes.append( + {"base_scaling": np.float64, "base_lambda": np.float64} + ) + + # initialise necessary variables for doing variance reduction + if self.variance_reduction: + self.sub_counter = 0 + self.Q_diff = [] + if self.is_child: + self.Q_reg = [np.nan] * self.subsampling_rate_above + if self.num_levels == 2: + self.Q_base_full = [] + if not self.is_child: + for level in range(self.num_levels - 1, 0, -1): + self.stats_dtypes[0][f"Q_{level}_{level - 1}"] = object + self.stats_dtypes[0]["Q_0"] = object + + # initialise necessary variables for doing variance reduction or storing fine Q + if self.variance_reduction or self.store_Q_fine: + self.Q_last = np.nan + self.Q_diff_last = np.nan + if self.store_Q_fine and not self.is_child: + self.stats_dtypes[0][f"Q_{self.num_levels - 1}"] = object + + def astep(self, q0: RaveledVars) -> Tuple[RaveledVars, List[Dict[str, Any]]]: + """One MLDA step, given current sample q0""" + # Check if the tuning flag has been changed and if yes, + # change the proposal's tuning flag and reset self.accepted + # This is triggered by _iter_sample while the highest-level MLDA step + # method is running. It then propagates to all levels. + if self.proposal_dist.tune != self.tune: + self.proposal_dist.tune = self.tune + # set tune in sub-methods of compound stepper explicitly because + # it is not set within sample.py (only the CompoundStep's tune flag is) + if isinstance(self.step_method_below, CompoundStep): + for method in self.step_method_below.methods: + method.tune = self.tune + self.accepted = 0 + + # Set subchain_selection (which sample from the coarse chain + # is passed as a proposal to the fine chain). If variance + # reduction is used, a random sample is selected as proposal. + # If variance reduction is not used, the last sample is + # selected as proposal. 
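+        # (The subchain sample selected here is then passed through the
+        # delayed acceptance step further down: it is accepted with log-ratio
+        #     [logp_fine(q) - logp_fine(q0)] - [logp_coarse(q) - logp_coarse(q0)]
+        # i.e. the fine-level Metropolis ratio corrected by the coarse-level
+        # ratio already accounted for by the subchain that generated q; this
+        # is what the delta_logp / delta_logp_below terms compute.)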
+ if self.variance_reduction: + self.subchain_selection = np.random.randint(0, self.subsampling_rate) + else: + self.subchain_selection = self.subsampling_rate - 1 + self.proposal_dist.subchain_selection = self.subchain_selection + + # Call the recursive DA proposal to get proposed sample + # and convert dict -> numpy array + q = self.proposal_dist(q0) + + # Evaluate MLDA acceptance log-ratio + # If proposed sample from lower levels is the same as current one, + # do not calculate likelihood, just set accept to 0.0 + if (q.data == q0.data).all(): + accept = np.float64(0.0) + skipped_logp = True + else: + # NB! The order and sign of the first term are swapped compared + # to the convention to make sure the proposal is evaluated last. + accept = -self.delta_logp(q0.data, q.data) + self.delta_logp_below(q0.data, q.data) + skipped_logp = False + + # Accept/reject sample - next sample is stored in q_new + q_new, accepted = metrop_select(accept, q, q0) + if skipped_logp: + accepted = False + + # if sample is accepted, update self.Q_last with the sample's Q value + # runs only for VR or when store_Q_fine is True + if self.variance_reduction or self.store_Q_fine: + if accepted and not skipped_logp: + self.Q_last = self.model.Q.get_value() + + # Variance reduction + if self.variance_reduction: + self.update_vr_variables(accepted, skipped_logp) + + # Adaptive error model - runs only during tuning. + if self.tune and self.adaptive_error_model: + self.update_error_estimate(accepted, skipped_logp) + + # Update acceptance counter + self.accepted += accepted + + stats = {"tune": self.tune, "accept": np.exp(accept), "accepted": accepted} + + # Save the VR statistics to the stats dictionary (only happens in the + # top MLDA level) + if (self.variance_reduction or self.store_Q_fine) and not self.is_child: + q_stats = {} + if self.variance_reduction: + m = self + for level in range(self.num_levels - 1, 0, -1): + # save the Q differences for this level and iteration + q_stats[f"Q_{level}_{level - 1}"] = np.array(m.Q_diff) + # this makes sure Q_diff is reset for + # the next iteration + m.Q_diff = [] + if level == 1: + break + m = m.step_method_below + q_stats["Q_0"] = np.array(m.Q_base_full) + m.Q_base_full = [] + if self.store_Q_fine: + q_stats["Q_" + str(self.num_levels - 1)] = np.array(self.Q_last) + stats = {**stats, **q_stats} + + # Capture the base tuning stats from the level below. + self.base_tuning_stats = [] + + if isinstance(self.step_method_below, MLDA): + self.base_tuning_stats = self.step_method_below.base_tuning_stats + elif isinstance(self.step_method_below, MetropolisMLDA): + self.base_tuning_stats.append({"base_scaling": np.mean(self.step_method_below.scaling)}) + elif isinstance(self.step_method_below, DEMetropolisZMLDA): + self.base_tuning_stats.append( + { + "base_scaling": np.mean(self.step_method_below.scaling), + "base_lambda": self.step_method_below.lamb, + } + ) + elif isinstance(self.step_method_below, CompoundStep): + # Below method is CompoundStep + for method in self.step_method_below.methods: + if isinstance(method, MetropolisMLDA): + self.base_tuning_stats.append({"base_scaling": np.mean(method.scaling)}) + elif isinstance(method, DEMetropolisZMLDA): + self.base_tuning_stats.append( + {"base_scaling": np.mean(method.scaling), "base_lambda": method.lamb} + ) + + return q_new, [stats] + self.base_tuning_stats + + def update_vr_variables(self, accepted, skipped_logp): + """Updates all the variables necessary for VR to work. 
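+
+        These quantities feed the collapsing (telescoping) sum estimator that
+        `extract_Q_estimate` evaluates after sampling: the expectation of Q at
+        the finest level is estimated as mean(Q_0) plus the sum over levels l
+        of mean(Q_l - Q_{l-1}); the individual registers are described below.
+        A minimal post-sampling sketch, assuming a step method `mlda_step`
+        built with variance_reduction=True over two coarse models (three
+        levels in total):
+
+        .. code:: python
+
+            # inside the finest model's context, after building mlda_step
+            trace = pm.sample(draws=500, tune=100, chains=1, step=mlda_step,
+                              return_inferencedata=False)
+            Q_mean, Q_se = extract_Q_estimate(trace, levels=3)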
+ + Each level has a Q_last and Q_diff_last register which store + the Q of the last accepted MCMC sample and the difference + between the Q of the last accepted sample in this level and + the Q of the last sample in the level below. + + These registers are updated here so that they can be exported later.""" + + # if this MLDA is not at the finest level, store Q_last in a + # register Q_reg and increase sub_counter (until you reach + # the subsampling rate, at which point you make it zero). + # Q_reg will later be used by the level above to calculate differences + if self.is_child: + if self.sub_counter == self.subsampling_rate_above: + self.sub_counter = 0 + self.Q_reg[self.sub_counter] = self.Q_last + self.sub_counter += 1 + + # if MLDA is in the level above the base level, extract the + # latest set of Q values from Q_reg in the base level + # and add them to Q_base_full (which stores all the history of + # Q values from the base level) + if self.num_levels == 2: + self.Q_base_full.extend(self.step_method_below.Q_reg) + + # if the sample is accepted, update Q_diff_last with the latest + # difference between the last Q of this level and the Q of the + # proposed (selected) sample from the level below. + # If sample is not accepted, just keep the latest accepted Q_diff + if accepted and not skipped_logp: + self.Q_diff_last = self.Q_last - self.step_method_below.Q_reg[self.subchain_selection] + # Add the last accepted Q_diff to the list + self.Q_diff.append(self.Q_diff_last) + + def update_error_estimate(self, accepted, skipped_logp): + """Updates the adaptive error model estimate with + the latest accepted forward model output difference. Also + updates the model variables mu_B and Sigma_B. + + The current level estimates and stores the error + model between the current level and the level below.""" + + # only save errors when a sample is accepted (excluding skipped_logp) + if accepted and not skipped_logp: + # this is the error (i.e. forward model output difference) + # between the current level's model and the model in the level below + self.last_synced_output_diff = ( + self.model.model_output.get_value() - self.model_below.model_output.get_value() + ) + self.adaptation_started = True + + if self.adaptation_started: + # update the internal recursive bias estimator with the last saved error + self.bias.update(self.last_synced_output_diff) + # Update the model variables in the level below the current one. + # Each level has its own bias correction (i.e. bias object) that + # estimates the error between that level and the one below. + # The model variables mu_B and Signa_B of a level are the + # sum of the bias corrections of all levels below and including + # that level. This sum is updated here. + with self.model_below: + pm.set_data( + { + "mu_B": sum( + bias.get_mu() + for bias in self.bias_all[: len(self.bias_all) - self.num_levels + 2] + ) + } + ) + pm.set_data( + { + "Sigma_B": sum( + bias.get_sigma() + for bias in self.bias_all[: len(self.bias_all) - self.num_levels + 2] + ) + } + ) + + @staticmethod + def competence(var, has_grad): + """Return MLDA competence for given var/has_grad. MLDA currently works + only with continuous variables.""" + if var.dtype in pm.discrete_types: + return Competence.INCOMPATIBLE + return Competence.COMPATIBLE + + +class RecursiveSampleMoments: + """ + Iteratively constructs a sample mean + and covariance, given input samples. + + Used to capture an estimate of the mean + and covariance of the bias of an MLDA + coarse model. 
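+
+    A minimal sketch of the recursion on synthetic data (the data here is
+    purely illustrative):
+
+    .. code:: python
+
+        import numpy as np
+        from pymc_experimental.step_methods.mlda import RecursiveSampleMoments
+
+        rng = np.random.default_rng(0)
+        # pretend these are bias samples (model output differences) between
+        # two adjacent levels
+        diffs = rng.normal(loc=[0.5, -0.2], scale=0.1, size=(1000, 2))
+
+        moments = RecursiveSampleMoments(mu_0=np.zeros(2), sigma_0=np.eye(2))
+        for x in diffs:
+            moments.update(x)
+
+        mu_est, sigma_est = moments()
+        # mu_est approaches the sample mean of diffs and sigma_est its sample
+        # covariance as more updates are absorbed.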
+ """ + + def __init__(self, mu_0, sigma_0, t=1): + self.mu = mu_0 + self.sigma = sigma_0 + self.t = t + + def __call__(self): + return self.mu, self.sigma + + def get_mu(self): + """Returns the current mu value""" + return self.mu + + def get_sigma(self): + """Returns the current covariance value""" + return self.sigma + + def update(self, x): + """Updates the mean and covariance given a + new sample x""" + mu_previous = self.mu.copy() + + self.mu = (1 / (self.t + 1)) * (self.t * mu_previous + x) + + self.sigma = (self.t - 1) / self.t * self.sigma + 1 / self.t * ( + self.t * np.outer(mu_previous, mu_previous) + - (self.t + 1) * np.outer(self.mu, self.mu) + + np.outer(x, x) + ) + + self.t += 1 + + +def extract_Q_estimate(trace, levels): + """ + Returns expectation and standard error of quantity of interest, + given a trace and the number of levels in the multilevel model. + It makes use of the collapsing sum formula. Only applicable when + MLDA with variance reduction has been used for sampling. + """ + + Q_0_raw = trace.get_sampler_stats("Q_0").squeeze() + Q_0 = np.concatenate(Q_0_raw)[None, ::] + ess_Q_0 = az.ess(np.array(Q_0, np.float64)) + Q_0_var = Q_0.var() / ess_Q_0 + + Q_diff_means = [] + Q_diff_vars = [] + for l in range(1, levels): + Q_diff_raw = trace.get_sampler_stats(f"Q_{l}_{l-1}").squeeze() + Q_diff = np.hstack(Q_diff_raw)[None, ::] + ess_diff = az.ess(np.array(Q_diff, np.float64)) + Q_diff_means.append(Q_diff.mean()) + Q_diff_vars.append(Q_diff.var() / ess_diff) + + Q_mean = Q_0.mean() + sum(Q_diff_means) + Q_se = np.sqrt(Q_0_var + sum(Q_diff_vars)) + return Q_mean, Q_se + + +def subsample( + draws=1, + step=None, + start=None, + trace=None, + tune=0, + model=None, +): + """ + A stripped down version of sample(), which is called only + by the RecursiveDAProposal (which is the proposal used in the MLDA + stepper). RecursiveDAProposal only requires a small set of the input + parameters and checks normally performed by sample(), and this + function thus skips some of the code in sampler(). It directly calls + _iter_sample(), rather than sample_many(). The result is a reduced + overhead when running multiple levels in MLDA. + """ + + model = pm.modelcontext(model) + chain = 0 + random_seed = np.random.randint(2**30) + callback = None + + draws += tune + + sampling = pm.sampling._iter_sample( + draws, step, start, trace, chain, tune, model, random_seed, callback + ) + + try: + for it, (trace, _) in enumerate(sampling): + pass + except KeyboardInterrupt: + pass + + return trace + + +# Available proposal distributions for MLDA + + +class RecursiveDAProposal(Proposal): + """ + Recursive Delayed Acceptance proposal to be used with MLDA step sampler. + Recursively calls an MLDA sampler if level > 0 and calls MetropolisMLDA or + DEMetropolisZMLDA sampler if level = 0. The sampler generates + self.subsampling_rate samples and returns the sample with index + self.subchain_selection to be used as a proposal. + Results in a hierarchy of chains each of which is used to propose + samples to the chain above. 
+ """ + + def __init__( + self, + step_method_below: Union[MLDA, MetropolisMLDA, DEMetropolisZMLDA, CompoundStep], + model_below: Model, + tune: bool, + subsampling_rate: int, + ) -> None: + + self.step_method_below = step_method_below + self.model_below = model_below + self.tune = tune + self.subsampling_rate = subsampling_rate + self.subchain_selection = None + self.tuning_end_trigger = True + + def __call__(self, q0: RaveledVars) -> RaveledVars: + """Returns proposed sample given the current sample + in dictionary form (q0_dict).""" + + # Logging is reduced to avoid extensive console output + # during multiple recursive calls of subsample() + _log = logging.getLogger("pymc") + _log.setLevel(logging.ERROR) + + # Convert current sample from RaveledVars -> + # dict before feeding to subsample. + q0_dict = DictToArrayBijection.rmap(q0) + + with self.model_below: + # Check if the tuning flag has been set to False + # in which case tuning is stopped. The flag is set + # to False (by MLDA's astep) when the burn-in + # iterations of the highest-level MLDA sampler run out. + # The change propagates to all levels. + + if self.tune: + # Subsample in tuning mode + trace = subsample( + draws=0, + step=self.step_method_below, + start=q0_dict, + tune=self.subsampling_rate, + ) + else: + # Subsample in normal mode without tuning + # If DEMetropolisZMLDA is the base sampler a flag is raised to + # make sure that history is edited after tuning ends + if self.tuning_end_trigger: + if isinstance(self.step_method_below, DEMetropolisZMLDA): + self.step_method_below.tuning_end_trigger = True + self.tuning_end_trigger = False + + trace = subsample( + draws=self.subsampling_rate, + step=self.step_method_below, + start=q0_dict, + tune=0, + ) + + # set logging back to normal + _log.setLevel(logging.NOTSET) + + # return sample with index self.subchain_selection from the generated + # sequence of length self.subsampling_rate. The index is set within + # MLDA's astep() function + q_dict = trace.point(self.subchain_selection) + + # Make sure output dict is ordered the same way as the input dict. + q_dict = Point( + {key: q_dict[key] for key in q0_dict.keys()}, + model=self.model_below, + filter_model_vars=True, + ) + + return DictToArrayBijection.map(q_dict) diff --git a/pymc_experimental/tests/test_mlda.py b/pymc_experimental/tests/test_mlda.py new file mode 100644 index 000000000..bf9647e76 --- /dev/null +++ b/pymc_experimental/tests/test_mlda.py @@ -0,0 +1,811 @@ +import aesara +import aesara.tensor as at +import arviz as az +import numpy as np +import pytest +from aesara.graph.op import Op +from pymc import ( + Binomial, + CompoundStep, + ConstantData, + DEMetropolisZ, + Metropolis, + Model, + MultivariateNormalProposal, + MutableData, + MvNormal, + Normal, + NormalProposal, + Potential, + UniformProposal, + sample, + set_data, +) +from pymc.tests.checks import close_to +from pymc.tests.models import ( + mv_simple, + mv_simple_coarse, + mv_simple_very_coarse, + simple_2model_continuous, +) + +from pymc_experimental.step_methods.mlda import MLDA, RecursiveDAProposal, extract_Q_estimate + + +def check_stat(check, idata, name): + group = idata.posterior + for (var, stat, value, bound) in check: + s = stat(group[var].sel(chain=0), axis=0) + close_to(s, value, bound, name) + + +def check_stat_dtype(step, idata): + # TODO: This check does not confirm the announced dtypes are correct as the + # sampling machinery will convert them automatically. 
+ for stats_dtypes in getattr(step, "stats_dtypes", []): + for stat, dtype in stats_dtypes.items(): + if stat == "tune": + continue + assert idata.sample_stats[stat].dtype == np.dtype(dtype) + + +class TestMLDA: + steppers = [MLDA] + + def test_proposal_and_base_proposal_choice(self): + """Test that proposal_dist and base_proposal_dist are set as + expected by MLDA""" + _, model, _ = mv_simple() + _, model_coarse, _ = mv_simple_coarse() + with model: + sampler = MLDA(coarse_models=[model_coarse], base_sampler="Metropolis") + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, NormalProposal) + + sampler = MLDA(coarse_models=[model_coarse]) + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, UniformProposal) + + initial_point = model.initial_point() + initial_point_size = sum(initial_point[n.name].size for n in model.value_vars) + s = np.ones(initial_point_size) + sampler = MLDA(coarse_models=[model_coarse], base_sampler="Metropolis", base_S=s) + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, NormalProposal) + + sampler = MLDA(coarse_models=[model_coarse], base_S=s) + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, UniformProposal) + + s = np.diag(s) + sampler = MLDA(coarse_models=[model_coarse], base_sampler="Metropolis", base_S=s) + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, MultivariateNormalProposal) + + sampler = MLDA(coarse_models=[model_coarse], base_S=s) + assert isinstance(sampler.proposal_dist, RecursiveDAProposal) + assert sampler.base_proposal_dist is None + assert isinstance(sampler.step_method_below.proposal_dist, UniformProposal) + + s[0, 0] = -s[0, 0] + with pytest.raises(np.linalg.LinAlgError): + MLDA(coarse_models=[model_coarse], base_sampler="Metropolis", base_S=s) + + def test_step_methods_in_each_level(self): + """Test that MLDA creates the correct hierarchy of step methods when no + coarse models are passed and when two coarse models are passed.""" + _, model, _ = mv_simple() + _, model_coarse, _ = mv_simple_coarse() + _, model_very_coarse, _ = mv_simple_very_coarse() + with model: + initial_point = model.initial_point() + initial_point_size = sum(initial_point[n.name].size for n in model.value_vars) + s = np.ones(initial_point_size) + 2.0 + sampler = MLDA( + coarse_models=[model_very_coarse, model_coarse], + base_S=s, + base_sampler="Metropolis", + ) + assert isinstance(sampler.step_method_below, MLDA) + assert isinstance(sampler.step_method_below.step_method_below, Metropolis) + assert np.all(sampler.step_method_below.step_method_below.proposal_dist.s == s) + + sampler = MLDA(coarse_models=[model_very_coarse, model_coarse], base_S=s) + assert isinstance(sampler.step_method_below, MLDA) + assert isinstance(sampler.step_method_below.step_method_below, DEMetropolisZ) + assert np.all(sampler.step_method_below.step_method_below.proposal_dist.s == s) + + def test_exceptions_coarse_models(self): + """Test that MLDA generates the expected exceptions when no coarse_models arg + is passed, an 
empty list is passed or when coarse_models is not a list""" + with pytest.raises(TypeError): + _, model, _ = mv_simple() + with model: + MLDA() + + with pytest.raises(ValueError): + _, model, _ = mv_simple() + with model: + MLDA(coarse_models=[]) + + with pytest.raises(ValueError): + _, model, _ = mv_simple() + with model: + MLDA(coarse_models=(model, model)) + + def test_nonparallelized_chains_are_random(self): + """Test that parallel chain are not identical when no parallelisation + is applied""" + with Model() as coarse_model: + Normal("x", 0.3, 1) + + with Model(): + Normal("x", 0, 1) + for stepper in TestMLDA.steppers: + step = stepper(coarse_models=[coarse_model]) + idata = sample(chains=2, cores=1, draws=20, tune=0, step=step, random_seed=1) + samples = idata.posterior["x"].values[:, 5] + assert len(set(samples)) == 2, f"Non parallelized {stepper} chains are identical." + + def test_parallelized_chains_are_random(self): + """Test that parallel chain are + not identical when parallelisation + is applied""" + with Model() as coarse_model: + Normal("x", 0.3, 1) + + with Model(): + Normal("x", 0, 1) + for stepper in TestMLDA.steppers: + step = stepper(coarse_models=[coarse_model]) + idata = sample(chains=2, cores=2, draws=20, tune=0, step=step, random_seed=1) + samples = idata.posterior["x"].values[:, 5] + assert len(set(samples)) == 2, f"Parallelized {stepper} chains are identical." + + def test_acceptance_rate_against_coarseness(self): + """Test that the acceptance rate increases + when the coarse model is closer to + the fine model.""" + with Model() as coarse_model_0: + Normal("x", 5.0, 1.0) + + with Model() as coarse_model_1: + Normal("x", 6.0, 2.0) + + with Model() as coarse_model_2: + Normal("x", 20.0, 5.0) + + possible_coarse_models = [coarse_model_0, coarse_model_1, coarse_model_2] + acc = [] + + with Model(): + Normal("x", 5.0, 1.0) + for coarse_model in possible_coarse_models: + step = MLDA(coarse_models=[coarse_model], subsampling_rates=3) + idata = sample(chains=1, draws=500, tune=100, step=step, random_seed=1) + acc.append(idata.sample_stats["accepted"].mean()) + assert acc[0] > acc[1] > acc[2], ( + "Acceptance rate is not " + "strictly increasing when" + "coarse model is closer to " + "fine model. 
Acceptance rates" + "were: {}".format(acc) + ) + + def test_mlda_non_blocked(self): + """Test that MLDA correctly creates non-blocked + compound steps in level 0 when using a Metropolis + base sampler.""" + _, model = simple_2model_continuous() + _, model_coarse = simple_2model_continuous() + with model: + for stepper in self.steppers: + assert isinstance( + stepper( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_blocked=False, + ).step_method_below, + CompoundStep, + ) + + def test_mlda_blocked(self): + """Test the type of base sampler instantiated + when switching base_blocked flag while + the base sampler is Metropolis and when + the base sampler is DEMetropolisZ.""" + _, model = simple_2model_continuous() + _, model_coarse = simple_2model_continuous() + with model: + for stepper in self.steppers: + assert not isinstance( + stepper( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_blocked=True, + ).step_method_below, + CompoundStep, + ) + assert isinstance( + stepper( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_blocked=True, + ).step_method_below, + Metropolis, + ) + assert isinstance( + stepper(coarse_models=[model_coarse]).step_method_below, + DEMetropolisZ, + ) + + def test_tuning_and_scaling_on(self): + """Test that tune and base_scaling change as expected when + tuning is on.""" + np.random.seed(1234) + ts = 100 + _, model = simple_2model_continuous() + _, model_coarse = simple_2model_continuous() + with model: + trace_0 = sample( + tune=ts, + draws=20, + step=MLDA( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_tune_interval=50, + base_scaling=100.0, + ), + chains=1, + discard_tuned_samples=False, + random_seed=1234, + return_inferencedata=False, + ) + + trace_1 = sample( + tune=ts, + draws=20, + step=MLDA( + coarse_models=[model_coarse], + base_tune_target="scaling", + base_tune_interval=50, + base_scaling=100.0, + ), + chains=1, + discard_tuned_samples=False, + random_seed=1234, + return_inferencedata=False, + ) + + trace_2 = sample( + tune=ts, + draws=20, + step=MLDA( + coarse_models=[model_coarse], + base_tune_interval=50, + base_scaling=10, + base_lamb=100.0, + ), + chains=1, + discard_tuned_samples=False, + random_seed=1234, + return_inferencedata=False, + ) + + assert trace_0.get_sampler_stats("tune", chains=0)[0] + assert trace_0.get_sampler_stats("tune", chains=0)[ts - 1] + assert not trace_0.get_sampler_stats("tune", chains=0)[ts] + assert not trace_0.get_sampler_stats("tune", chains=0)[-1] + assert trace_0.get_sampler_stats("base_scaling", chains=0)[0, 0] == 100.0 + assert trace_0.get_sampler_stats("base_scaling", chains=0)[0, 1] == 100.0 + assert trace_0.get_sampler_stats("base_scaling", chains=0)[-1, 0] < 100.0 + assert trace_0.get_sampler_stats("base_scaling", chains=0)[-1, 1] < 100.0 + + assert trace_1.get_sampler_stats("tune", chains=0)[0] + assert trace_1.get_sampler_stats("tune", chains=0)[ts - 1] + assert not trace_1.get_sampler_stats("tune", chains=0)[ts] + assert not trace_1.get_sampler_stats("tune", chains=0)[-1] + assert trace_1.get_sampler_stats("base_scaling", chains=0)[0] == 100.0 + assert trace_1.get_sampler_stats("base_scaling", chains=0)[-1] < 100.0 + + assert trace_2.get_sampler_stats("tune", chains=0)[0] + assert trace_2.get_sampler_stats("tune", chains=0)[ts - 1] + assert not trace_2.get_sampler_stats("tune", chains=0)[ts] + assert not trace_2.get_sampler_stats("tune", chains=0)[-1] + assert trace_2.get_sampler_stats("base_lambda", chains=0)[0] == 100.0 + assert 
trace_2.get_sampler_stats("base_lambda", chains=0)[-1] < 100.0 + + def test_tuning_and_scaling_off(self): + """Test that tuning is deactivated when sample()'s tune=0 and that + MLDA's tune=False is overridden by sample()'s tune.""" + np.random.seed(12345) + _, model = simple_2model_continuous() + _, model_coarse = simple_2model_continuous() + + ts_0 = 0 + with model: + trace_0 = sample( + tune=ts_0, + draws=100, + step=MLDA( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_tune_interval=50, + base_scaling=100.0, + tune=False, + ), + chains=1, + discard_tuned_samples=False, + random_seed=12345, + return_inferencedata=False, + ) + + ts_1 = 100 + with model: + trace_1 = sample( + tune=ts_1, + draws=20, + step=MLDA( + coarse_models=[model_coarse], + base_sampler="Metropolis", + base_tune_interval=50, + base_scaling=100.0, + tune=False, + ), + chains=1, + discard_tuned_samples=False, + random_seed=12345, + return_inferencedata=False, + ) + + assert not trace_0.get_sampler_stats("tune", chains=0)[0] + assert not trace_0.get_sampler_stats("tune", chains=0)[-1] + assert ( + trace_0.get_sampler_stats("base_scaling", chains=0)[0, 0] + == trace_0.get_sampler_stats("base_scaling", chains=0)[-1, 0] + == trace_0.get_sampler_stats("base_scaling", chains=0)[0, 1] + == trace_0.get_sampler_stats("base_scaling", chains=0)[-1, 1] + == 100.0 + ) + + assert trace_1.get_sampler_stats("tune", chains=0)[0] + assert trace_1.get_sampler_stats("tune", chains=0)[ts_1 - 1] + assert not trace_1.get_sampler_stats("tune", chains=0)[ts_1] + assert not trace_1.get_sampler_stats("tune", chains=0)[-1] + assert trace_1.get_sampler_stats("base_scaling", chains=0)[0, 0] == 100.0 + assert trace_1.get_sampler_stats("base_scaling", chains=0)[0, 1] == 100.0 + assert trace_1.get_sampler_stats("base_scaling", chains=0)[-1, 0] < 100.0 + assert trace_1.get_sampler_stats("base_scaling", chains=0)[-1, 1] < 100.0 + + ts_2 = 0 + with model: + trace_2 = sample( + tune=ts_2, + draws=100, + step=MLDA( + coarse_models=[model_coarse], + base_tune_interval=50, + base_lamb=100.0, + base_tune_target=None, + ), + chains=1, + discard_tuned_samples=False, + random_seed=12345, + return_inferencedata=False, + ) + + assert not trace_2.get_sampler_stats("tune", chains=0)[0] + assert not trace_2.get_sampler_stats("tune", chains=0)[-1] + assert ( + trace_2.get_sampler_stats("base_lambda", chains=0)[0] + == trace_2.get_sampler_stats("base_lambda", chains=0)[-1] + == trace_2.get_sampler_stats("base_lambda", chains=0)[0] + == trace_2.get_sampler_stats("base_lambda", chains=0)[-1] + == 100.0 + ) + + def test_trace_length(self): + """Check if trace length is as expected.""" + tune = 100 + draws = 50 + with Model() as coarse_model: + Normal("n", 0, 2.2, size=(3,)) + with Model(): + Normal("n", 0, 2, size=(3,)) + step = MLDA(coarse_models=[coarse_model]) + idata = sample(tune=tune, draws=draws, step=step, chains=1, discard_tuned_samples=False) + assert len(idata.warmup_posterior.draw) == tune + assert len(idata.posterior.draw) == draws + + @pytest.mark.parametrize( + "variable,has_grad,outcome", + [("n", True, 1), ("n", False, 1), ("b", True, 0), ("b", False, 0)], + ) + def test_competence(self, variable, has_grad, outcome): + """Test if competence function returns expected + results for different models""" + with Model() as pmodel: + Normal("n", 0, 2, size=(3,)) + Binomial("b", n=2, p=0.3) + assert MLDA.competence(pmodel[variable], has_grad=has_grad) == outcome + + def test_multiple_subsampling_rates(self): + """Test that when you give a single 
integer it is applied to all levels and + when you give a list the list is applied correctly.""" + with Model() as coarse_model_0: + Normal("n", 0, 2.2, size=(3,)) + with Model() as coarse_model_1: + Normal("n", 0, 2.1, size=(3,)) + with Model(): + Normal("n", 0, 2.0, size=(3,)) + + step_1 = MLDA(coarse_models=[coarse_model_0, coarse_model_1], subsampling_rates=3) + assert len(step_1.subsampling_rates) == 2 + assert step_1.subsampling_rates[0] == step_1.subsampling_rates[1] == 3 + + step_2 = MLDA(coarse_models=[coarse_model_0, coarse_model_1], subsampling_rates=[3, 4]) + assert step_2.subsampling_rates[0] == 3 + assert step_2.subsampling_rates[1] == 4 + + with pytest.raises(ValueError): + step_3 = MLDA( + coarse_models=[coarse_model_0, coarse_model_1], + subsampling_rates=[3, 4, 10], + ) + + def test_aem_mu_sigma(self): + """Test that AEM estimates mu_B and Sigma_B in + the coarse models of a 3-level LR example correctly""" + # create data for linear regression + if aesara.config.floatX == "float32": + p = "float32" + else: + p = "float64" + np.random.seed(123456) + size = 200 + true_intercept = 1 + true_slope = 2 + sigma = 1 + x = np.linspace(0, 1, size, dtype=p) + # y = a + b*x + true_regression_line = true_intercept + true_slope * x + # add noise + y = true_regression_line + np.random.normal(0, sigma**2, size) + s = np.identity(y.shape[0], dtype=p) + np.fill_diagonal(s, sigma**2) + + # forward model Op - here, just the regression equation + class ForwardModel(Op): + if aesara.config.floatX == "float32": + itypes = [at.fvector] + otypes = [at.fvector] + else: + itypes = [at.dvector] + otypes = [at.dvector] + + def __init__(self, x, pymc_model): + self.x = x + self.pymc_model = pymc_model + + def perform(self, node, inputs, outputs): + intercept = inputs[0][0] + x_coeff = inputs[0][1] + + temp = intercept + x_coeff * x + self.pymc_model.bias.data + with self.pymc_model: + set_data({"model_output": temp}) + outputs[0][0] = np.array(temp) + + # create the coarse models with separate biases + mout = [] + coarse_models = [] + + with Model() as coarse_model_0: + bias = ConstantData("bias", 3.5 * np.ones(y.shape, dtype=p)) + mu_B = MutableData("mu_B", -1.3 * np.ones(y.shape, dtype=p)) + Sigma_B = MutableData("Sigma_B", np.zeros((y.shape[0], y.shape[0]), dtype=p)) + model_output = MutableData("model_output", np.zeros(y.shape, dtype=p)) + Sigma_e = ConstantData("Sigma_e", s) + + # Define priors + intercept = Normal("Intercept", 0, sigma=20) + x_coeff = Normal("x", 0, sigma=20) + + theta = at.as_tensor_variable([intercept, x_coeff]) + + mout.append(ForwardModel(x, coarse_model_0)) + + # Define likelihood + likelihood = MvNormal("y", mu=mout[0](theta) + mu_B, cov=Sigma_e, observed=y) + + coarse_models.append(coarse_model_0) + + with Model() as coarse_model_1: + bias = ConstantData("bias", 2.2 * np.ones(y.shape, dtype=p)) + mu_B = MutableData("mu_B", -2.2 * np.ones(y.shape, dtype=p)) + Sigma_B = MutableData("Sigma_B", np.zeros((y.shape[0], y.shape[0]), dtype=p)) + model_output = MutableData("model_output", np.zeros(y.shape, dtype=p)) + Sigma_e = ConstantData("Sigma_e", s) + + # Define priors + intercept = Normal("Intercept", 0, sigma=20) + x_coeff = Normal("x", 0, sigma=20) + + theta = at.as_tensor_variable([intercept, x_coeff]) + + mout.append(ForwardModel(x, coarse_model_1)) + + # Define likelihood + likelihood = MvNormal("y", mu=mout[1](theta) + mu_B, cov=Sigma_e, observed=y) + + coarse_models.append(coarse_model_1) + + # fine model and inference + with Model() as model: + bias = 
ConstantData("bias", np.zeros(y.shape, dtype=p)) + model_output = MutableData("model_output", np.zeros(y.shape, dtype=p)) + Sigma_e = ConstantData("Sigma_e", s) + + # Define priors + intercept = Normal("Intercept", 0, sigma=20) + x_coeff = Normal("x", 0, sigma=20) + + theta = at.as_tensor_variable([intercept, x_coeff]) + + mout.append(ForwardModel(x, model)) + + # Define likelihood + likelihood = MvNormal("y", mu=mout[-1](theta), cov=Sigma_e, observed=y) + + step_mlda = MLDA(coarse_models=coarse_models, adaptive_error_model=True) + + trace_mlda = sample( + draws=100, + step=step_mlda, + chains=1, + tune=200, + discard_tuned_samples=True, + random_seed=84759238, + ) + + m0 = step_mlda.step_method_below.model_below.mu_B.get_value() + s0 = step_mlda.step_method_below.model_below.Sigma_B.get_value() + m1 = step_mlda.model_below.mu_B.get_value() + s1 = step_mlda.model_below.Sigma_B.get_value() + + assert np.allclose(m0, -3.5) + assert np.allclose(m1, -2.2) + assert np.allclose(s0, 0, atol=1e-3) + assert np.allclose(s1, 0, atol=1e-3) + + def test_variance_reduction(self): + """ + Test if the right stats are outputed when variance reduction is used in MLDA, + if the output estimates are close (VR estimate vs. standard estimate from + the first chain) and if the variance of VR is lower. Uses a linear regression + model with multiple levels where approximate levels have fewer data. + + """ + # arithmetic precision + if aesara.config.floatX == "float32": + p = "float32" + else: + p = "float64" + + # set up the model and data + seed = 12345 + np.random.seed(seed) + size = 100 + true_intercept = 1 + true_slope = 2 + sigma = 0.1 + x = np.linspace(0, 1, size, dtype=p) + # y = a + b*x + true_regression_line = true_intercept + true_slope * x + # add noise + y = true_regression_line + np.random.normal(0, sigma**2, size) + s = sigma + + x_coarse_0 = x[::3] + y_coarse_0 = y[::3] + x_coarse_1 = x[::2] + y_coarse_1 = y[::2] + + # MCMC parameters + ndraws = 200 + ntune = 100 + nsub = 3 + nchains = 1 + + # define likelihoods with different Q + class Likelihood(Op): + if aesara.config.floatX == "float32": + itypes = [at.fvector] + otypes = [at.fscalar] + else: + itypes = [at.dvector] + otypes = [at.dscalar] + + def __init__(self, x, y, pymc_model): + self.x = x + self.y = y + self.pymc_model = pymc_model + + def perform(self, node, inputs, outputs): + intercept = inputs[0][0] + x_coeff = inputs[0][1] + + temp = np.array(intercept + x_coeff * self.x, dtype=p) + with self.pymc_model: + set_data({"Q": np.array(x_coeff, dtype=p)}) + outputs[0][0] = np.array( + -(0.5 / s**2) * np.sum((temp - self.y) ** 2, dtype=p), dtype=p + ) + + # run four MLDA steppers for all combinations of + # base_sampler and forward model + for stepper in ["Metropolis", "DEMetropolisZ"]: + mout = [] + coarse_models = [] + + with Model() as coarse_model_0: + if aesara.config.floatX == "float32": + Q = MutableData("Q", np.float32(0.0)) + else: + Q = MutableData("Q", np.float64(0.0)) + + # Define priors + intercept = Normal("Intercept", true_intercept, sigma=1) + x_coeff = Normal("x", true_slope, sigma=1) + + theta = at.as_tensor_variable([intercept, x_coeff]) + + mout.append(Likelihood(x_coarse_0, y_coarse_0, coarse_model_0)) + Potential("likelihood", mout[0](theta)) + + coarse_models.append(coarse_model_0) + + with Model() as coarse_model_1: + if aesara.config.floatX == "float32": + Q = MutableData("Q", np.float32(0.0)) + else: + Q = MutableData("Q", np.float64(0.0)) + + # Define priors + intercept = Normal("Intercept", true_intercept, sigma=1) 
+                x_coeff = Normal("x", true_slope, sigma=1)
+
+                theta = at.as_tensor_variable([intercept, x_coeff])
+
+                mout.append(Likelihood(x_coarse_1, y_coarse_1, coarse_model_1))
+                Potential("likelihood", mout[1](theta))
+
+            coarse_models.append(coarse_model_1)
+
+            with Model() as model:
+                if aesara.config.floatX == "float32":
+                    Q = MutableData("Q", np.float32(0.0))
+                else:
+                    Q = MutableData("Q", np.float64(0.0))
+
+                # Define priors
+                intercept = Normal("Intercept", true_intercept, sigma=1)
+                x_coeff = Normal("x", true_slope, sigma=1)
+
+                theta = at.as_tensor_variable([intercept, x_coeff])
+
+                mout.append(Likelihood(x, y, model))
+                Potential("likelihood", mout[-1](theta))
+
+                step = MLDA(
+                    coarse_models=coarse_models,
+                    base_sampler=stepper,
+                    subsampling_rates=nsub,
+                    variance_reduction=True,
+                    store_Q_fine=True,
+                )
+
+                trace = sample(
+                    draws=ndraws,
+                    step=step,
+                    chains=nchains,
+                    tune=ntune,
+                    cores=1,
+                    discard_tuned_samples=True,
+                    random_seed=seed,
+                    return_inferencedata=False,
+                )
+
+                # get fine level stats (standard method)
+                Q_2 = trace.get_sampler_stats("Q_2").reshape((nchains, ndraws))
+                Q_mean_standard = Q_2.mean(axis=1).mean()
+                Q_se_standard = np.sqrt(Q_2.var() / az.ess(np.array(Q_2, np.float64)))
+
+                # get VR stats
+                Q_mean_vr, Q_se_vr = extract_Q_estimate(trace, 3)
+
+                # check that returned values are floats and finite.
+                assert isinstance(Q_mean_standard, np.floating)
+                assert np.isfinite(Q_mean_standard)
+                assert isinstance(Q_mean_vr, np.floating)
+                assert np.isfinite(Q_mean_vr)
+                assert isinstance(Q_se_standard, np.floating)
+                assert np.isfinite(Q_se_standard)
+                assert isinstance(Q_se_vr, np.floating)
+                assert np.isfinite(Q_se_vr)
+
+                # check consistency of QoI across levels.
+                Q_1_0 = np.concatenate(trace.get_sampler_stats("Q_1_0")).reshape(
+                    (nchains, ndraws * nsub)
+                )
+                Q_2_1 = np.concatenate(trace.get_sampler_stats("Q_2_1")).reshape((nchains, ndraws))
+                # This used to be a strict zero equality!
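+                # Q_1_0 and Q_2_1 hold the per-draw differences of the quantity of interest
+                # between consecutive levels. With Q defined as the slope coefficient at
+                # every level, these differences should average out close to zero, hence
+                # the tolerance-based checks below rather than an exact-zero comparison.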
+                assert np.isclose(Q_1_0.mean(axis=1), 0.0, atol=1e-4)
+                assert np.isclose(Q_2_1.mean(axis=1), 0.0, atol=1e-4)
+
+    def test_step_continuous(self):
+        def step_fn(C, model_coarse):
+            return MLDA(
+                coarse_models=[model_coarse],
+                base_S=C,
+                base_proposal_dist=MultivariateNormalProposal,
+            )
+
+        start, model, (mu, C) = mv_simple()
+        unc = np.diag(C) ** 0.5
+        check = (("x", np.mean, mu, unc / 10), ("x", np.std, unc, unc / 10))
+        _, model_coarse, _ = mv_simple_coarse()
+        with model:
+            step = step_fn(C, model_coarse)
+            idata = sample(
+                tune=1000,
+                draws=1000,
+                chains=1,
+                step=step,
+                start=start,
+                model=model,
+                random_seed=1,
+            )
+            check_stat(check, idata, step.__class__.__name__)
+            check_stat_dtype(idata, step)
+
+    @aesara.config.change_flags({"floatX": "float64", "warn_float64": "ignore"})
+    def test_float64_MLDA(self):
+        data = np.random.randn(5)
+
+        with Model() as coarse_model:
+            x = Normal("x", initval=np.array(1.0, dtype="float64"))
+            obs = Normal("obs", mu=x, sigma=1.0, observed=data + 0.5)
+
+        with Model() as model:
+            x = Normal("x", initval=np.array(1.0, dtype="float64"))
+            obs = Normal("obs", mu=x, sigma=1.0, observed=data)
+
+        assert x.dtype == "float64"
+        assert obs.dtype == "float64"
+
+        with model:
+            sample(draws=10, tune=10, chains=1, step=MLDA(coarse_models=[coarse_model]))
+
+    @aesara.config.change_flags({"floatX": "float32", "warn_float64": "warn"})
+    def test_float32_MLDA(self):
+        data = np.random.randn(5).astype("float32")
+
+        with Model() as coarse_model:
+            x = Normal("x", initval=np.array(1.0, dtype="float32"))
+            obs = Normal("obs", mu=x, sigma=1.0, observed=data + 0.5)
+
+        with Model() as model:
+            x = Normal("x", initval=np.array(1.0, dtype="float32"))
+            obs = Normal("obs", mu=x, sigma=1.0, observed=data)
+
+        assert x.dtype == "float32"
+        assert obs.dtype == "float32"
+
+        with model:
+            sample(draws=10, tune=10, chains=1, step=MLDA(coarse_models=[coarse_model]))