-
-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathstatespace.py
2312 lines (1891 loc) · 94.7 KB
/
statespace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
from collections.abc import Callable, Sequence
from typing import Any
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
from arviz import InferenceData
from pymc.model import modelcontext
from pymc.model.transform.optimization import freeze_dims_and_data
from pymc.util import RandomState
from pytensor import Variable, graph_replace
from pytensor.compile import get_mode
from rich.box import SIMPLE_HEAD
from rich.console import Console
from rich.table import Table
from pymc_extras.statespace.core.representation import PytensorRepresentation
from pymc_extras.statespace.filters import (
KalmanSmoother,
SquareRootFilter,
StandardFilter,
UnivariateFilter,
)
from pymc_extras.statespace.filters.distributions import (
LinearGaussianStateSpace,
SequenceMvNormal,
)
from pymc_extras.statespace.filters.utilities import stabilize
from pymc_extras.statespace.utils.constants import (
ALL_STATE_AUX_DIM,
ALL_STATE_DIM,
FILTER_OUTPUT_DIMS,
FILTER_OUTPUT_TYPES,
JITTER_DEFAULT,
MATRIX_DIMS,
MATRIX_NAMES,
OBS_STATE_DIM,
SHOCK_DIM,
SHORT_NAME_TO_LONG,
TIME_DIM,
VECTOR_VALUED,
)
from pymc_extras.statespace.utils.data_tools import register_data_with_pymc
# Module-level logger, namespaced under pymc's logging hierarchy.
_log = logging.getLogger("pymc.experimental.statespace")

# Default float dtype for placeholder tensors, taken from pytensor's global config.
floatX = pytensor.config.floatX

# Maps the user-facing ``filter_type`` argument to a Kalman filter implementation.
# The keys of this dict define the set of valid ``filter_type`` values.
FILTER_FACTORY = {
    "standard": StandardFilter,
    "univariate": UnivariateFilter,
    "cholesky": SquareRootFilter,
}
def _validate_filter_arg(filter_arg):
    """Raise ValueError if *filter_arg* is not a recognized filter output type."""
    if filter_arg.lower() in FILTER_OUTPUT_TYPES:
        return
    raise ValueError(
        f'filter_output should be one of {", ".join(FILTER_OUTPUT_TYPES)}, received {filter_arg}'
    )
