|
| 1 | +import aesara |
| 2 | +import numpy as np |
| 3 | +import scipy.interpolate |
| 4 | +from aesara.graph.op import Op, Apply |
| 5 | +import aesara.tensor as at |
| 6 | +import aesara.sparse |
| 7 | + |
| 8 | + |
| 9 | +def numpy_bspline_basis(eval_points: np.ndarray, k: int, degree=3): |
| 10 | + k_knots = k + degree + 1 |
| 11 | + knots = np.linspace(0, 1, k_knots - 2 * degree) |
| 12 | + knots = np.r_[[0] * degree, knots, [1] * degree] |
| 13 | + basis_funcs = scipy.interpolate.BSpline(knots, np.eye(k), k=degree) |
| 14 | + Bx = basis_funcs(eval_points).astype(eval_points.dtype) |
| 15 | + return Bx |
| 16 | + |
| 17 | + |
| 18 | +class BSplineBasis(Op): |
| 19 | + __props__ = ("sparse",) |
| 20 | + |
| 21 | + def __init__(self, sparse=True) -> None: |
| 22 | + super().__init__() |
| 23 | + if not isinstance(sparse, bool): |
| 24 | + raise TypeError("sparse should be True or False") |
| 25 | + self.sparse = sparse |
| 26 | + |
| 27 | + def make_node(self, *inputs) -> Apply: |
| 28 | + eval_points, k, d = map(at.as_tensor, inputs) |
| 29 | + if not (eval_points.ndim == 1 and np.issubdtype(eval_points.dtype, np.floating)): |
| 30 | + raise TypeError("eval_points should be a vector of floats") |
| 31 | + if not k.type in at.int_types: |
| 32 | + raise TypeError("k should be integer") |
| 33 | + if not d.type in at.int_types: |
| 34 | + raise TypeError("degree should be integer") |
| 35 | + if self.sparse: |
| 36 | + out_type = aesara.sparse.SparseTensorType("csr", eval_points.dtype)() |
| 37 | + else: |
| 38 | + out_type = aesara.tensor.matrix(dtype=eval_points.dtype) |
| 39 | + return Apply(self, [eval_points, k, d], [out_type]) |
| 40 | + |
| 41 | + def perform(self, node, inputs, output_storage, params=None) -> None: |
| 42 | + eval_points, k, d = inputs |
| 43 | + Bx = numpy_bspline_basis(eval_points, int(k), int(d)) |
| 44 | + if self.sparse: |
| 45 | + Bx = scipy.sparse.csr_matrix(Bx, dtype=eval_points.dtype) |
| 46 | + output_storage[0][0] = Bx |
| 47 | + |
| 48 | + def infer_shape(self, fgraph, node, ins_shapes): |
| 49 | + return [(node.inputs[0].shape[0], node.inputs[1])] |
| 50 | + |
| 51 | + |
| 52 | +def bspline_basis(n, k, degree=3, dtype=None, sparse=True): |
| 53 | + dtype = dtype or aesara.config.floatX |
| 54 | + eval_points = np.linspace(0, 1, n, dtype=dtype) |
| 55 | + return BSplineBasis(sparse=sparse)(eval_points, k, degree) |
| 56 | + |
| 57 | + |
| 58 | +def bspline_interpolation(x, *, n=None, eval_points=None, degree=3, sparse=True): |
| 59 | + """Interpolate sparse grid to dense grid using bsplines. |
| 60 | +
|
| 61 | + Parameters |
| 62 | + ---------- |
| 63 | + x : Variable |
| 64 | + Input Variable to interpolate. |
| 65 | + 0th coordinate assumed to be mapped regularly on [0, 1] interval |
| 66 | + n : int (optional) |
| 67 | + Resolution of interpolation |
| 68 | + eval_points : vector (optional) |
| 69 | + Custom eval points in [0, 1] interval (or scaled properly using min/max scaling) |
| 70 | + degree : int, optional |
| 71 | + BSpline degree, by default 3 |
| 72 | + sparse : bool, optional |
| 73 | + Use sparse operation, by default True |
| 74 | +
|
| 75 | + Returns |
| 76 | + ------- |
| 77 | + Variable |
| 78 | + The interpolated variable, interpolation is across 0th axis |
| 79 | +
|
| 80 | + Examples |
| 81 | + -------- |
| 82 | + >>> import pymc as pm |
| 83 | + >>> import numpy as np |
| 84 | + >>> half_months = np.linspace(0, 365, 12*2) |
| 85 | + >>> with pm.Model(coords=dict(knots_time=half_months, time=np.arange(365))) as model: |
| 86 | + ... kernel = pm.gp.cov.ExpQuad(1, ls=365/12) |
| 87 | + ... # ready to define gp (a latent process over parameters) |
| 88 | + ... gp = pm.gp.gp.Latent( |
| 89 | + ... cov_func=kernel |
| 90 | + ... ) |
| 91 | + ... y_knots = gp.prior("y_knots", half_months[:, None], dims="knots_time") |
| 92 | + ... y = pm.Deterministic( |
| 93 | + ... "y", |
| 94 | + ... bspline_interpolation(y_knots, n=365, degree=3), |
| 95 | + ... dims="time" |
| 96 | + ... ) |
| 97 | + ... trace = pm.sample_prior_predictive(1) |
| 98 | +
|
| 99 | + Notes |
| 100 | + ----- |
| 101 | + Adopted from `BayesAlpha <https://github.com/quantopian/bayesalpha/blob/676f4f194ad20211fd040d3b0c6e82969aafb87e/bayesalpha/dists.py#L97>`_ |
| 102 | + where it was written by @aseyboldt |
| 103 | + """ |
| 104 | + x = at.as_tensor(x) |
| 105 | + if n is not None and eval_points is not None: |
| 106 | + raise ValueError("Please provide one of n or eval_points") |
| 107 | + elif n is not None: |
| 108 | + eval_points = np.linspace(0, 1, n, dtype=x.dtype) |
| 109 | + elif eval_points is None: |
| 110 | + raise ValueError("Please provide one of n or eval_points") |
| 111 | + basis = BSplineBasis(sparse=sparse)(eval_points, x.shape[0], degree) |
| 112 | + if sparse: |
| 113 | + return aesara.sparse.dot(basis, x) |
| 114 | + else: |
| 115 | + return aesara.tensor.dot(basis, x) |
0 commit comments