diff --git a/pymc_experimental/statespace/core/representation.py b/pymc_experimental/statespace/core/representation.py index 95957fe4..e654e4d0 100644 --- a/pymc_experimental/statespace/core/representation.py +++ b/pymc_experimental/statespace/core/representation.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Tuple, Type, Union +from typing import Optional, Type, Union import numpy as np import pytensor @@ -10,7 +10,7 @@ ) floatX = pytensor.config.floatX -KeyLike = Union[Tuple[Union[str, int]], str] +KeyLike = Union[tuple[Union[str, int]], str] class PytensorRepresentation: @@ -228,8 +228,8 @@ def _update_shape(self, key: KeyLike, value: Union[np.ndarray, pt.TensorType]) - self.shapes[key] = shape def _add_time_dim_to_slice( - self, name: str, slice_: Union[List[int], Tuple[int]], n_dim: int - ) -> Tuple[int]: + self, name: str, slice_: Union[list[int], tuple[int]], n_dim: int + ) -> tuple[int]: # Case 1: There is never a time dim. No changes needed. if name in NEVER_TIME_VARYING: return slice_ diff --git a/pymc_experimental/statespace/core/statespace.py b/pymc_experimental/statespace/core/statespace.py index 5bcce7a3..29c91f6b 100644 --- a/pymc_experimental/statespace/core/statespace.py +++ b/pymc_experimental/statespace/core/statespace.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Optional, Sequence, Union import numpy as np import pandas as pd @@ -30,16 +30,13 @@ ALL_STATE_AUX_DIM, ALL_STATE_DIM, FILTER_OUTPUT_DIMS, - FILTER_OUTPUT_NAMES, FILTER_OUTPUT_TYPES, JITTER_DEFAULT, MATRIX_DIMS, MATRIX_NAMES, OBS_STATE_DIM, - SHOCK_AUX_DIM, SHOCK_DIM, SHORT_NAME_TO_LONG, - SMOOTHER_OUTPUT_NAMES, TIME_DIM, VECTOR_VALUED, ) @@ -221,23 +218,26 @@ def __init__( verbose: bool = True, measurement_error: bool = False, ): - self._fit_mode = None - self._fit_coords = None + self._fit_mode: Optional[str] = None + self._fit_coords: Optional[dict[str, Sequence[str]]] = None + self._fit_dims: Optional[dict[str, Sequence[str]]] = None + self._fit_data: Optional[pt.TensorVariable] = None + self._needs_exog_data = False self._exog_names = [] self._name_to_variable = {} + self._name_to_data = {} self.k_endog = k_endog self.k_states = k_states self.k_posdef = k_posdef self.measurement_error = measurement_error - self.data_len = None # All models contain a state space representation and a Kalman filter self.ssm = PytensorRepresentation(k_endog, k_states, k_posdef) # This will be populated with PyMC random matrices after calling _insert_random_variables - self.subbed_ssm: List[Variable] = None + self.subbed_ssm: Optional[list[pt.TensorVariable]] = None if filter_type.lower() not in FILTER_FACTORY.keys(): raise NotImplementedError( @@ -252,10 +252,16 @@ def __init__( self.make_symbolic_graph() if verbose: + # These are split into separate try-except blocks, because it will be quite rare of models to implement + # _print_data_requirements, but we still want to print the prior requirements. try: self._print_prior_requirements() except NotImplementedError: pass + try: + self._print_data_requirements() + except NotImplementedError: + pass def _print_prior_requirements(self) -> None: """ @@ -273,7 +279,24 @@ def _print_prior_requirements(self) -> None: f"{out}" ) - def _unpack_statespace_with_placeholders(self) -> Tuple: + def _print_data_requirements(self) -> None: + """ + Prints a short report to the terminal about the data needed for the model, including their names, shapes, + and named dimensions. + """ + + out = "" + for data, info in self.data_info.items(): + out += f'\t{data} -- shape: {info["shape"]}, dims: {info["dims"]}\n' + out = out.rstrip() + + _log.info( + "The following MutableData variables should be assigned to the model inside a PyMC " + f"model block: \n" + f"{out}" + ) + + def _unpack_statespace_with_placeholders(self) -> tuple[pt.TensorVariable]: """ Helper function to quickly obtain all statespace matrices in the standard order. Matrices returned by this method will include pytensor placeholders. @@ -291,7 +314,7 @@ def _unpack_statespace_with_placeholders(self) -> Tuple: return a0, P0, c, d, T, Z, R, H, Q - def unpack_statespace(self) -> Tuple: + def unpack_statespace(self) -> list[pt.TensorVariable]: """ Helper function to quickly obtain all statespace matrices in the standard order. """ @@ -306,7 +329,7 @@ def unpack_statespace(self) -> Tuple: return self.subbed_ssm @property - def param_names(self) -> List[str]: + def param_names(self) -> list[str]: """ Names of model parameters @@ -316,7 +339,17 @@ def param_names(self) -> List[str]: raise NotImplementedError("The param_names property has not been implemented!") @property - def param_info(self) -> Dict[str, Dict[str, Any]]: + def data_names(self) -> list[str]: + """ + Names of data variables expected by the model. + + This does not include the observed data series, which is automatically handled by PyMC. This property only + needs to be implemented for models that expect exogenous data. + """ + raise NotImplementedError("The data_names property has not been implemented!") + + @property + def param_info(self) -> dict[str, dict[str, Any]]: """ Information about parameters needed to declare priors @@ -331,7 +364,20 @@ def param_info(self) -> Dict[str, Dict[str, Any]]: raise NotImplementedError("The params_info property has not been implemented!") @property - def state_names(self) -> List[str]: + def data_info(self) -> dict[str, dict[str, Any]]: + """ + Information about MutableData variables that need to be declared in the PyMC model block. + + Returns a dictionary of data_name: dictionary of property-name:property description pairs. The return value is + used by the ``_print_data_requirements`` method, to print a message telling users how to define the necessary + data for the model. Each dictionary should have the following key-value pairs: + * key: "shape", value: a tuple of integers + * key: "dims", value: tuple of strings + """ + raise NotImplementedError("The data_info property has not been implemented!") + + @property + def state_names(self) -> list[str]: """ A k_states length list of strings, associated with the model's hidden states @@ -340,14 +386,14 @@ def state_names(self) -> List[str]: raise NotImplementedError("The state_names property has not been implemented!") @property - def observed_states(self) -> List[str]: + def observed_states(self) -> list[str]: """ A k_endog length list of strings, associated with the model's observed states """ raise NotImplementedError("The observed_states property has not been implemented!") @property - def shock_names(self) -> List[str]: + def shock_names(self) -> list[str]: """ A k_posdef length list of strings, associated with the model's shock processes @@ -355,7 +401,7 @@ def shock_names(self) -> List[str]: raise NotImplementedError("The shock_names property has not been implemented!") @property - def default_priors(self) -> Dict[str, Callable]: + def default_priors(self) -> dict[str, Callable]: """ Dictionary of parameter names and callable functions to construct default priors for the model @@ -365,7 +411,7 @@ def default_priors(self) -> Dict[str, Callable]: raise NotImplementedError("The default_priors property has not been implemented!") @property - def coords(self) -> Dict[str, Sequence[str]]: + def coords(self) -> dict[str, Sequence[str]]: """ PyMC model coordinates @@ -376,7 +422,7 @@ def coords(self) -> Dict[str, Sequence[str]]: raise NotImplementedError("The coords property has not been implemented!") @property - def param_dims(self) -> Dict[str, Sequence[str]]: + def param_dims(self) -> dict[str, Sequence[str]]: """ Dictionary of named dimensions for each model parameter @@ -437,6 +483,45 @@ def make_and_register_variable(self, name, shape, dtype=floatX) -> Variable: self._name_to_variable[name] = placeholder return placeholder + def make_and_register_data( + self, name: str, shape: Union[int, tuple[int]], dtype: str = floatX + ) -> Variable: + r""" + Helper function to create a pytensor symbolic variable and register it in the _name_to_data dictionary + + Parameters + ---------- + name : str + The name of the placeholder data. Must be the name of an expected data variable. + shape : int or tuple of int + Shape of the parameter + dtype : str, default pytensor.config.floatX + dtype of the parameter + + Notes + ----- + See docstring for make_and_register_variable for more details. This function is similar, but handles data + inputs instead of model parameters. + + An error is raised if the provided name has already been registered, or if the name is not present in the + ``data_names`` property. + """ + if name not in self.data_names: + raise ValueError( + f"{name} is not a model parameter. All placeholder variables should correspond to model " + f"parameters." + ) + + if name in self._name_to_data.keys(): + raise ValueError( + f"{name} is already a registered placeholder variable with shape " + f"{self._name_to_data[name].type.shape}" + ) + + placeholder = pt.tensor(name, shape=shape, dtype=dtype) + self._name_to_data[name] = placeholder + return placeholder + def make_symbolic_graph(self) -> None: """ The purpose of the make_symbolic_graph function is to hide tedious parameter allocations from the user. @@ -494,7 +579,9 @@ def make_symbolic_graph(self) -> None: """ raise NotImplementedError("The make_symbolic_statespace method has not been implemented!") - def _get_matrix_shape_and_dims(self, name: str) -> Tuple[Tuple, Tuple]: + def _get_matrix_shape_and_dims( + self, name: str + ) -> tuple[Optional[tuple[int]], Optional[tuple[str]]]: """ Get the shape and dimensions of a matrix associated with the specified name. @@ -519,9 +606,10 @@ def _get_matrix_shape_and_dims(self, name: str) -> Tuple[Tuple, Tuple]: pm_mod = modelcontext(None) dims = MATRIX_DIMS.get(name, None) dims = dims if all([dim in pm_mod.coords.keys() for dim in dims]) else None + data_len = len(self._fit_data) if name in self.kalman_filter.seq_names: - shape = (self.data_len,) + self.ssm[SHORT_NAME_TO_LONG[name]].type.shape + shape = (data_len,) + self.ssm[SHORT_NAME_TO_LONG[name]].type.shape dims = (TIME_DIM,) + dims else: shape = self.ssm[SHORT_NAME_TO_LONG[name]].type.shape @@ -530,7 +618,11 @@ def _get_matrix_shape_and_dims(self, name: str) -> Tuple[Tuple, Tuple]: return shape, dims - def _get_output_shape_and_dims(self, idata: InferenceData, filter_output: str) -> Tuple: + def _get_output_shape_and_dims( + self, idata: InferenceData, filter_output: str + ) -> tuple[ + Optional[tuple[int]], Optional[tuple[int]], Optional[tuple[str]], Optional[tuple[str]] + ]: """ Get the shapes and dimensions of the output variables from the provided InferenceData. @@ -583,7 +675,7 @@ def _get_output_shape_and_dims(self, idata: InferenceData, filter_output: str) - return mu_shape, cov_shape, mu_dims, cov_dims - def _insert_random_variables(self) -> List[Variable]: + def _insert_random_variables(self): """ Replace pytensor symbolic variables with PyMC random variables. @@ -630,13 +722,49 @@ def _insert_random_variables(self) -> List[Variable]: + ", ".join(missing_params) ) + excess_params = list(set(found_params) - set(self.param_names)) + if len(excess_params) > 0: + raise ValueError( + "The following parameters were found in the PyMC model but are not required by the statespace model: " + + ", ".join(excess_params) + ) + matrices = list(self._unpack_statespace_with_placeholders()) - replacement_dict = { - var: pt.atleast_1d(pymc_model[name]) for name, var in self._name_to_variable.items() - } + + replacement_dict = {var: pymc_model[name] for name, var in self._name_to_variable.items()} self.subbed_ssm = graph_replace(matrices, replace=replacement_dict, strict=True) - def _register_matrices_with_pymc_model(self) -> List[pt.TensorVariable]: + def _insert_data_variables(self): + """ + Replace symbolic pytensor variables with PyMC data containers. + + Only used when models require exogenous data. The observed data is not added to the model using this method! + """ + + try: + data_names = self.data_names + except NotImplementedError: + return + + pymc_model = modelcontext(None) + found_data = [] + with pymc_model: + for data_name in data_names: + data = getattr(pymc_model, data_name, None) + if data: + found_data.append(data.name) + + missing_data = list(set(data_names) - set(found_data)) + if len(missing_data) > 0: + raise ValueError( + "The following required exogenous data were not found in the PyMC model: " + + ", ".join(missing_data) + ) + + replacement_dict = {data: pymc_model[name] for name, data in self._name_to_data.items()} + self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=True) + + def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]: """ Add all statespace matrices to the PyMC model currently on the context stack as pm.Deterministic nodes, and adds named dimensions if they are found. @@ -644,7 +772,7 @@ def _register_matrices_with_pymc_model(self) -> List[pt.TensorVariable]: Returns ------- registered_matrices: list of pt.TensorVariable - List of statespace matrices, wrapped in pm.Deterministic + list of statespace matrices, wrapped in pm.Deterministic """ pm_mod = modelcontext(None) @@ -667,6 +795,26 @@ def _register_matrices_with_pymc_model(self) -> List[pt.TensorVariable]: return registered_matrices + @staticmethod + def _register_kalman_filter_outputs_with_pymc_model(outputs: tuple[pt.TensorVariable]) -> None: + mod = modelcontext(None) + states, covs = outputs[:4], outputs[4:] + + state_names = ["filtered_state", "predicted_state", "observed_state", "smoothed_state"] + cov_names = [ + "filtered_covariance", + "predicted_covariance", + "observed_covariance", + "smoothed_covariance", + ] + + with mod: + for state, name in zip(states, state_names): + pm.Deterministic(name, state, dims=FILTER_OUTPUT_DIMS.get(name, None)) + + for cov, name in zip(covs, cov_names): + pm.Deterministic(name, cov, dims=FILTER_OUTPUT_DIMS.get(name, None)) + def add_exogenous(self, exog: pt.TensorVariable) -> None: """ Add an exogenous process to the statespace model @@ -701,7 +849,7 @@ def add_exogenous(self, exog: pt.TensorVariable) -> None: elif exog.ndim > 2: raise ValueError(f"Exogenous data must be at most 2d, found {exog.ndim} dimensions") - # Need to specifically ask for the time dim + # Need to specifically ask for the time dim (last one) when slicing into self.ssm d = self.ssm["obs_intercept", :, :] self.ssm["obs_intercept"] = d + exog @@ -715,14 +863,12 @@ def build_statespace_graph( mode: Optional[str] = None, missing_fill_value: Optional[float] = None, cov_jitter: Optional[float] = JITTER_DEFAULT, - return_updates: bool = False, - include_smoother: bool = True, + save_kalman_filter_outputs_in_idata: bool = False, ) -> None: """ Given a parameter vector `theta`, constructs the full computational graph describing the state space model and the associated log probability of the data. Hidden states and log probabilities are computed via the Kalman - Filter. All statespace matrices, as well as Kalman Filter outputs, will be added to the PyMC model on the - context stack as pm.Deterministic variables. + Filter. Parameters ---------- @@ -760,26 +906,17 @@ def build_statespace_graph( - The Univariate Filter is more robust than other filters, and can tolerate a lower jitter value - return_updates : bool, optional, default=False - If True, the method will return the update dictionary used by pytensor.Scan to update random variables. - Only useful for diagnostic purposes. - - include_smoother : bool, optional, default=True - If True, the Kalman smoother will be included in the computation graph to generate smoothed states - and covariances. These will be added to the PyMC model on the context stack as pm.Deterministic variables - - Returns - ------- - None or Dict - If `return_updates` is True, the method will return a dictionary of updates from the Kalman filter. - If `return_updates` is False, the method will return None. + save_kalman_filter_outputs_in_idata: bool, optional, default=False + If True, Kalman Filter outputs will be saved in the model as deterministics. Useful for debugging, but + should not be necessary for the majority of users. """ pm_mod = modelcontext(None) self._insert_random_variables() + self._insert_data_variables() + obs_coords = pm_mod.coords.get(OBS_STATE_DIM, None) - self.data_len = data.shape[0] data, nan_mask = register_data_with_pymc( data, n_obs=self.ssm.k_endog, @@ -788,53 +925,29 @@ def build_statespace_graph( missing_fill_value=missing_fill_value, ) - registered_matrices = self._register_matrices_with_pymc_model() - filter_outputs = self.kalman_filter.build_graph( pt.as_tensor_variable(data), - *registered_matrices, + *self.unpack_statespace(), mode=mode, missing_fill_value=missing_fill_value, - return_updates=return_updates, cov_jitter=cov_jitter, ) - if return_updates: - outputs, updates = filter_outputs - else: - outputs = filter_outputs - + outputs = filter_outputs logp = outputs.pop(-1) states, covs = outputs[:3], outputs[3:] filtered_states, predicted_states, observed_states = states filtered_covariances, predicted_covariances, observed_covariances = covs - - outputs_to_use = [ - filtered_states, - predicted_states, - filtered_covariances, - predicted_covariances, - ] - names_to_use = FILTER_OUTPUT_NAMES.copy() - - if include_smoother: - smoothed_states, smoothed_covariances = self._build_smoother_graph( - filtered_states, - filtered_covariances, - registered_matrices, - mode=mode, - cov_jitter=cov_jitter, + if save_kalman_filter_outputs_in_idata: + smooth_states, smooth_covariances = self._build_smoother_graph( + filtered_states, filtered_covariances, self.unpack_statespace(), mode=mode ) - outputs_to_use += [smoothed_states, smoothed_covariances] - names_to_use += SMOOTHER_OUTPUT_NAMES.copy() + all_kf_outputs = states + [smooth_states] + covs + [smooth_covariances] + self._register_kalman_filter_outputs_with_pymc_model(all_kf_outputs) - for output, name in zip(outputs_to_use, names_to_use): - dims = FILTER_OUTPUT_DIMS.get(name, None) - dims = dims if all([dim in pm_mod.coords.keys() for dim in dims]) else None - pm.Deterministic(name, output, dims=dims) - - obs_dims = FILTER_OUTPUT_DIMS.get("obs", None) + obs_dims = FILTER_OUTPUT_DIMS["obs"] obs_dims = obs_dims if all([dim in pm_mod.coords.keys() for dim in obs_dims]) else None + SequenceMvNormal( "obs", mus=observed_states, @@ -845,10 +958,9 @@ def build_statespace_graph( ) self._fit_coords = pm_mod.coords.copy() + self._fit_dims = pm_mod.named_vars_to_dims.copy() self._fit_mode = mode - - if return_updates: - return updates + self._fit_data = data def _build_smoother_graph( self, @@ -895,43 +1007,78 @@ def _build_smoother_graph( smooth_states, smooth_covariances = self.kalman_smoother.build_graph( T, R, Q, filtered_states, filtered_covariances, mode=mode, cov_jitter=cov_jitter ) + smooth_states.name = "smooth_states" + smooth_covariances.name = "smooth_covariances" return smooth_states, smooth_covariances - def _build_dummy_graph(self, skip_matrices=None): + def _build_dummy_graph(self) -> None: """ Build a dummy computation graph for the state space model matrices. - This method creates "dummy" pm.Flat variables representing the matrices used in the state space model. - The shape and dimensions of each matrix are determined based on the model specifications. These variables - are used by the `sample_posterior`, `forecast`, and `impulse_response_functions` methods. + This method creates "dummy" pm.Flat variables representing the deep parameters used in the state space model. - Parameters - ---------- - skip_matrices : Optional[List[str]], default=None - A list of matrix names to skip during the dummy graph construction. If provided, the matrices - specified in this list will not be created in the dummy graph. + Returns + ------- + list[pm.Flat] + A list of pm.Flat variables representing all parameters estimated by the model. + """ + for name in self.param_names: + pm.Flat( + name, + shape=self._name_to_variable[name].type.shape, + dims=self._fit_dims.get(name, None), + ) + + def _kalman_filter_outputs_from_dummy_graph( + self, + ) -> tuple[list[pt.TensorVariable], list[tuple[pt.TensorVariable, pt.TensorVariable]]]: + """ + Builds a Kalman filter graph using "dummy" pm.Flat distributions for the model variables and sorts the returns + into (mean, covariance) pairs for each of filtered, predicted, and smoothed output. Returns ------- - List[pm.Flat] - A list of pm.Flat variables representing the dummy matrices used in the state space model. The return - order is always x0, P0, c, d, T, Z, R, H, Q, skipping any matrices indicated in `skip_matrices`. + matrices: list of tensors + Statespace matrices with dummy parameters. + + grouped_outputs: list of tuple of tensors + A list of tuples, each containing the mean and covariance of the filtered, predicted, and smoothed states. """ - modelcontext(None) - if skip_matrices is None: - skip_matrices = [] + self._build_dummy_graph() + x0, P0, c, d, T, Z, R, H, Q = self.unpack_statespace() + + filter_outputs = self.kalman_filter.build_graph( + pt.as_tensor_variable(self._fit_data), + x0, + P0, + c, + d, + T, + Z, + R, + H, + Q, + mode=self._fit_mode, + ) - matrices = [] - for name in MATRIX_NAMES: - if name in skip_matrices: - continue + filter_outputs.pop(-1) + states, covariances = filter_outputs[:3], filter_outputs[3:] - shape, dims = self._get_matrix_shape_and_dims(name) - x = pm.Flat(f"{name}", shape=shape, dims=dims) - matrices.append(x) + filtered_states, predicted_states, _ = states + filtered_covariances, predicted_covariances, _ = covariances - return matrices + [smoothed_states, smoothed_covariances] = self.kalman_smoother.build_graph( + T, R, Q, filtered_states, filtered_covariances, mode=self._fit_mode + ) + + grouped_outputs = [ + (filtered_states, filtered_covariances), + (predicted_states, predicted_covariances), + (smoothed_states, smoothed_covariances), + ] + + return [x0, P0, c, d, T, Z, R, H, Q], grouped_outputs def _sample_conditional( self, idata: InferenceData, group: str, random_seed: Optional[RandomState] = None, **kwargs @@ -940,7 +1087,6 @@ def _sample_conditional( Common functionality shared between `sample_conditional_prior` and `sample_conditional_posterior`. See those methods for details. - Parameters ---------- idata : InferenceData @@ -964,40 +1110,50 @@ def _sample_conditional( _verify_group(group) group_idata = getattr(idata, group) - filters_to_use = ["filtered", "predicted", "smoothed"] with pm.Model(coords=self._fit_coords): - d, Z, H = self._build_dummy_graph(skip_matrices=["x0", "P0", "c", "T", "R", "Q"]) + [ + x0, + P0, + c, + d, + T, + Z, + R, + H, + Q, + ], grouped_outputs = self._kalman_filter_outputs_from_dummy_graph() - for filter_output in filters_to_use: - mu_shape, cov_shape, mu_dims, cov_dims = self._get_output_shape_and_dims( - group_idata, filter_output - ) + for name, (mu, cov) in zip(FILTER_OUTPUT_TYPES, grouped_outputs): + dummy_ll = pt.zeros_like(mu) - mus = pm.Flat(f"{filter_output}_state", shape=mu_shape, dims=mu_dims) - covs = pm.Flat(f"{filter_output}_covariance", shape=cov_shape, dims=cov_dims) - latent_states = SequenceMvNormal( - f"{filter_output}_{group}", - mus=mus, - covs=covs, - logp=pt.zeros(mus.shape[0]), - dims=mu_dims, + state_dims = ( + (TIME_DIM, ALL_STATE_DIM) + if all([dim in self._fit_coords for dim in [TIME_DIM, ALL_STATE_DIM]]) + else (None, None) ) - - obs_mu, obs_covs = self._observed_mu_cov_from_latent(d, Z, H, latent_states) - - # If there are dims on the latent dim, we can use the defaults to put them on the observed too obs_dims = ( (TIME_DIM, OBS_STATE_DIM) - if not all([x is None for x in mu_dims]) + if all([dim in self._fit_coords for dim in [TIME_DIM, OBS_STATE_DIM]]) else (None, None) ) SequenceMvNormal( - f"{filter_output}_{group}_observed", + f"{name}_{group}", + mus=mu, + covs=cov, + logp=dummy_ll, + dims=state_dims, + ) + + obs_mu = (Z @ mu[..., None]).squeeze(-1) + obs_cov = Z @ cov @ pt.swapaxes(Z, -2, -1) + H + + SequenceMvNormal( + f"{name}_{group}_observed", mus=obs_mu, - covs=obs_covs, - logp=pt.zeros(obs_mu.shape[0]), + covs=obs_cov, + logp=dummy_ll, dims=obs_dims, ) @@ -1005,7 +1161,7 @@ def _sample_conditional( group_idata, var_names=[ f"{name}_{group}{suffix}" - for name in filters_to_use + for name in FILTER_OUTPUT_TYPES for suffix in ["", "_observed"] ], compile_kwargs={"mode": get_mode(self._fit_mode)}, @@ -1087,7 +1243,8 @@ def _sample_unconditional( dims = [TIME_DIM, ALL_STATE_DIM, OBS_STATE_DIM] with pm.Model(coords=temp_coords if dims is not None else None): - matrices = [x0, P0, c, d, T, Z, R, H, Q] = self._build_dummy_graph() + self._build_dummy_graph() + matrices = [x0, P0, c, d, T, Z, R, H, Q] = self.unpack_statespace() if not self.measurement_error: H_jittered = pm.Deterministic( @@ -1095,7 +1252,7 @@ def _sample_unconditional( ) matrices = [x0, P0, c, d, T, Z, R, H_jittered, Q] - _ = LinearGaussianStateSpace( + LinearGaussianStateSpace( group, *matrices, steps=steps, @@ -1228,9 +1385,9 @@ def sample_unconditional_prior( def sample_unconditional_posterior( self, - idata, - steps=None, - use_data_time_dim=False, + idata: InferenceData, + steps: Optional[int] = None, + use_data_time_dim: bool = False, random_seed: Optional[RandomState] = None, **kwargs, ) -> InferenceData: @@ -1397,14 +1554,20 @@ def forecast( cov_dims = ["data_time"] + cov_dims[1:] with pm.Model(coords=temp_coords): - c, d, T, Z, R, H, Q = self._build_dummy_graph(skip_matrices=["x0", "P0"]) - if not self.measurement_error: - H = pm.Deterministic( - "H_jittered", pt.specify_shape(stabilize(H), (self.k_endog, self.k_endog)) - ) + [ + x0, + P0, + c, + d, + T, + Z, + R, + H, + Q, + ], grouped_outputs = self._kalman_filter_outputs_from_dummy_graph() + group_idx = FILTER_OUTPUT_TYPES.index(filter_output) - mu = pm.Flat(f"{filter_output}_state", shape=mu_shape, dims=mu_dims) - cov = pm.Flat(f"{filter_output}_covariance", shape=cov_shape, dims=cov_dims) + mu, cov = grouped_outputs[group_idx] x0 = pm.Deterministic( "x0_slice", mu[t0_idx], dims=mu_dims[1:] if mu_dims is not None else None @@ -1443,7 +1606,8 @@ def forecast( def impulse_response_function( self, idata, - steps: int = None, + n_steps: int = 40, + use_posterior_cov: bool = True, shock_size: Optional[Union[float, np.ndarray]] = None, shock_cov: Optional[np.ndarray] = None, shock_trajectory: Optional[np.ndarray] = None, @@ -1463,27 +1627,39 @@ def impulse_response_function( idata : az.InferenceData An Arviz InferenceData object containing the posterior distribution over model parameters. - steps : Optional[int], default=None - The number of time steps to calculate the impulse response. If not provided, it defaults to 40, unless - a shock trajectory is provided. + n_steps: int + The number of time steps to calculate the impulse response. Default is 40. + + If `shock_trajectory` is provided, the length of the shock trajectory will override this value. + + use_posterior_cov: bool, default=True + Whether to use the covariance matrix of the posterior distribution to generate the impulse response. + + Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified. shock_size : Optional[Union[float, np.ndarray]], default=None The size of the shock applied to the system. If specified, it will create a covariance matrix for the shock with diagonal elements equal to `shock_size`. If float, the diagonal will be filled with `shock_size`. If an array, `shock_size` must match the number of shocks in the state space model. + Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified. + shock_cov : Optional[np.ndarray], default=None A user-specified covariance matrix for the shocks. It should be a 2D numpy array with dimensions (n_shocks, n_shocks), where n_shocks is the number of shocks in the state space model. + Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified. + shock_trajectory : Optional[np.ndarray], default=None A pre-defined trajectory of shocks applied to the system. It should be a 2D numpy array with dimensions (n, n_shocks), where n is the number of time steps and k_posdef is the number of shocks in the state space model. + Only one of `use_posterior_cov`, `shock_cov`, `shock_size`, or `shock_trajectory` can be specified. + orthogonalize_shocks : bool, default=False If True, orthogonalize the shocks using Cholesky decomposition when generating the impulse - response. This option is only relevant when `shock_trajectory` is not provided. + response. This option is ignored if `shock_trajectory` or `shock_size` are used. random_seed : int, RandomState or Generator, optional Seed for the random number generator. @@ -1497,14 +1673,20 @@ def impulse_response_function( An Arviz InferenceData object containing impulse response function in a variable named "irf". """ options = [shock_size, shock_cov, shock_trajectory] - n_nones = sum(x is not None for x in options) - sample_posterior_Q = n_nones == 0 + n_options = sum(x is not None for x in options) + Q = None # No covariance matrix needed if a trajectory is provided. Will be overwritten later if needed. - if n_nones > 1: + if n_options > 1: raise ValueError("Specify exactly 0 or 1 of shock_size, shock_cov, or shock_trajectory") + elif n_options == 1: + # If the user passed an alternative parameterization for the shocks of the IRF, don't use the posterior + use_posterior_cov = False + if shock_trajectory is not None: + # Validate the shock trajectory n, k = shock_trajectory.shape steps = n + if k != self.k_posdef: raise ValueError( "If shock_trajectory is provided, there must be a trajectory provided for each shock. " @@ -1515,43 +1697,30 @@ def impulse_response_function( "Both steps and shock_trajectory were provided but do not agree. Length of " "shock_trajectory will take priority, and steps will be ignored." ) - steps = n + n_steps = n # Overwrite steps with the length of the shock trajectory shock_trajectory = pt.as_tensor_variable(shock_trajectory) - if steps is None: - steps = 40 - simulation_coords = self._fit_coords.copy() - simulation_coords[TIME_DIM] = np.arange(steps, dtype="int") + simulation_coords[TIME_DIM] = np.arange(n_steps, dtype="int") with pm.Model(coords=simulation_coords): + self._build_dummy_graph() + P0, _, c, d, T, Z, R, H, post_Q = self.unpack_statespace() x0 = pm.Deterministic("x0_new", pt.zeros(self.k_states), dims=[ALL_STATE_DIM]) - Q = None - if sample_posterior_Q: - matrices = self._build_dummy_graph(skip_matrices=["x0"]) - # x0 was not loaded into the model - P0, c, _, T, _, R, _, Q = matrices - - else: - matrices = self._build_dummy_graph(skip_matrices=["x0", "Q"]) - - # Neither x0 nor Q was loaded into the model - P0, c, _, T, _, R, _ = matrices - - if shock_cov is not None: - Q = pm.Deterministic( - "Q_new", pt.as_tensor_variable(shock_cov), dims=[SHOCK_DIM, SHOCK_AUX_DIM] - ) + if use_posterior_cov: + Q = post_Q + if orthogonalize_shocks: + Q = pt.linalg.cholesky(Q) + elif shock_cov is not None: + Q = pt.as_tensor_variable(shock_cov) + if orthogonalize_shocks: + Q = pt.linalg.cholesky(Q) if shock_trajectory is None: - shock_trajectory = pt.zeros((steps, self.k_posdef)) + shock_trajectory = pt.zeros((n_steps, self.k_posdef)) if Q is not None: - if orthogonalize_shocks: - Q = pt.linalg.cholesky(Q) - init_shock = pm.MvNormal( - "initial_shock", mu=0, cov=stabilize(Q, JITTER_DEFAULT), dims=[SHOCK_DIM] - ) + init_shock = pm.MvNormal("initial_shock", mu=0, cov=Q, dims=[SHOCK_DIM]) else: init_shock = pm.Deterministic( "initial_shock", pt.as_tensor_variable(shock_size), dims=[SHOCK_DIM] @@ -1570,15 +1739,15 @@ def irf_step(shock, x, c, T, R): sequences=[shock_trajectory], outputs_info=[x0], non_sequences=[c, T, R], - n_steps=steps, + n_steps=n_steps, strict=True, mode=self._fit_mode, ) - irf = pm.Deterministic("irf", irf, dims=[TIME_DIM, ALL_STATE_DIM]) + pm.Deterministic("irf", irf, dims=[TIME_DIM, ALL_STATE_DIM]) - compile_kwargs = kwargs.get("compile_kwargs", None) - if compile_kwargs is None: + compile_kwargs = kwargs.get("compile_kwargs", {}) + if "mode" not in compile_kwargs.keys(): compile_kwargs = {"mode": self._fit_mode} else: mode = compile_kwargs.get("mode") @@ -1612,7 +1781,8 @@ def _sort_obs_inputs_by_time_varying(self, d, Z): return seqs, non_seqs - def _sort_obs_scan_args(self, args): + @staticmethod + def _sort_obs_scan_args(args): args = list(args) # If a matrix is time-varying, pytensor will put a [t] on the name @@ -1624,47 +1794,3 @@ def _sort_obs_scan_args(self, args): ordered_args.append(args[idx]) return ordered_args - - def _observed_mu_cov_from_latent(self, d, Z, H, latent_states): - """ - Given a PyMC Random Variable of latent states, construct an RV of for the observed states - - This function constructs an observed mean from the latent mean by applying the observation equation: - - .. math:: - \begin{align} y_t &= Z_t x_t + d_t + \\eta_t & \\eta &\\sim N(0, H_t) \\end{align} - - It also has some logic to handle time-varying matrices Z, d, or H. - - Parameters - ---------- - latent_states: RandomVariable - SequenceMvNormal of latent states - - Returns - ------- - observed_states: RandomVariable - SequenceMvNormal of observed states - """ - seqs, non_seqs = self._sort_obs_inputs_by_time_varying(d, Z) - seqs = [latent_states] + seqs - - def obs_step(*args): - x, *dZ = args - d, Z = self._sort_obs_scan_args(dZ) - - return d + Z @ x - - obs_mu, _ = pytensor.scan( - obs_step, sequences=seqs, non_sequences=non_seqs, strict=False, mode=self._fit_mode - ) - - obs_covs = ( - H - if "H" in self.kalman_filter.seq_names - else pt.expand_dims( - pt.specify_shape(stabilize(H, JITTER_DEFAULT), H.type.shape), (0,) - ).repeat(obs_mu.shape[0], axis=0) - ) - - return obs_mu, obs_covs diff --git a/pymc_experimental/statespace/filters/distributions.py b/pymc_experimental/statespace/filters/distributions.py index 45eedd90..8a75e103 100644 --- a/pymc_experimental/statespace/filters/distributions.py +++ b/pymc_experimental/statespace/filters/distributions.py @@ -290,16 +290,16 @@ def dist(cls, a0, P0, c, d, T, Z, R, H, Q, *, steps=None, **kwargs): return latent_states, obs_states -class SequenceMvNormalRV(SymbolicRandomVariable): +class KalmanFilterRV(SymbolicRandomVariable): default_output = 1 - _print_name = ("SequenceMvNormal", "\\operatorname{SequenceMvNormal}") + _print_name = ("KalmanFilter", "\\operatorname{KalmanFilter}") def update(self, node: Node): return {node.inputs[-1]: node.outputs[0]} class SequenceMvNormal(Continuous): - rv_op = SequenceMvNormalRV + rv_op = KalmanFilterRV def __new__(cls, *args, **kwargs): support_shape = get_support_shape( @@ -336,6 +336,7 @@ def rv_op(cls, mus, covs, logp, steps, support_shape, size=None): else: batch_size = support_shape + # mus_, covs_ = mus.type(), covs.type() mus_, covs_, support_shape_ = mus.type(), covs.type(), support_shape.type() steps_ = steps.type() logp_ = logp.type() @@ -352,7 +353,7 @@ def step(mu, cov, rng): (seq_mvn_rng,) = tuple(updates.values()) - mvn_seq_op = SequenceMvNormalRV( + mvn_seq_op = KalmanFilterRV( inputs=[mus_, covs_, logp_, steps_], outputs=[seq_mvn_rng, mvn_seq], ndim_supp=2 ) @@ -360,7 +361,7 @@ def step(mu, cov, rng): return mvn_seq -@_logprob.register(SequenceMvNormalRV) +@_logprob.register(KalmanFilterRV) def sequence_mvnormal_logp(op, values, mus, covs, logp, steps, rng, **kwargs): return check_parameters( logp, diff --git a/pymc_experimental/statespace/filters/kalman_filter.py b/pymc_experimental/statespace/filters/kalman_filter.py index 62aaea14..bf17d743 100644 --- a/pymc_experimental/statespace/filters/kalman_filter.py +++ b/pymc_experimental/statespace/filters/kalman_filter.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import List, Optional, Tuple +from typing import Optional import numpy as np import pytensor @@ -49,11 +49,11 @@ def __init__(self, mode=None): mode : str or None The mode used for Pytensor compilation. - seq_names : List[str] + seq_names : list[str] A list of name representing time-varying statespace matrices. That is, inputs that will need to be provided to the `sequences` argument of `pytensor.scan` - non_seq_names : List[str] + non_seq_names : list[str] A list of names representing static statespace matrices. That is, inputs that will need to be provided to the `non_sequences` argument of `pytensor.scan` @@ -68,8 +68,8 @@ def __init__(self, mode=None): """ self.mode: str = mode - self.seq_names: List[str] = [] - self.non_seq_names: List[str] = [] + self.seq_names: list[str] = [] + self.non_seq_names: list[str] = [] self.n_states = None self.n_posdef = None @@ -121,8 +121,8 @@ def check_params(self, data, a0, P0, c, d, T, Z, R, H, Q): @staticmethod def add_check_on_time_varying_shapes( - data: TensorVariable, sequence_params: List[TensorVariable] - ) -> List[Variable]: + data: TensorVariable, sequence_params: list[TensorVariable] + ) -> list[Variable]: """ Insert a check that time-varying matrices match the data shape to the computational graph. @@ -134,12 +134,12 @@ def add_check_on_time_varying_shapes( data : TensorVariable The tensor representing the data. - sequence_params : List[TensorVariable] + sequence_params : list[TensorVariable] A list of tensors to be provided to `pytensor.scan` as `sequences`. Returns ------- - List[TensorVariable] + list[TensorVariable] A list of tensors wrapped in an `Assert` `Op` that checks the shape of the 0th dimension on each is equal to the shape of the 0th dimension on the data. @@ -153,7 +153,7 @@ def add_check_on_time_varying_shapes( return params_with_assert - def unpack_args(self, args) -> Tuple: + def unpack_args(self, args) -> tuple: """ The order of inputs to the inner scan function is not known, since some, all, or none of the input matrices can be time varying. The order arguments are fed to the inner function is sequences, outputs_info, @@ -203,7 +203,7 @@ def build_graph( return_updates=False, missing_fill_value=None, cov_jitter=None, - ) -> List[TensorVariable]: + ) -> list[TensorVariable]: """ Construct the computation graph for the Kalman filter. See [1] for details. @@ -278,7 +278,7 @@ def build_graph( return filter_results, updates return filter_results - def _postprocess_scan_results(self, results, a0, P0, n) -> List[TensorVariable]: + def _postprocess_scan_results(self, results, a0, P0, n) -> list[TensorVariable]: """ Transform the values returned by the Kalman Filter scan into a form expected by users. In particular: 1. Append the initial state and covariance matrix to their respective Kalman predictions. This matches the @@ -350,7 +350,7 @@ def _postprocess_scan_results(self, results, a0, P0, n) -> List[TensorVariable]: def handle_missing_values( self, y, Z, H - ) -> Tuple[TensorVariable, TensorVariable, TensorVariable, float]: + ) -> tuple[TensorVariable, TensorVariable, TensorVariable, float]: """ This function handles missing values in the observation data `y` and adjusts the design matrix `Z` and the observation noise covariance matrix `H` accordingly. Missing values are replaced with zeros to prevent @@ -398,7 +398,7 @@ def handle_missing_values( return y_masked, Z_masked, H_masked, all_nan_flag @staticmethod - def predict(a, P, c, T, R, Q) -> Tuple[TensorVariable, TensorVariable]: + def predict(a, P, c, T, R, Q) -> tuple[TensorVariable, TensorVariable]: """ Perform the prediction step of the Kalman filter. @@ -450,7 +450,7 @@ def predict(a, P, c, T, R, Q) -> Tuple[TensorVariable, TensorVariable]: @staticmethod def update( a, P, y, c, d, Z, H, all_nan_flag - ) -> Tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, TensorVariable]: + ) -> tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, TensorVariable]: """ Perform the update step of the Kalman filter. @@ -489,13 +489,13 @@ def update( Returns ------- - Tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, TensorVariable] + tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, TensorVariable] A tuple containing the updated state vector `a_filtered`, the updated covariance matrix `P_filtered`, the predicted observation `obs_mu`, the predicted observation covariance matrix `obs_cov`, and the log-likelihood `ll`. """ raise NotImplementedError - def kalman_step(self, *args) -> Tuple: + def kalman_step(self, *args) -> tuple: """ Performs a single iteration of the Kalman filter, which is composed of two steps : an update step and a prediction step. The timing convention follows [1], in which initial state and covariance estimates a0 and P0 @@ -624,7 +624,7 @@ def update(self, a, P, y, c, d, Z, H, all_nan_flag): Returns ------- - Tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, float] + tuple[TensorVariable, TensorVariable, TensorVariable, TensorVariable, float] A tuple containing the updated state vector `a_filtered`, the updated covariance matrix `P_filtered`, the one-step forecast mean `y_hat`, one-step forcast covariance matrix `F`, and the log-likelihood of the data, given the one-step forecasts, `ll`. @@ -769,7 +769,7 @@ def build_graph( return_updates=False, missing_fill_value=None, cov_jitter=None, - ) -> List[TensorVariable]: + ) -> list[TensorVariable]: """ Need to override the base step to add an argument to self.update, passing F_inv at every step. """ diff --git a/pymc_experimental/statespace/filters/kalman_smoother.py b/pymc_experimental/statespace/filters/kalman_smoother.py index 63e261d8..32581fa4 100644 --- a/pymc_experimental/statespace/filters/kalman_smoother.py +++ b/pymc_experimental/statespace/filters/kalman_smoother.py @@ -100,6 +100,9 @@ def build_graph( [smoothed_covariances[::-1], pt.expand_dims(P_last, axis=(0,))], axis=0 ) + smoothed_states.name = "smoothed_states" + smoothed_covariances.name = "smoothed_covariances" + return smoothed_states, smoothed_covariances def smoother_step(self, *args): diff --git a/pymc_experimental/statespace/models/SARIMAX.py b/pymc_experimental/statespace/models/SARIMAX.py index 52caa48e..9350b5b1 100644 --- a/pymc_experimental/statespace/models/SARIMAX.py +++ b/pymc_experimental/statespace/models/SARIMAX.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Optional, Sequence, Tuple +from typing import Any, Optional, Sequence, Tuple import numpy as np import pytensor.tensor as pt @@ -258,7 +258,7 @@ def param_names(self): return names @property - def param_info(self) -> Dict[str, Dict[str, Any]]: + def param_info(self) -> dict[str, dict[str, Any]]: info = { "x0": { "shape": (self.k_states,), @@ -269,11 +269,11 @@ def param_info(self) -> Dict[str, Dict[str, Any]]: "constraints": "Positive Semi-definite", }, "sigma_obs": { - "shape": (self.k_endog,), + "shape": None if self.k_endog == 1 else (self.k_endog,), "constraints": "Positive", }, "sigma_state": { - "shape": (self.k_posdef,), + "shape": None if self.k_posdef == 1 else (self.k_posdef,), "constraints": "Positive", }, "ar_params": { @@ -307,6 +307,8 @@ def state_names(self): states += ["innovations"] if self.q > 0: states += [f"L{i + 1}.innovations" for i in range(self._q_max - 1)] + else: + raise NotImplementedError() return states @@ -330,8 +332,9 @@ def param_dims(self): "seasonal_ar_params": (SEASONAL_AR_PARAM_DIM,), "seasonal_ma_params": (SEASONAL_MA_PARAM_DIM,), } - - if not self.measurement_error: + if self.k_endog == 1: + del coord_map["sigma_state"] + if not self.measurement_error or self.k_endog == 1: del coord_map["sigma_obs"] if self.p == 0: del coord_map["ar_params"] @@ -348,7 +351,7 @@ def param_dims(self): return coord_map @property - def coords(self) -> Dict[str, Sequence]: + def coords(self) -> dict[str, Sequence]: coords = make_default_coords(self) if self.p > 0: coords.update({AR_PARAM_DIM: list(range(1, self.p + 1))}) @@ -512,14 +515,14 @@ def make_symbolic_graph(self) -> None: # Set up the state covariance matrix state_cov_idx = ("state_cov",) + np.diag_indices(self.k_posdef) state_cov = self.make_and_register_variable( - "sigma_state", shape=(self.k_posdef,), dtype=floatX + "sigma_state", shape=() if self.k_posdef == 1 else (self.k_posdef,), dtype=floatX ) self.ssm[state_cov_idx] = state_cov**2 if self.measurement_error: obs_cov_idx = ("obs_cov",) + np.diag_indices(self.k_endog) obs_cov = self.make_and_register_variable( - "sigma_obs", shape=(self.k_endog,), dtype=floatX + "sigma_obs", shape=() if self.k_endog == 1 else (self.k_endog,), dtype=floatX ) self.ssm[obs_cov_idx] = obs_cov**2 diff --git a/pymc_experimental/statespace/models/VARMAX.py b/pymc_experimental/statespace/models/VARMAX.py index fb4ff451..0fe1ec64 100644 --- a/pymc_experimental/statespace/models/VARMAX.py +++ b/pymc_experimental/statespace/models/VARMAX.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Sequence, Tuple +from typing import Any, Sequence, Tuple import numpy as np import pytensor @@ -31,7 +31,7 @@ class BayesianVARMAX(PyMCStateSpace): Number of autoregressive (AR) and moving average (MA) terms to include in the model. All terms up to the specified order are included. For restricted models, set zeros directly on the priors. - endog_names: List of str, optional + endog_names: list of str, optional Names of the endogenous variables being modeled. Used to generate names for the state and shock coords. If None, the state names will simply be numbered. @@ -141,7 +141,7 @@ class BayesianVARMAX(PyMCStateSpace): def __init__( self, order: Tuple[int, int], - endog_names: List[str] = None, + endog_names: list[str] = None, k_endog: int = None, stationary_initialization: bool = False, filter_type: str = "standard", @@ -200,7 +200,7 @@ def param_names(self): return names @property - def param_info(self) -> Dict[str, Dict[str, Any]]: + def param_info(self) -> dict[str, dict[str, Any]]: info = { "x0": { "shape": (self.k_states,), @@ -258,7 +258,7 @@ def default_priors(self): raise NotImplementedError @property - def coords(self) -> Dict[str, Sequence]: + def coords(self) -> dict[str, Sequence]: coords = make_default_coords(self) if self.p > 0: coords.update({AR_PARAM_DIM: list(range(1, self.p + 1))}) diff --git a/pymc_experimental/statespace/models/structural.py b/pymc_experimental/statespace/models/structural.py index 7def3f0d..4c98695c 100644 --- a/pymc_experimental/statespace/models/structural.py +++ b/pymc_experimental/statespace/models/structural.py @@ -1,7 +1,7 @@ import functools as ft import logging from abc import ABC -from typing import Any, Dict, List, Optional, Sequence, Union +from typing import Any, Optional, Sequence, Union import numpy as np import pytensor @@ -44,7 +44,7 @@ def _frequency_transition_block(s, j): return pt.stack([[pt.cos(lam), pt.sin(lam)], [-pt.sin(lam), pt.cos(lam)]]).squeeze() -def block_diagonal(matrices: List[pt.matrix]): +def block_diagonal(matrices: list[pt.matrix]): rows = [x.shape[0] for x in matrices] cols = [x.shape[1] for x in matrices] out = pt.zeros((sum(rows), sum(cols))) @@ -81,15 +81,18 @@ def __init__( self, ssm: PytensorRepresentation, state_names, + data_names, shock_names, param_names, exog_names, param_dims, coords, param_info, + data_info, component_info, measurement_error, name_to_variable, + name_to_data, name=None, verbose=True, filter_type: str = "standard", @@ -104,6 +107,7 @@ def __init__( param_names, param_dims, param_info, k_states ) self._state_names = state_names + self._data_names = data_names self._shock_names = shock_names self._param_names = param_names self._param_dims = param_dims @@ -113,6 +117,7 @@ def __init__( self._coords = coords self._param_info = param_info + self._data_info = data_info self.measurement_error = measurement_error super().__init__( @@ -126,7 +131,10 @@ def __init__( self.ssm = ssm self._component_info = component_info + self._name_to_variable = name_to_variable + self._name_to_data = name_to_data + self._exog_names = exog_names self._needs_exog_data = len(exog_names) > 0 @@ -149,6 +157,10 @@ def _add_inital_state_cov_to_properties(param_names, param_dims, param_info, k_s def param_names(self): return self._param_names + @property + def data_names(self) -> list[str]: + return self._data_names + @property def state_names(self): return self._state_names @@ -166,13 +178,17 @@ def param_dims(self): return self._param_dims @property - def coords(self) -> Dict[str, Sequence]: + def coords(self) -> dict[str, Sequence]: return self._coords @property - def param_info(self) -> Dict[str, Dict[str, Any]]: + def param_info(self) -> dict[str, dict[str, Any]]: return self._param_info + @property + def data_info(self) -> dict[str, dict[str, Any]]: + return self._data_info + def make_symbolic_graph(self) -> None: """ Assign placeholder pytensor variables among statespace matrices in positions where PyMC variables will go. @@ -338,6 +354,7 @@ def __init__( k_states, k_posdef, state_names=None, + data_names=None, shock_names=None, param_names=None, exog_names=None, @@ -354,6 +371,7 @@ def __init__( self.measurement_error = measurement_error self.state_names = state_names if state_names is not None else [] + self.data_names = data_names if data_names is not None else [] self.shock_names = shock_names if shock_names is not None else [] self.param_names = param_names if param_names is not None else [] self.exog_names = exog_names if exog_names is not None else [] @@ -361,7 +379,10 @@ def __init__( self.needs_exog_data = len(self.exog_names) > 0 self.coords = {} self.param_dims = {} + self.param_info = {} + self.data_info = {} + self.param_counts = {} if representation is None: @@ -370,6 +391,7 @@ def __init__( self.ssm = representation self._name_to_variable = {} + self._name_to_data = {} if not component_from_sum: self.populate_component_properties() @@ -429,6 +451,43 @@ def make_and_register_variable(self, name, shape, dtype=floatX) -> Variable: self._name_to_variable[name] = placeholder return placeholder + def make_and_register_data(self, name, shape, dtype=floatX) -> Variable: + r""" + Helper function to create a pytensor symbolic variable and register it in the _name_to_data dictionary + + Parameters + ---------- + name : str + The name of the placeholder data. Must be the name of an expected data variable. + shape : int or tuple of int + Shape of the parameter + dtype : str, default pytensor.config.floatX + dtype of the parameter + + Notes + ----- + See docstring for make_and_register_variable for more details. This function is similar, but handles data + inputs instead of model parameters. + + An error is raised if the provided name has already been registered, or if the name is not present in the + ``data_names`` property. + """ + if name not in self.data_names: + raise ValueError( + f"{name} is not a model parameter. All placeholder variables should correspond to model " + f"parameters." + ) + + if name in self._name_to_data.keys(): + raise ValueError( + f"{name} is already a registered placeholder variable with shape " + f"{self._name_to_data[name].type.shape}" + ) + + placeholder = pt.tensor(name, shape=shape, dtype=dtype) + self._name_to_data[name] = placeholder + return placeholder + def make_symbolic_graph(self) -> None: raise NotImplementedError @@ -481,7 +540,6 @@ def make_slice(name, x, o_x): transition.name = T.name design = pt.concatenate(conform_time_varying_and_time_invariant_matrices(Z, o_Z), axis=-1) - design.name = Z.name selection = block_diagonal([R, o_R]) @@ -542,14 +600,18 @@ def _make_combined_name(self): def __add__(self, other): state_names = self._combine_property(other, "state_names") + data_names = self._combine_property(other, "data_names") param_names = self._combine_property(other, "param_names") shock_names = self._combine_property(other, "shock_names") param_info = self._combine_property(other, "param_info") + data_info = self._combine_property(other, "data_info") param_dims = self._combine_property(other, "param_dims") coords = self._combine_property(other, "coords") exog_names = self._combine_property(other, "exog_names") _name_to_variable = self._combine_property(other, "_name_to_variable") + _name_to_data = self._combine_property(other, "_name_to_data") + measurement_error = any([self.measurement_error, other.measurement_error]) k_states, k_posdef, k_endog = self._get_combined_shapes(other) @@ -567,30 +629,22 @@ def __add__(self, other): new_comp._component_info = self._combine_component_info(other) new_comp.name = new_comp._make_combined_name() - property_names = [ - "state_names", - "param_names", - "shock_names", - "state_dims", - "coords", - "param_dims", - "param_info", - "exog_names", - "_name_to_variable", - ] - property_values = [ - state_names, - param_names, - shock_names, - param_dims, - coords, - param_dims, - param_info, - exog_names, - _name_to_variable, + names_and_props = [ + ("state_names", state_names), + ("data_names", data_names), + ("param_names", param_names), + ("shock_names", shock_names), + ("param_dims", param_dims), + ("coords", coords), + ("param_dims", param_dims), + ("param_info", param_info), + ("data_info", data_info), + ("exog_names", exog_names), + ("_name_to_variable", _name_to_variable), + ("_name_to_data", _name_to_data), ] - for prop, value in zip(property_names, property_values): + for prop, value in names_and_props: setattr(new_comp, prop, value) return new_comp @@ -622,15 +676,18 @@ def build(self, name=None, filter_type="standard", verbose=True): self.ssm, name=name, state_names=self.state_names, + data_names=self.data_names, shock_names=self.shock_names, param_names=self.param_names, param_dims=self.param_dims, coords=self.coords, param_info=self.param_info, + data_info=self.data_info, component_info=self._component_info, measurement_error=self.measurement_error, exog_names=self.exog_names, name_to_variable=self._name_to_variable, + name_to_data=self._name_to_data, filter_type=filter_type, verbose=verbose, ) @@ -881,7 +938,8 @@ def populate_component_properties(self): } def make_symbolic_graph(self) -> None: - error_sigma = self.make_and_register_variable(f"sigma_{self.name}", shape=(self.k_endog,)) + sigma_shape = () if self.k_endog == 1 else (self.k_endog,) + error_sigma = self.make_and_register_variable(f"sigma_{self.name}", shape=sigma_shape) diag_idx = np.diag_indices(self.k_endog) idx = np.s_["obs_cov", diag_idx[0], diag_idx[1]] self.ssm[idx] = error_sigma**2 @@ -1498,7 +1556,7 @@ def __init__( self, k_exog: Optional[int] = None, name: Optional[str] = "Exogenous", - state_names: Optional[List[str]] = None, + state_names: Optional[list[str]] = None, innovations=False, ): self.innovations = innovations @@ -1520,7 +1578,8 @@ def __init__( obs_state_idxs=np.ones(k_states), ) - def _get_state_names(self, k_exog, state_names, name): + @staticmethod + def _get_state_names(k_exog: Optional[int], state_names: Optional[list[str]], name: str): if k_exog is None and state_names is None: raise ValueError("Must specify at least one of k_exog or state_names") if state_names is not None and k_exog is not None: @@ -1533,7 +1592,7 @@ def _get_state_names(self, k_exog, state_names, name): return k_exog, state_names - def _handle_input_data(self, k_exog: int, state_names: Optional[List[str]], name) -> int: + def _handle_input_data(self, k_exog: int, state_names: Optional[list[str]], name) -> int: k_exog, state_names = self._get_state_names(k_exog, state_names, name) self.state_names = state_names @@ -1541,7 +1600,7 @@ def _handle_input_data(self, k_exog: int, state_names: Optional[List[str]], name def make_symbolic_graph(self) -> None: betas = self.make_and_register_variable(f"beta_{self.name}", shape=(self.k_states,)) - regression_data = self.make_and_register_variable( + regression_data = self.make_and_register_data( f"data_{self.name}", shape=(None, self.k_states) ) @@ -1560,17 +1619,19 @@ def make_symbolic_graph(self) -> None: def populate_component_properties(self) -> None: self.shock_names = self.state_names - self.param_names = [f"beta_{self.name}", f"data_{self.name}"] + self.param_names = [f"beta_{self.name}"] + self.data_names = [f"data_{self.name}"] self.param_dims = { f"beta_{self.name}": ("exog_state",), - f"data_{self.name}": (TIME_DIM, "exog_state"), } self.param_info = { f"beta_{self.name}": {"shape": (1,), "constraints": None, "dims": ("exog_state",)}, + } + + self.data_info = { f"data_{self.name}": { "shape": (None, self.k_states), - "constraints": None, "dims": (TIME_DIM, "exog_state"), }, } diff --git a/pymc_experimental/statespace/models/utilities.py b/pymc_experimental/statespace/models/utilities.py index e3f907f2..40f38093 100644 --- a/pymc_experimental/statespace/models/utilities.py +++ b/pymc_experimental/statespace/models/utilities.py @@ -1,5 +1,3 @@ -from typing import List - import numpy as np import pytensor.tensor as pt @@ -29,7 +27,7 @@ def make_default_coords(ss_mod): return coords -def cleanup_states(states: List[str]) -> List[str]: +def cleanup_states(states: list[str]) -> list[str]: """ Remove meaningless symbols from state names @@ -60,25 +58,25 @@ def cleanup_states(states: List[str]) -> List[str]: return out -def make_harvey_state_names(p, d, q, P, D, Q, S) -> List[str]: +def make_harvey_state_names(p: int, d: int, q: int, P: int, D: int, Q: int, S: int) -> list[str]: """ Generate informative names for the SARIMA states in the Harvey representation Parameters ---------- - p, int + p: int AR order - d, int + d: int Number of ARIMA differences - q, int + q: int MA order - P, int + P: int Seasonal AR order - D, int + D: int Number of seasonal differences - Q, int + Q: int Seasonal MA order - S, int + S: int Seasonal length Returns @@ -90,9 +88,7 @@ def make_harvey_state_names(p, d, q, P, D, Q, S) -> List[str]: a list of state names that can help users understand what they are getting back from the statespace. In particular, it is helpful to know how differences and seasonal differences are incorporated into the model """ - n_diffs = S * D + d k_lags = max(p + P * S, q + Q * S + 1) - k_states = k_lags + n_diffs has_diff = (d + D) > 0 # First state is always data @@ -111,7 +107,7 @@ def make_harvey_state_names(p, d, q, P, D, Q, S) -> List[str]: season_diff = [S, 0] curr_state = f"D{arma_diff[0]}^{arma_diff[1]}" for i in range(D): - states.extend([f"L{j+1}{curr_state}.data" for j in range(S - 1)]) + states.extend([f"L{j + 1}{curr_state}.data" for j in range(S - 1)]) season_diff[1] += 1 curr_state = f"D{arma_diff[0]}^{arma_diff[1]}D{season_diff[0]}^{season_diff[1]}" if i != (D - 1): @@ -124,7 +120,7 @@ def make_harvey_state_names(p, d, q, P, D, Q, S) -> List[str]: # Next, we add the time series dynamics states. These don't have a immediately obvious interpretation, so just call # them "state_1" .., "state_n". suffix = "_star" if "star" in states[-1] else "" - states.extend([f"state{suffix}_{i+1}" for i in range(k_lags - 1)]) + states.extend([f"state{suffix}_{i + 1}" for i in range(k_lags - 1)]) states = cleanup_states(states) @@ -139,19 +135,19 @@ def make_SARIMA_transition_matrix( Parameters ---------- - p, int + p: int AR order - d, int + d: int Number of ARIMA differences - q, int + q: int MA order - P, int + P: int Seasonal AR order - D, int + D: int Number of seasonal differences - Q, int + Q: int Seasonal MA order - S, int + S: int Seasonal length Returns @@ -200,10 +196,10 @@ def make_SARIMA_transition_matrix( \begin{bmatrix} x_{t-1} \\ \Delta x_{t-1} \\ \Delta^2 x_{t-1} \\ x_{t-1}^\star \end{bmatrix} Next are the seasonal differences. The highest seasonal difference stored in the states is one less than the - seasonal difference order, ``D``. That is, if ``D = 1, S = 4``, there will be states :math:``x_{t-1}, x_{t-2}, x_{t-3}, - x_{t-4}, x_t^\star`, with :math:`x_t^\star = \Delta_4 x_t = x_t - x_{t-4}`. The level state can be recovered by - adding :math:`x_t^\star + x_{t-4}`. To accomplish all of this, two things need to be inserted into the transition - matrix: + seasonal difference order, ``D``. That is, if ``D = 1, S = 4``, there will be states :math:``x_{t-1}, x_{t-2}, + x_{t-3}, x_{t-4}, x_t^\star`, with :math:`x_t^\star = \Delta_4 x_t = x_t - x_{t-4}`. The level state can be + recovered by adding :math:`x_t^\star + x_{t-4}`. To accomplish all of this, two things need to be inserted into the + transition matrix: 1. A shifted identity matrix to "roll" the lagged states forward each transition, and 2. A pair of 1's to recover the level state by adding the last 2 states (:math:`x_t^\star + x_{t-4}`) diff --git a/pymc_experimental/tests/statespace/test_SARIMAX.py b/pymc_experimental/tests/statespace/test_SARIMAX.py index c02cbed4..1e03bd54 100644 --- a/pymc_experimental/tests/statespace/test_SARIMAX.py +++ b/pymc_experimental/tests/statespace/test_SARIMAX.py @@ -180,7 +180,7 @@ def pymc_mod(arima_mod): ar_params = pm.Normal("ar_params", sigma=0.1, dims=["ar_lag"]) ma_params = pm.Normal("ma_params", sigma=1, dims=["ma_lag"]) sigma_state = pm.Exponential("sigma_state", 0.5) - arima_mod.build_statespace_graph(data=data) + arima_mod.build_statespace_graph(data=data, save_kalman_filter_outputs_in_idata=True) return pymc_mod @@ -210,7 +210,8 @@ def pymc_mod_interp(arima_mod_interp): ma_params = pm.Normal("ma_params", sigma=1, dims=["ma_lag"]) sigma_state = pm.Exponential("sigma_state", 0.5) sigma_obs = pm.Exponential("sigma_obs", 0.1) - arima_mod_interp.build_statespace_graph(data=data) + + arima_mod_interp.build_statespace_graph(data=data, save_kalman_filter_outputs_in_idata=True) return pymc_mod @@ -296,9 +297,7 @@ def test_SARIMAX_update_matches_statsmodels(p, d, q, P, D, Q, S, data, rng): ), ) - pm.Deterministic( - "sigma_state", pt.as_tensor_variable(np.sqrt(np.array([param_d["sigma2"]]))) - ) + pm.Deterministic("sigma_state", pt.as_tensor_variable(np.sqrt(param_d["sigma2"]))) mod._insert_random_variables() matrices = pm.draw(mod.subbed_ssm) diff --git a/pymc_experimental/tests/statespace/test_VARMAX.py b/pymc_experimental/tests/statespace/test_VARMAX.py index 03d8f59b..f40620a8 100644 --- a/pymc_experimental/tests/statespace/test_VARMAX.py +++ b/pymc_experimental/tests/statespace/test_VARMAX.py @@ -63,7 +63,7 @@ def pymc_mod(varma_mod, data): ) sigma_obs = pm.Exponential("sigma_obs", 1, dims=["observed_state"]) - varma_mod.build_statespace_graph(data=data) + varma_mod.build_statespace_graph(data=data, save_kalman_filter_outputs_in_idata=True) return pymc_mod @@ -71,10 +71,8 @@ def pymc_mod(varma_mod, data): @pytest.fixture(scope="session") def idata(pymc_mod, rng): with pymc_mod: - # idata = pm.sample(draws=10, tune=0, chains=1, random_seed=rng) idata = pm.sample_prior_predictive(samples=10, random_seed=rng) - # idata.extend(idata_prior) return idata @@ -158,11 +156,11 @@ def test_all_prior_covariances_are_PSD(filter_output, pymc_mod, rng): parameters = [ - {"steps": 10, "shock_size": None}, - {"steps": 10, "shock_size": 1.0}, - {"steps": 10, "shock_size": np.array([1.0, 0.0, 0.0])}, + {"n_steps": 10, "shock_size": None}, + {"n_steps": 10, "shock_size": 1.0}, + {"n_steps": 10, "shock_size": np.array([1.0, 0.0, 0.0])}, { - "steps": 10, + "n_steps": 10, "shock_cov": np.array([[1.38, 0.58, -1.84], [0.58, 0.99, -0.82], [-1.84, -0.82, 2.51]]), }, { diff --git a/pymc_experimental/tests/statespace/test_coord_assignment.py b/pymc_experimental/tests/statespace/test_coord_assignment.py index 74fdc740..eadbc85f 100644 --- a/pymc_experimental/tests/statespace/test_coord_assignment.py +++ b/pymc_experimental/tests/statespace/test_coord_assignment.py @@ -11,8 +11,6 @@ from pymc_experimental.statespace.utils.constants import ( FILTER_OUTPUT_DIMS, FILTER_OUTPUT_NAMES, - MATRIX_DIMS, - MATRIX_NAMES, SMOOTHER_OUTPUT_NAMES, TIME_DIM, ) @@ -85,21 +83,12 @@ def _create_model(f): P0 = pm.Deterministic("P0", pt.diag(P0_diag), dims=("state", "state_aux")) initial_trend = pm.Normal("initial_trend", dims="trend_state") sigma_trend = pm.Exponential("sigma_trend", 1, dims="trend_shock") - ss_mod.build_statespace_graph(data) + ss_mod.build_statespace_graph(data, save_kalman_filter_outputs_in_idata=True) return mod return _create_model -@pytest.mark.parametrize("f, warning", func_inputs, ids=function_names) -def test_matrix_coord_assignment(f, warning, create_model): - with warning: - pymc_model = create_model(f) - - for matrix in MATRIX_NAMES: - assert pymc_model.named_vars_to_dims[matrix] == MATRIX_DIMS[matrix] - - @pytest.mark.parametrize("f, warning", func_inputs, ids=function_names) def test_filter_output_coord_assignment(f, warning, create_model): with warning: diff --git a/pymc_experimental/tests/statespace/test_distributions.py b/pymc_experimental/tests/statespace/test_distributions.py index e094a32f..910a8929 100644 --- a/pymc_experimental/tests/statespace/test_distributions.py +++ b/pymc_experimental/tests/statespace/test_distributions.py @@ -92,6 +92,7 @@ def ss_mod_no_me(): @pytest.mark.parametrize("kfilter", filter_names, ids=filter_names) def test_loglike_vectors_agree(kfilter, pymc_model): + # TODO: This test might be flakey, I've gotten random failures ss_mod = structural.LevelTrendComponent(order=2).build( "data", verbose=False, filter_type=kfilter ) @@ -184,6 +185,8 @@ def test_lgss_with_time_varying_inputs(output_name, rng): mod.add_exogenous(trend) mod._insert_random_variables() + mod._insert_data_variables() + matrices = mod.unpack_statespace() # pylint: disable=unpacking-non-sequence diff --git a/pymc_experimental/tests/statespace/test_statespace.py b/pymc_experimental/tests/statespace/test_statespace.py index d67568b5..c6f73449 100644 --- a/pymc_experimental/tests/statespace/test_statespace.py +++ b/pymc_experimental/tests/statespace/test_statespace.py @@ -65,11 +65,11 @@ def coords(self): return make_default_coords(self) def make_symbolic_graph(self): - theta = pt.vector("theta", shape=(self.k_states,), dtype=floatX) - self.ssm["transition", 0, :] = theta + rho = self.make_and_register_variable("rho", ()) + zeta = self.make_and_register_variable("zeta", ()) + self.ssm["transition", 0, 0] = rho + self.ssm["transition", 1, 0] = zeta - T = np.zeros((2, 2)).astype(floatX) - T[1, 0] = 1.0 Z = np.array([[1.0, 0.0]], dtype=floatX) R = np.array([[1.0], [0.0]], dtype=floatX) H = np.array([[0.1]], dtype=floatX) @@ -80,8 +80,8 @@ def make_symbolic_graph(self): k_endog=nile.shape[1], k_states=2, k_posdef=1, filter_type="standard", verbose=False ) for X, name in zip( - [T, Z, R, H, Q, P0], - ["transition", "design", "selection", "obs_cov", "state_cov", "initial_state_cov"], + [Z, R, H, Q, P0], + ["design", "selection", "obs_cov", "state_cov", "initial_state_cov"], ): ss_mod.ssm[name] = X @@ -93,7 +93,11 @@ def pymc_mod(ss_mod): with pm.Model(coords=ss_mod.coords) as pymc_mod: rho = pm.Beta("rho", 1, 1) zeta = pm.Deterministic("zeta", 1 - rho) - ss_mod.build_statespace_graph(data=nile, include_smoother=True) + + ss_mod.build_statespace_graph(data=nile, save_kalman_filter_outputs_in_idata=True) + names = ["x0", "P0", "c", "d", "T", "Z", "R", "H", "Q"] + for name, matrix in zip(names, ss_mod.unpack_statespace()): + pm.Deterministic(name, matrix) return pymc_mod @@ -213,7 +217,7 @@ def test_build_statespace_graph_warns_if_data_has_nans(): ss_mod = st.LevelTrendComponent(order=1, innovations_order=0).build(verbose=False) with pm.Model() as pymc_mod: - initial_trend = pm.Normal("initial_trend") + initial_trend = pm.Normal("initial_trend", shape=(1,)) P0 = pm.Deterministic("P0", pt.eye(1, dtype=floatX)) with pytest.warns(pm.ImputationWarning): ss_mod.build_statespace_graph( @@ -226,7 +230,7 @@ def test_build_statespace_graph_raises_if_data_has_missing_fill(): ss_mod = st.LevelTrendComponent(order=1, innovations_order=0).build(verbose=False) with pm.Model() as pymc_mod: - initial_trend = pm.Normal("initial_trend") + initial_trend = pm.Normal("initial_trend", shape=(1,)) P0 = pm.Deterministic("P0", pt.eye(1, dtype=floatX)) with pytest.raises(ValueError, match="Provided data contains the value 1.0"): data = np.ones((10, 1), dtype=floatX) @@ -290,7 +294,7 @@ def test_forecast(filter_output, ss_mod, idata, rng): @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") def test_forecast_fails_if_exog_needed(exog_ss_mod, idata_exog): - time_idx = idata_exog.posterior.coords["time"].values + time_idx = idata_exog.observed_data.coords["time"].values with pytest.xfail("Scenario-based forcasting with exogenous variables not currently supported"): forecast_idata = exog_ss_mod.forecast( idata_exog, start=time_idx[-1], periods=10, random_seed=rng @@ -313,11 +317,16 @@ def test_add_exogenous(rng, shape): y = rng.normal(size=(10, 1)) with pm.Model() as m: - initial_trend = pm.Normal("initial_trend") + initial_trend = pm.Normal("initial_trend", shape=(1,)) P0 = pm.Deterministic("P0", pt.eye(1, dtype=floatX)) constant = pm.Normal("constant", shape=shape) ss_mod.add_exogenous(constant) ss_mod.build_statespace_graph(data=y) + names = ["x0", "P0", "c", "d", "T", "Z", "R", "H", "Q"] + for name, matrix in zip(names, ss_mod.unpack_statespace()): + if name not in [x.name for x in m.deterministics]: + pm.Deterministic(name, matrix) + d, const = pm.draw([m["d"].squeeze(), m["constant"].squeeze()], 10) assert_allclose(d, const) diff --git a/pymc_experimental/tests/statespace/test_structural.py b/pymc_experimental/tests/statespace/test_structural.py index f9267c03..4af329e6 100644 --- a/pymc_experimental/tests/statespace/test_structural.py +++ b/pymc_experimental/tests/statespace/test_structural.py @@ -196,7 +196,7 @@ def create_structural_model_and_equivalent_statsmodel( components = [] if irregular: - sigma2 = np.abs(rng.normal(size=(1,))).astype(floatX) + sigma2 = np.abs(rng.normal()).astype(floatX) params["sigma_irregular"] = np.sqrt(sigma2) sm_params["sigma2.irregular"] = sigma2.item() expected_param_dims["sigma_irregular"] += ("observed_state",) @@ -608,7 +608,7 @@ def test_frequency_seasonality(n, s, rng): k = get_shift_factor(s) T = int(s * k) - x, y = simulate_from_numpy_model(mod, rng, params, 2 * T) + x, y = simulate_from_numpy_model(mod, rng, params, steps=2 * T) assert_pattern_repeats(y, T, atol=ATOL, rtol=RTOL) # Check coords @@ -672,8 +672,9 @@ def test_exogenous_component(rng): data = rng.normal(size=(100, 2)).astype(floatX) mod = st.RegressionComponent(state_names=["feature_1", "feature_2"], name="exog") - params = {"beta_exog": np.array([1.0, 2.0], dtype=floatX), "data_exog": data} - x, y = simulate_from_numpy_model(mod, rng, params) + params = {"beta_exog": np.array([1.0, 2.0], dtype=floatX)} + exog_data = {"data_exog": data} + x, y = simulate_from_numpy_model(mod, rng, params, exog_data) # Check that the generated data is just a linear regression assert_allclose(y, data @ params["beta_exog"], atol=ATOL, rtol=RTOL) @@ -756,6 +757,9 @@ def test_filter_scans_time_varying_design_matrix(rng): beta_exog = pm.Normal("beta_exog", dims=["exog_state"]) mod.build_statespace_graph(y) + x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() + pm.Deterministic("Z", Z) + prior = pm.sample_prior_predictive(samples=10) prior_Z = prior.prior.Z.values @@ -788,6 +792,8 @@ def test_extract_components_from_idata(rng): sigma_obs = pm.Exponential("sigma_obs", 1) mod.build_statespace_graph(y) + + x0, P0, c, d, T, Z, R, H, Q = mod.unpack_statespace() prior = pm.sample_prior_predictive(samples=10) filter_prior = mod.sample_conditional_prior(prior) diff --git a/pymc_experimental/tests/statespace/utilities/test_helpers.py b/pymc_experimental/tests/statespace/utilities/test_helpers.py index 1ff9fd29..078c6649 100644 --- a/pymc_experimental/tests/statespace/utilities/test_helpers.py +++ b/pymc_experimental/tests/statespace/utilities/test_helpers.py @@ -1,5 +1,3 @@ -from typing import List - import numpy as np import pandas as pd import pytensor @@ -170,7 +168,7 @@ def fast_eval(var): return pytensor.function([], var, mode="FAST_COMPILE")() -def delete_rvs_from_model(rv_names: List[str]) -> None: +def delete_rvs_from_model(rv_names: list[str]) -> None: """Remove all model mappings referring to rv This can be used to "delete" an RV from a model @@ -207,22 +205,28 @@ def unpack_statespace(ssm): return [ssm[SHORT_NAME_TO_LONG[x]] for x in MATRIX_NAMES] -def unpack_symbolic_matrices_with_params(mod, param_dict, mode="FAST_COMPILE"): +def unpack_symbolic_matrices_with_params(mod, param_dict, data_dict=None, mode="FAST_COMPILE"): + inputs = list(mod._name_to_variable.values()) + if data_dict is not None: + inputs += list(mod._name_to_data.values()) + else: + data_dict = {} + f_matrices = pytensor.function( - list(mod._name_to_variable.values()), + inputs, unpack_statespace(mod.ssm), on_unused_input="raise", mode=mode, ) - x0, P0, c, d, T, Z, R, H, Q = f_matrices(**param_dict) + x0, P0, c, d, T, Z, R, H, Q = f_matrices(**param_dict, **data_dict) return x0, P0, c, d, T, Z, R, H, Q -def simulate_from_numpy_model(mod, rng, param_dict, steps=100): +def simulate_from_numpy_model(mod, rng, param_dict, data_dict=None, steps=100): """ Helper function to visualize the components outside of a PyMC model context """ - x0, P0, c, d, T, Z, R, H, Q = unpack_symbolic_matrices_with_params(mod, param_dict) + x0, P0, c, d, T, Z, R, H, Q = unpack_symbolic_matrices_with_params(mod, param_dict, data_dict) k_states = mod.k_states k_posdef = mod.k_posdef @@ -276,9 +280,7 @@ def make_stationary_params(data, p, d, q, P, D, Q, S): sm_sarimax = sm.tsa.SARIMAX(data, order=(p, d, q), seasonal_order=(P, D, Q, S)) res = sm_sarimax.fit(disp=False) - param_dict = dict( - ar_params=[], ma_params=[], seasonal_ar_params=[], seasonal_ma_params=[], sigma_state=[] - ) + param_dict = dict(ar_params=[], ma_params=[], seasonal_ar_params=[], seasonal_ma_params=[]) for name, param in zip(res.param_names, res.params): if name.startswith("ar.S"): @@ -290,7 +292,11 @@ def make_stationary_params(data, p, d, q, P, D, Q, S): elif name.startswith("ma."): param_dict["ma_params"].append(param) else: - param_dict["sigma_state"].append(param) + param_dict["sigma_state"] = param - param_dict = {k: np.array(v, dtype=floatX) for k, v in param_dict.items() if len(v) > 0} + param_dict = { + k: np.array(v, dtype=floatX) + for k, v in param_dict.items() + if isinstance(v, float) or len(v) > 0 + } return param_dict