def _verify_group(group):
if group not in ["prior", "posterior"]:
raise ValueError(f'Argument "group" must be one of "prior" or "posterior", found {group}')
class PyMCStateSpace:
r"""
Base class for Linear Gaussian Statespace models in PyMC.
Holds a ``PytensorRepresentation`` and ``KalmanFilter``, and provides a mapping between a PyMC model and the
statespace model.
Parameters
----------
k_endog : int
The number of endogenous variables (observed time series).
k_states : int
The number of state variables.
k_posdef : int
The number of shocks in the model
filter_type : str, optional
The type of Kalman filter to use. Valid options are "standard", "univariate", "single", "cholesky", and
"steady_state". For more information, see the docs for each filter. Default is "standard".
verbose : bool, optional
If True, displays information about the initialized model. Defaults to True.
measurement_error : bool, optional
If true, the model contains measurement error. Needed by post-estimation sampling methods to decide how to
compute the observation errors. If False, these errors are deterministically zero; if True, they are sampled
from a multivariate normal.
Notes
-----
Based on the statsmodels statespace implementation https://github.com/statsmodels/statsmodels/blob/main/statsmodels/tsa/statespace/representation.py,
described in [1].
All statespace models inherit from this base class, which is responsible for providing an interface between a
PyMC model and a PytensorRepresentation of a linear statespace model. This is done via the ``update`` method,
which takes as input a vector of PyMC random variables and assigns them to their correct positions inside the
underlying ``PytensorRepresentation``. Construction of the parameter vector, called ``theta``, is done
automatically, but depend on the names provided in the ``param_names`` property.
To implement a new statespace model, one needs to:
1. Overload the ``param_names`` property to return a list of parameter names.
2. Overload the ``update`` method to put these parameters into their respective statespace matrices
In addition, a number of additional properties can be overloaded to provide users with additional resources
when writing their PyMC models. For details, see the attributes section of the docs for this class.
Finally, this class holds post-estimation methods common to all statespace models, which do not need to be
overloaded when writing a custom statespace model.
Examples
--------
The local level model is a simple statespace model. It is a Gaussian random walk with a drift term that itself also
follows a Gaussian random walk, as described by the following two equations:
.. math::
\begin{align}
y_{t} &= y_{t-1} + x_t + \nu_t \tag{1} \\
x_{t} &= x_{t-1} + \eta_t \tag{2}
\end{align}
Where :math:`y_t` is the observed data, and :math:`x_t` is an unobserved trend term. The model has two unknown
parameters, the variances on the two innovations, :math:`\sigma_\nu` and :math:`\sigma_\eta`. Take the hidden state
vector to be :math:`\begin{bmatrix} y_t & x_t \end{bmatrix}^T` and the shock vector
:math:`\varepsilon_t = \begin{bmatrix} \nu_t & \eta_t \end{bmatrix}^T`. Then this model can be cast into state-space
form with the following matrices:
.. math::
\begin{align}
T &= \begin{bmatrix}1 & 1 \\ 0 & 1 \end{bmatrix} &
R &= \begin{bmatrix}1 & 0 \\ 0 & 1 \end{bmatrix} &
Q &= \begin{bmatrix} \sigma_\nu & 0 \\ 0 & \sigma_\eta \end{bmatrix} &
Z &= \begin{bmatrix} 1 & 0 \end{bmatrix}
\end{align}
With the remaining statespace matrices as zero matrices of the appropriate sizes. The model has two states,
two shocks, and one observed state. Knowing all this, a very simple local level model can be implemented as
follows:
.. code:: python
from pymc_extras.statespace.core import PyMCStateSpace
import numpy as np
class LocalLevel(PyMCStateSpace):
def __init__(self):
# Initialize the superclass. This creates the PytensorRepresentation and the Kalman Filter
super().__init__(k_endog=1, k_states=2, k_posdef=2)
# Declare the non-zero, non-parameterized matrices
self.ssm['transition', :, :] = np.array([[1.0, 1.0], [0.0, 1.0]])
self.ssm['selection', :, :] = np.eye(2)
self.ssm['design', :, :] = np.array([[1.0, 0.0]])
@property
def param_names(self):
return ['x0', 'P0', 'sigma_nu', 'sigma_eta']
def make_symbolic_graph(self):
# Declare symbolic variables that represent parameters of the model
# In this case, we have 4: x0 (initial state), P0 (initial state covariance), sigma_nu, and sigma_eta
x0 = self.make_and_register_variable('x0', shape=(2,))
P0 = self.make_and_register_variable('P0', shape=(2,2))
sigma_nu = self.make_and_register_variable('sigma_nu')
sigma_eta = self.make_and_register_variable('sigma_eta')
# Next, use these symbolic variables to build the statespace matrices by assigning each parameter
# to its correct location in the correct matrix
self.ssm['initial_state', :] = x0
self.ssm['initial_state_cov', :, :] = P0
self.ssm['state_cov', 0, 0] = sigma_nu
self.ssm['state_cov', 1, 1] = sigma_eta
After defining priors over the named parameters ``P0``, ``x0``, ``sigma_eta``, and ``sigma_nu``, we can sample
from this model:
.. code:: python
import pymc as pm
ll = LocalLevel()
with pm.Model() as mod:
x0 = pm.Normal('x0', shape=(2,))
P0_diag = pm.Exponential('P0_diag', 1, shape=(2,))
P0 = pm.Deterministic('P0', pt.diag(P0_diag))
sigma_nu = pm.Exponential('sigma_nu', 1)
sigma_eta = pm.Exponential('sigma_eta', 1)
ll.build_statespace_graph(data = data)
idata = pm.sample()
References
----------
.. [1] Fulton, Chad. "Estimating time series models by state space methods in Python: Statsmodels." (2015).
http://www.chadfulton.com/files/fulton_statsmodels_2017_v1.pdf
"""
def __init__(
    self,
    k_endog: int,
    k_states: int,
    k_posdef: int,
    filter_type: str = "standard",
    verbose: bool = True,
    measurement_error: bool = False,
):
    # Fitting context, populated later by build_statespace_graph
    self._fit_mode: str | None = None
    self._fit_coords: dict[str, Sequence[str]] | None = None
    self._fit_dims: dict[str, Sequence[str]] | None = None
    self._fit_data: pt.TensorVariable | None = None

    # Exogenous-data bookkeeping, used by models that declare data_names
    self._needs_exog_data = None
    self._exog_names = []
    self._exog_data_info = {}

    # Registries mapping parameter/data names to their symbolic placeholders
    self._name_to_variable = {}
    self._name_to_data = {}

    self.k_endog = k_endog
    self.k_states = k_states
    self.k_posdef = k_posdef
    self.measurement_error = measurement_error

    # All models contain a state space representation and a Kalman filter
    self.ssm = PytensorRepresentation(k_endog, k_states, k_posdef)

    # This will be populated with PyMC random matrices after calling _insert_random_variables
    self.subbed_ssm: list[pt.TensorVariable] | None = None

    # Normalize once so all comparisons are case-insensitive
    filter_key = filter_type.lower()

    # BUGFIX: this check previously ran *after* the FILTER_FACTORY membership test and
    # compared the raw (un-lowered) string. Since "single" is not a FILTER_FACTORY key,
    # the branch was unreachable; it must run first to ever fire.
    if filter_key == "single" and self.k_endog > 1:
        raise ValueError('Cannot use filter_type = "single" with multiple observed time series')

    if filter_key not in FILTER_FACTORY.keys():
        raise NotImplementedError(
            "The following are valid filter types: " + ", ".join(list(FILTER_FACTORY.keys()))
        )

    self.kalman_filter = FILTER_FACTORY[filter_key]()
    self.kalman_smoother = KalmanSmoother()

    # Subclass hook: builds placeholder variables and fills the ssm matrices
    self.make_symbolic_graph()

    # Rich table summarizing required priors/data; printed when verbose=True
    self.requirement_table = None
    self._populate_prior_requirements()
    self._populate_data_requirements()

    if verbose and self.requirement_table:
        console = Console()
        console.print(self.requirement_table)
def _populate_prior_requirements(self) -> None:
"""
Add requirements about priors needed for the model to a rich table, including their names,
shapes, named dimensions, and any parameter constraints.
"""
# Check that the param_info class is implemented, and also that it's a dictionary. We can't proceed if either
# is not true.
try:
if not isinstance(self.param_info, dict):
return
except NotImplementedError:
return
if self.requirement_table is None:
self._initialize_requirement_table()
for param, info in self.param_info.items():
self.requirement_table.add_row(
param, str(info["shape"]), info["constraints"], str(info["dims"])
)
def _populate_data_requirements(self) -> None:
"""
Add requirements about the data needed for the model, including their names, shapes, and named dimensions.
"""
try:
if not isinstance(self.data_info, dict):
return
except NotImplementedError:
return
if self.requirement_table is None:
self._initialize_requirement_table()
else:
self.requirement_table.add_section()
for data, info in self.data_info.items():
self.requirement_table.add_row(data, str(info["shape"]), "pm.Data", str(info["dims"]))
def _initialize_requirement_table(self) -> None:
    """Create and attach the rich Table used to display model variable requirements."""
    self.requirement_table = table = Table(
        show_header=True,
        show_edge=True,
        box=SIMPLE_HEAD,
        highlight=True,
    )
    table.title = "Model Requirements"
    table.caption = (
        "These parameters should be assigned priors inside a PyMC model block before "
        "calling the build_statespace_graph method."
    )
    # Metadata columns are left-aligned; dims are right-aligned for readability.
    for header, justify in (
        ("Variable", "left"),
        ("Shape", "left"),
        ("Constraints", "left"),
        ("Dimensions", "right"),
    ):
        table.add_column(header, justify=justify)
def _unpack_statespace_with_placeholders(
    self,
) -> tuple[
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
    pt.TensorVariable,
]:
    """
    Return all statespace matrices in the standard order (a0, P0, c, d, T, Z, R, H, Q).

    Matrices returned by this method still contain pytensor placeholder variables,
    not PyMC random variables.
    """
    # Long names in the canonical a0, P0, c, d, T, Z, R, H, Q order
    long_names = (
        "initial_state",
        "initial_state_cov",
        "state_intercept",
        "obs_intercept",
        "transition",
        "design",
        "selection",
        "obs_cov",
        "state_cov",
    )
    return tuple(self.ssm[name] for name in long_names)
def unpack_statespace(self) -> list[pt.TensorVariable]:
    """
    Return the statespace matrices, with PyMC variables substituted in, in standard order.

    Raises
    ------
    ValueError
        If ``build_statespace_graph`` has not yet been called, so no PyMC variables
        have been inserted into the statespace matrices.
    """
    if self.subbed_ssm is not None:
        return self.subbed_ssm
    raise ValueError(
        "Cannot unpack the complete statespace system until PyMC model variables have been "
        "inserted. To build the random statespace matrices, call build_statespace_graph() inside"
        "a PyMC model context. "
    )
@property
def param_names(self) -> list[str]:
    """
    Names of model parameters

    A list of all parameters expected by the model. Each parameter will be sought inside the active PyMC model
    context when ``build_statespace_graph`` is invoked.

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The param_names property has not been implemented!")
@property
def data_names(self) -> list[str]:
    """
    Names of data variables expected by the model.

    This does not include the observed data series, which is automatically handled by PyMC. This property only
    needs to be implemented for models that expect exogenous data.
    """
    # Default: the model requires no exogenous data.
    return []
@property
def param_info(self) -> dict[str, dict[str, Any]]:
    """
    Information about parameters needed to declare priors

    A dictionary of param_name: dictionary key-value pairs. The return value is used by the
    ``_populate_prior_requirements`` method, to print a message telling users how to define the necessary
    priors for the model. Each dictionary should have the following key-value pairs:
        * key: "shape", value: a tuple of integers
        * key: "constraints", value: a string describing the support of the prior (positive,
          positive semi-definite, etc)
        * key: "dims", value: tuple of strings

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    # BUGFIX: the message previously referred to a non-existent "params_info" property.
    raise NotImplementedError("The param_info property has not been implemented!")
@property
def data_info(self) -> dict[str, dict[str, Any]]:
    """
    Information about Data variables that need to be declared in the PyMC model block.

    Returns a dictionary of data_name: dictionary of property-name:property description pairs. The return value is
    used by the ``_populate_data_requirements`` method, to print a message telling users how to define the
    necessary data for the model. Each dictionary should have the following key-value pairs:
        * key: "shape", value: a tuple of integers
        * key: "dims", value: tuple of strings

    Subclasses with exogenous data must override this property; the base implementation raises
    NotImplementedError.
    """
    raise NotImplementedError("The data_info property has not been implemented!")
@property
def state_names(self) -> list[str]:
    """
    A k_states length list of strings, associated with the model's hidden states

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The state_names property has not been implemented!")
@property
def observed_states(self) -> list[str]:
    """
    A k_endog length list of strings, associated with the model's observed states

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The observed_states property has not been implemented!")
@property
def shock_names(self) -> list[str]:
    """
    A k_posdef length list of strings, associated with the model's shock processes

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The shock_names property has not been implemented!")
@property
def default_priors(self) -> dict[str, Callable]:
    """
    Dictionary of parameter names and callable functions to construct default priors for the model

    Returns a dictionary with param_name: Callable key-value pairs. Used by the ``add_default_priors()`` method
    to automatically add priors to the PyMC model.

    Subclasses that support default priors must override this property; the base implementation raises
    NotImplementedError.
    """
    raise NotImplementedError("The default_priors property has not been implemented!")
@property
def coords(self) -> dict[str, Sequence[str]]:
    """
    PyMC model coordinates

    Returns a dictionary of dimension: coordinate key-value pairs, to be provided to ``pm.Model``. Dimensions
    should come from the default names defined in ``statespace.utils.constants`` for them to be detected by
    sampling methods.

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The coords property has not been implemented!")
@property
def param_dims(self) -> dict[str, Sequence[str]]:
    """
    Dictionary of named dimensions for each model parameter

    Returns a dictionary of param_name: dimension key-value pairs, to be provided to the ``dims`` argument of a
    PyMC random variable. Dimensions should come from the default names defined in ``statespace.utils.constants``
    for them to be detected by sampling methods.

    Subclasses must override this property; the base implementation raises NotImplementedError.
    """
    raise NotImplementedError("The param_dims property has not been implemented!")
def add_default_priors(self) -> None:
    """
    Add default priors to the active PyMC model context

    Subclasses that support default priors must override this method; the base implementation raises
    NotImplementedError.
    """
    # BUGFIX: the message previously called this a "property"; it is a method.
    raise NotImplementedError("The add_default_priors method has not been implemented!")
def make_and_register_variable(
    self, name, shape: int | tuple[int, ...] | None = None, dtype=floatX
) -> pt.TensorVariable:
    """
    Create a symbolic placeholder for a model parameter and record it in ``_name_to_variable``.

    Parameters
    ----------
    name : str
        The name of the placeholder variable. Must be the name of a model parameter.
    shape : int or tuple of int
        Shape of the parameter. Defines the shape of the prior the user must later supply.
    dtype : str, default pytensor.config.floatX
        dtype of the parameter

    Returns
    -------
    pt.TensorVariable
        The newly created placeholder variable.

    Notes
    -----
    Placeholders created here are swapped for PyMC random variables in
    ``_insert_random_variables`` via ``pytensor.graph_replace``; the ``_name_to_variable``
    registry supplies the replacement mapping.

    Raises ValueError if the name is not in ``param_names`` or was already registered.
    """
    # Guard 1: the name must be a declared model parameter.
    if name not in self.param_names:
        raise ValueError(
            f"{name} is not a model parameter. All placeholder variables should correspond to model "
            f"parameters."
        )

    # Guard 2: each parameter may be registered only once.
    if name in self._name_to_variable.keys():
        raise ValueError(
            f"{name} is already a registered placeholder variable with shape "
            f"{self._name_to_variable[name].type.shape}"
        )

    new_placeholder = pt.tensor(name, shape=shape, dtype=dtype)
    self._name_to_variable[name] = new_placeholder
    return new_placeholder
def make_and_register_data(
    self, name: str, shape: int | tuple[int], dtype: str = floatX
) -> Variable:
    r"""
    Helper function to create a pytensor symbolic variable and register it in the _name_to_data dictionary

    Parameters
    ----------
    name : str
        The name of the placeholder data. Must be the name of an expected data variable.
    shape : int or tuple of int
        Shape of the parameter
    dtype : str, default pytensor.config.floatX
        dtype of the parameter

    Notes
    -----
    See docstring for make_and_register_variable for more details. This function is similar, but handles data
    inputs instead of model parameters.

    An error is raised if the provided name has already been registered, or if the name is not present in the
    ``data_names`` property.
    """
    if name not in self.data_names:
        # BUGFIX: this message was copy-pasted from make_and_register_variable and wrongly
        # referred to "model parameters"; it should reference the data_names property.
        raise ValueError(
            f"{name} is not a known data variable. All placeholder data should correspond to names "
            f"in the data_names property."
        )
    if name in self._name_to_data.keys():
        raise ValueError(
            f"{name} is already a registered placeholder variable with shape "
            f"{self._name_to_data[name].type.shape}"
        )
    placeholder = pt.tensor(name, shape=shape, dtype=dtype)
    self._name_to_data[name] = placeholder
    return placeholder
def make_symbolic_graph(self) -> None:
    r"""
    The purpose of the make_symbolic_graph function is to hide tedious parameter allocations from the user.

    In statespace models, it is extremely rare for an entire matrix to be defined by a single prior distribution.
    Instead, users expect to place priors over single entries of the matrix. The purpose of this function is to
    meet that expectation.

    Every statespace model needs to implement this function.

    Examples
    --------
    As an example, consider an ARMA(2,2) model, which has five parameters (excluding the initial state distribution):
    2 AR parameters (:math:`\rho_1` and :math:`\rho_2`), 2 MA parameters (:math:`\theta_1` and :math:`\theta_2`),
    and a single innovation covariance (:math:`\sigma`). A common way of writing this statespace is:

    .. math::
        \begin{align}
        T &= \begin{bmatrix} \rho_1 & 1 & 0 \\
                             \rho_2 & 0 & 1 \\
                             0      & 0 & 0
             \end{bmatrix} \\
        R &= \begin{bmatrix} 1 \\ \theta_1 \\ \theta_2 \end{bmatrix} \\
        Q &= \begin{bmatrix} \sigma \end{bmatrix}
        \end{align}

    To implement this model, we begin by creating the required matrices, and fill in the "fixed" values -- the ones
    at position (0, 1) and (1, 2) in the T matrix, and at position (0, 0) in the R matrix. These are then saved
    to the class's PytensorRepresentation -- called ``ssm``.

    .. code:: python

        T = np.eye(3, k=1)
        R = np.concatenate([np.ones((1, 1)), np.zeros((2, 1))], axis=0)
        self.ssm['transition'] = T
        self.ssm['selection'] = R

    Next, placeholders need to be inserted for the random variables rho_1, rho_2, theta_1, theta_2, and sigma.
    This can be done many ways: we could define two vectors, rho and theta, and a scalar for sigma, or five
    scalars. Whatever is chosen, the choice needs to be consistent with the ``param_names`` property.

    Suppose the ``param_names`` are ``[rho, theta, sigma]``, then we make one placeholder for each, and insert it
    into the correct ``ssm`` matrix, at the correct location. To create placeholders, use the
    ``make_and_register_variable`` helper method, which will maintain an internal registry of variables.

    .. code:: python

        rho_params = self.make_and_register_variable(name='rho', shape=(2,))
        theta_params = self.make_and_register_variable(name='theta', shape=(2,))
        sigma = self.make_and_register_variable(name='sigma', shape=(1,))

        self.ssm['transition', :, 0] = rho_params
        self.ssm['selection', 1:, 0] = theta_params
        self.ssm['state_cov', 0, 0] = sigma
    """
    # BUGFIX: the message previously referred to a non-existent "make_symbolic_statespace" method.
    raise NotImplementedError("The make_symbolic_graph method has not been implemented!")
def _get_matrix_shape_and_dims(self, name: str) -> tuple[tuple[int] | None, tuple[str] | None]:
    """
    Get the shape and dimensions of a matrix associated with the specified name.

    This method retrieves the shape and dimensions of a matrix associated with the given name. Importantly,
    it will only find named dimensions if they are the "default" dimension names defined in the
    statespace.utils.constants.py file AND are registered in the active model's coords.

    Parameters
    ----------
    name : str
        The name of the matrix whose shape and dimensions need to be retrieved.

    Returns
    -------
    shape: tuple or None
        If no named dimensions are found, the shape of the requested matrix, otherwise None.
    dims: tuple or None
        If named dimensions are found, a tuple of strings, otherwise None
    """
    pm_mod = modelcontext(None)

    # BUGFIX: MATRIX_DIMS.get(...) may return None for unknown names; iterating None in
    # the all(...) check (and unpacking it below) raised TypeError. Guard against None.
    dims = MATRIX_DIMS.get(name, None)
    if dims is not None and not all(dim in pm_mod.coords.keys() for dim in dims):
        dims = None

    data_len = len(self._fit_data)
    if name in self.kalman_filter.seq_names:
        # Time-varying matrix: the time axis is prepended to both shape and dims
        shape = (data_len, *self.ssm[SHORT_NAME_TO_LONG[name]].type.shape)
        if dims is not None:
            dims = (TIME_DIM, *dims)
    else:
        shape = self.ssm[SHORT_NAME_TO_LONG[name]].type.shape

    # Named dims take precedence: report shape only when no dims were found
    shape = shape if dims is None else None
    return shape, dims
def _save_exogenous_data_info(self):
    """
    Snapshot exogenous data needed later by posterior sampling functions.

    For each name in ``data_names``, stores the current value of the model's data
    container and its named dims in ``_exog_data_info``.
    """
    pymc_mod = modelcontext(None)
    self._exog_data_info.update(
        {
            name: {
                "name": name,
                "value": pymc_mod[name].get_value(),
                "dims": pymc_mod.named_vars_to_dims.get(name, None),
            }
            for name in self.data_names
        }
    )
def _insert_random_variables(self):
    """
    Replace pytensor symbolic placeholders with PyMC random variables.

    Looks up every name in ``param_names`` on the active PyMC model, validates that the
    model defines exactly the required parameters (no more, no fewer), then rebuilds the
    statespace matrices with the placeholders swapped for the model's random variables.
    The result is stored in ``self.subbed_ssm``.

    Raises
    ------
    ValueError
        If any required parameter is missing from the PyMC model, or the model contains
        parameters the statespace model does not expect.
    """
    pymc_model = modelcontext(None)

    with pymc_model:
        found_params = [
            param.name
            for param_name in self.param_names
            if (param := getattr(pymc_model, param_name, None)) is not None
        ]

    required, found = set(self.param_names), set(found_params)

    missing_params = list(required - found)
    if len(missing_params) > 0:
        raise ValueError(
            "The following required model parameters were not found in the PyMC model: "
            + ", ".join(missing_params)
        )

    excess_params = list(found - required)
    if len(excess_params) > 0:
        raise ValueError(
            "The following parameters were found in the PyMC model but are not required by the statespace model: "
            + ", ".join(excess_params)
        )

    # Swap every registered placeholder for its PyMC counterpart across all matrices at once
    matrices = list(self._unpack_statespace_with_placeholders())
    replacement_dict = {var: pymc_model[name] for name, var in self._name_to_variable.items()}
    self.subbed_ssm = graph_replace(matrices, replace=replacement_dict, strict=True)
def _insert_data_variables(self):
    """
    Replace symbolic data placeholders with PyMC data containers.

    Only used when models require exogenous data. The observed data series is handled
    separately and is not added to the model by this method.

    Raises
    ------
    ValueError
        If any name in ``data_names`` is missing from the active PyMC model.
    """
    try:
        data_names = self.data_names
    except NotImplementedError:
        return

    pymc_model = modelcontext(None)

    with pymc_model:
        found_data = [
            data.name
            for data_name in data_names
            if (data := getattr(pymc_model, data_name, None)) is not None
        ]

    missing_data = list(set(data_names) - set(found_data))
    if len(missing_data) > 0:
        raise ValueError(
            "The following required exogenous data were not found in the PyMC model: "
            + ", ".join(missing_data)
        )

    # Swap data placeholders inside the already-substituted statespace matrices
    replacement_dict = {data: pymc_model[name] for name, data in self._name_to_data.items()}
    self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=True)
def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]:
    """
    Add all statespace matrices to the PyMC model currently on the context stack as pm.Deterministic nodes, and
    adds named dimensions if they are found.

    Returns
    -------
    registered_matrices: list of pt.TensorVariable
        list of statespace matrices, wrapped in pm.Deterministic
    """
    pm_mod = modelcontext(None)
    matrices = self.unpack_statespace()

    registered_matrices = []
    for i, (matrix, name) in enumerate(zip(matrices, MATRIX_NAMES)):
        # Vector-valued quantities gain a time axis at ndim 2; square matrices at ndim 3
        time_varying_ndim = 2 if name in VECTOR_VALUED else 3
        # NOTE(review): relies on truthiness of `getattr(pm_mod, name, None)` -- assumes a
        # matrix not yet on the model resolves to None/falsy, and a registered variable is
        # always truthy. Confirm pytensor variables are truthy here.
        if not getattr(pm_mod, name, None):
            shape, dims = self._get_matrix_shape_and_dims(name)
            has_dims = dims is not None

            if matrix.ndim == time_varying_ndim and has_dims:
                # Time-varying matrix: prepend the time dimension to the default dims
                dims = (TIME_DIM, *dims)

            x = pm.Deterministic(name, matrix, dims=dims)
            registered_matrices.append(x)
        else:
            # Matrix already exists on the model; reuse it instead of re-registering
            registered_matrices.append(matrices[i])

    return registered_matrices
@staticmethod
def _register_kalman_filter_outputs_with_pymc_model(outputs: tuple[pt.TensorVariable]) -> None:
    """
    Register Kalman filter/smoother outputs on the active PyMC model as Deterministics.

    ``outputs`` is expected to hold four state sequences followed by four covariance
    sequences (filtered, predicted, predicted-observed, smoothed). Named dims are only
    attached where the model's coords define them.
    """
    mod = modelcontext(None)
    coords = mod.coords

    # First four outputs are states, last four are covariances -- names in matching order
    output_names = [
        "filtered_state",
        "predicted_state",
        "predicted_observed_state",
        "smoothed_state",
        "filtered_covariance",
        "predicted_covariance",
        "predicted_observed_covariance",
        "smoothed_covariance",
    ]

    with mod:
        for name, var in zip(output_names, outputs):
            dim_names = FILTER_OUTPUT_DIMS.get(name, None)
            # Drop any dimension name the model does not define a coord for
            dims = tuple(dim if dim in coords else None for dim in dim_names)
            pm.Deterministic(name, var, dims=dims)
def build_statespace_graph(
    self,
    data: np.ndarray | pd.DataFrame | pt.TensorVariable,
    register_data: bool = True,
    mode: str | None = None,
    missing_fill_value: float | None = None,
    cov_jitter: float | None = JITTER_DEFAULT,
    save_kalman_filter_outputs_in_idata: bool = False,
) -> None:
    """
    Given a parameter vector `theta`, constructs the full computational graph describing the state space model and
    the associated log probability of the data. Hidden states and log probabilities are computed via the Kalman
    Filter.

    Parameters
    ----------
    data : Union[np.ndarray, pd.DataFrame, pt.TensorVariable]
        The observed data used to fit the state space model. It can be a NumPy array, a Pandas DataFrame,
        or a Pytensor tensor variable.
    register_data : bool, optional, default=True
        If True, the observed data will be registered with PyMC as a pm.Data variable. In addition,
        a "time" dim will be created and added to the model's coords.
    mode : Optional[str], optional, default=None
        The Pytensor mode used for the computation graph construction. If None, the default mode will be used.
        Other options include "JAX" and "NUMBA".
    missing_fill_value: float, optional, default=-9999
        A value to mask in missing values. NaN values in the data need to be filled with an arbitrary value to
        avoid triggering PyMC's automatic imputation machinery (missing values are instead filled by treating them
        as hidden states during Kalman filtering).

        In general this never needs to be set. But if by a wild coincidence your data includes the value -9999.0,
        you will need to change the missing_fill_value to something else, to avoid incorrectly marking
        data as missing.
    cov_jitter: float, default 1e-8 or 1e-6 if pytensor.config.floatX is float32
        The Kalman filter is known to be numerically unstable, especially at half precision. This value is added to
        the diagonal of every covariance matrix -- predicted, filtered, and smoothed -- at every step, to ensure
        all matrices are strictly positive semi-definite.

        Obviously, if this can be zero, that's best. In general:
            - Having measurement error makes Kalman Filters more robust. A large source of numerical errors come
              from the Filtered and Smoothed covariance matrices having a zero in the (0, 0) position, which always
              occurs when there is no measurement error. You can lower this value in the presence of measurement
              error.
            - The Univariate Filter is more robust than other filters, and can tolerate a lower jitter value
    save_kalman_filter_outputs_in_idata: bool, optional, default=False
        If True, Kalman Filter outputs will be saved in the model as deterministics. Useful for debugging, but
        should not be necessary for the majority of users.
    """
    pm_mod = modelcontext(None)

    # Swap symbolic placeholders for the user's PyMC random variables and data containers
    self._insert_random_variables()
    self._save_exogenous_data_info()
    self._insert_data_variables()

    obs_coords = pm_mod.coords.get(OBS_STATE_DIM, None)

    # Keep a handle on the raw data; needed later by _get_matrix_shape_and_dims
    self._fit_data = data

    # NaNs are replaced by missing_fill_value; nan_mask marks where they were
    data, nan_mask = register_data_with_pymc(
        data,
        n_obs=self.ssm.k_endog,
        obs_coords=obs_coords,
        register_data=register_data,
        missing_fill_value=missing_fill_value,
    )

    filter_outputs = self.kalman_filter.build_graph(
        pt.as_tensor_variable(data),
        *self.unpack_statespace(),
        mode=mode,
        missing_fill_value=missing_fill_value,
        cov_jitter=cov_jitter,
    )

    # The last filter output is the log-likelihood; the rest split into 3 state
    # sequences followed by 3 covariance sequences
    logp = filter_outputs.pop(-1)
    states, covs = filter_outputs[:3], filter_outputs[3:]
    filtered_states, predicted_states, observed_states = states
    filtered_covariances, predicted_covariances, observed_covariances = covs
    if save_kalman_filter_outputs_in_idata:
        # Also run the smoother so all filter/smoother outputs can be stored as Deterministics
        smooth_states, smooth_covariances = self._build_smoother_graph(
            filtered_states, filtered_covariances, self.unpack_statespace(), mode=mode
        )
        all_kf_outputs = [*states, smooth_states, *covs, smooth_covariances]
        self._register_kalman_filter_outputs_with_pymc_model(all_kf_outputs)

    obs_dims = FILTER_OUTPUT_DIMS["predicted_observed_state"]
    obs_dims = obs_dims if all([dim in pm_mod.coords.keys() for dim in obs_dims]) else None

    # The observed series is modeled as a sequence of MvNormals whose logp is supplied
    # directly by the Kalman filter, rather than recomputed from mus/covs
    SequenceMvNormal(
        "obs",
        mus=observed_states,
        covs=observed_covariances,
        logp=logp,
        observed=data,
        dims=obs_dims,
    )

    # Record the fitting context needed by post-estimation sampling methods
    self._fit_coords = pm_mod.coords.copy()
    self._fit_dims = pm_mod.named_vars_to_dims.copy()
    self._fit_mode = mode
def _build_smoother_graph(
self,
filtered_states: pt.TensorVariable,
filtered_covariances: pt.TensorVariable,
matrices,
mode: str | None = None,
cov_jitter=JITTER_DEFAULT,
):
"""
Build the computation graph for the Kalman smoother.
This method constructs the computation graph for applying the Kalman smoother to the filtered states
and covariances obtained from the Kalman filter. The Kalman smoother is used to generate smoothed
estimates of the latent states and their covariances in a state space model.
The Kalman smoother provides a more accurate estimate of the latent states by incorporating future
information in the backward pass, resulting in smoothed state trajectories.
Parameters
----------
filtered_states : pytensor.tensor.TensorVariable
The filtered states obtained from the Kalman filter. Returned by the `build_statespace_graph` method.
filtered_covariances : pytensor.tensor.TensorVariable
The filtered state covariances obtained from the Kalman filter. Returned by the `build_statespace_graph`
method.
mode : Optional[str], default=None
The mode used by pytensor for the construction of the logp graph. If None, the mode provided to
`build_statespace_graph` will be used.
Returns
-------
Tuple[pytensor.tensor.TensorVariable, pytensor.tensor.TensorVariable]
A tuple containing TensorVariables representing the smoothed states and smoothed state covariances
obtained from the Kalman smoother.
"""
pymc_model = modelcontext(None)
with pymc_model:
*_, T, Z, R, H, Q = matrices
smooth_states, smooth_covariances = self.kalman_smoother.build_graph(
T, R, Q, filtered_states, filtered_covariances, mode=mode, cov_jitter=cov_jitter
)
smooth_states.name = "smooth_states"
smooth_covariances.name = "smooth_covariances"
return smooth_states, smooth_covariances
def _build_dummy_graph(self) -> None:
"""
Build a dummy computation graph for the state space model matrices.
This method creates "dummy" pm.Flat variables representing the deep parameters used in the state space model.
Returns
-------
list[pm.Flat]
A list of pm.Flat variables representing all parameters estimated by the model.
"""
def infer_variable_shape(name):
shape = self._name_to_variable[name].type.shape
if not any(dim is None for dim in shape):
return shape
dim_names = self._fit_dims.get(name, None)
if dim_names is None:
raise ValueError(
f"Could not infer shape for {name}, because it was not given coords during model"
f"fitting"
)
shape_from_coords = tuple([len(self._fit_coords[dim]) for dim in dim_names])
return tuple(
[