# Source code for pymc_marketing.mmm.hsgp

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""HSGP components."""

from __future__ import annotations

from collections.abc import Iterable
from enum import Enum
from typing import Literal, cast

import numpy as np
import numpy.typing as npt
import pymc as pm
import pytensor.tensor as pt
import xarray as xr
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from pydantic import BaseModel, Field, InstanceOf, model_validator, validate_call
from pymc.distributions.shape_utils import Dims
from pytensor.tensor import TensorLike
from pytensor.tensor.variable import TensorVariable
from typing_extensions import Self

from pymc_marketing.plot import SelToString, plot_curve
from pymc_marketing.prior import Prior, _get_transform, create_dim_handler


@validate_call
def create_complexity_penalizing_prior(
    *,
    alpha: float = Field(0.1, gt=0, lt=1),
    lower: float = Field(1.0, gt=0),
) -> Prior:
    R"""Create prior that penalizes complexity for GP lengthscale.

    The prior is defined with the following property:

    .. math::

        P[\ell < \text{lower}] = \alpha

    Where :math:`\ell` is the lengthscale.

    Following the penalized-complexity (PC) prior construction, the
    lengthscale CDF is :math:`P[\ell < x] = \exp(-\lambda x^{-1/2})`, so the
    tail condition above requires
    :math:`\lambda = -\log(\alpha)\sqrt{\text{lower}}`. Equivalently, the
    reciprocal of the lengthscale follows a Weibull distribution with shape
    :math:`1/2` and scale :math:`\lambda^{-2}`, which is what is returned.

    Parameters
    ----------
    alpha : float
        Tail probability below ``lower``. Default is 0.1.
    lower : float
        Lower bound for the lengthscale. Default is 1.0.

    Returns
    -------
    Prior
        Weibull prior for the lengthscale with the given properties.

    References
    ----------
    .. [1] Geir-Arne Fuglstad, Daniel Simpson, Finn Lindgren, Håvard Rue (2015).

    """
    # PC-prior rate solving P[ell < lower] = exp(-lam * lower**-0.5) = alpha,
    # i.e. lam = -log(alpha) * sqrt(lower).
    # BUGFIX: previously this divided by sqrt(lower), which gives
    # P[ell < lower] = alpha ** (1 / lower) — only correct when lower == 1.
    lam_ell = -pt.log(alpha) * pt.sqrt(lower)

    return Prior(
        "Weibull",
        alpha=0.5,
        # Weibull(shape=1/2, scale=lam**-2) on 1/ell matches the PC-prior CDF.
        beta=1.0 / pt.square(lam_ell),
        transform="reciprocal",
    )
def create_constrained_inverse_gamma_prior(
    *,
    upper: float,
    lower: float = 1.0,
    mass: float = 0.9,
) -> Prior:
    """Create a lengthscale prior for the HSGP model.

    The InverseGamma parameters are solved for so that ``mass`` of the
    probability lies between ``lower`` and ``upper``.

    Parameters
    ----------
    upper : float
        Upper bound for the lengthscale.
    lower : float
        Lower bound for the lengthscale. Default is 1.0.
    mass : float
        Mass of the lengthscales. Default is 0.9 or 90%.

    Returns
    -------
    Prior
        The prior for the lengthscale.

    """
    # Start from an unparameterized InverseGamma and let `constrain`
    # solve for the parameters satisfying the bounds and mass.
    unconstrained = Prior("InverseGamma")
    return unconstrained.constrain(lower=lower, upper=upper, mass=mass)
def approx_hsgp_hyperparams(
    x,
    x_center,
    lengthscale_range: tuple[float, float],
    cov_func: Literal["expquad", "matern32", "matern52"],
) -> tuple[int, float]:
    """Use heuristics for minimum `m` and `c` values.

    Based on recommendations from Ruitort-Mayol et. al.

    In practice, you need to choose `c` large enough to handle the largest
    lengthscales, and `m` large enough to accommodate the smallest
    lengthscales.

    NOTE: These recommendations are based on a one-dimensional GP.

    Parameters
    ----------
    x : tensor_like
        The x values the HSGP will be evaluated over.
    x_center : tensor_like
        The center of the data.
    lengthscale_range : tuple[float, float]
        The range of the lengthscales. Should be a list with two elements
        [lengthscale_min, lengthscale_max].
    cov_func : Literal["expquad", "matern32", "matern52"]
        The covariance function to use. Supported options are "expquad",
        "matern52", and "matern32".

    Returns
    -------
    - `m` : int
        Number of basis vectors. Increasing it helps approximate smaller
        lengthscales, but increases computational cost.
    - `c` : float
        Scaling factor such that L = c * S, where L is the boundary of the
        approximation. Increasing it helps approximate larger lengthscales,
        but may require increasing m.

    Raises
    ------
    ValueError
        If either `x_range` or `lengthscale_range` is not in the correct
        order.

    References
    ----------
    .. [1] Ruitort-Mayol, G., Anderson, M., Solin, A., Vehtari, A. (2022).
        Practical Hilbert Space Approximate Bayesian Gaussian Processes for
        Probabilistic Programming

    """
    lengthscale_min, lengthscale_max = lengthscale_range
    if lengthscale_min >= lengthscale_max:
        # BUGFIX: this message was not an f-string, so the placeholder values
        # were never interpolated into the error text.
        raise ValueError(
            "The boundaries are out of order. "
            f"{lengthscale_min} should be less than {lengthscale_max}"
        )

    Xs = x - x_center
    # Half-width of the (centered) data; the approximation boundary is c * S.
    S = np.max(np.abs(Xs), axis=0)

    # (a1, a2) heuristic coefficients per covariance function from
    # Ruitort-Mayol et al. (2022).
    coefficients = {
        "expquad": (3.2, 1.75),
        "matern52": (4.1, 2.65),
        "matern32": (4.5, 3.42),
    }
    try:
        a1, a2 = coefficients[cov_func.lower()]
    except KeyError:
        raise ValueError(
            "Unsupported covariance function. "
            "Supported options are 'expquad', 'matern52', and 'matern32'."
        ) from None

    # c must cover the largest lengthscale but never drop below 1.2.
    c = max(a1 * (lengthscale_max / S), 1.2)
    # m must resolve the smallest lengthscale within the boundary c * S.
    m = int(a2 * c / (lengthscale_min / S))

    return m, c
class CovFunc(str, Enum):
    """Supported covariance functions for the HSGP model.

    Inherits from ``str`` so that members compare equal to their plain
    string values (e.g. ``CovFunc.ExpQuad == "expquad"``), allowing user
    input and serialized configuration to use bare strings.
    """

    ExpQuad = "expquad"
    Matern52 = "matern52"
    Matern32 = "matern32"
def create_eta_prior(mass: float = 0.05, upper: float = 1.0) -> Prior:
    """Create prior for the variance.

    The Exponential rate is chosen by solving ``exp(-lam * upper) = mass``,
    i.e. so that ``mass`` of the probability lies above ``upper``.

    Parameters
    ----------
    mass : float
        Mass of the variance. Default is 0.05 or 5%.
    upper : float
        Upper bound for the variance. Default is 1.0.

    Returns
    -------
    Prior
        Exponential prior with the requested tail mass.

    """
    # Rate satisfying the tail condition P[eta > upper] = mass.
    rate = -pt.log(mass) / upper
    return Prior("Exponential", lam=rate)
def create_m_and_L_recommendations(
    X: np.ndarray,
    X_mid: float,
    ls_lower: float = 1.0,
    ls_upper: float | None = None,
    cov_func: CovFunc = CovFunc.ExpQuad,
) -> tuple[int, float]:
    """Create recommendations for the number of basis functions based on the data.

    Parameters
    ----------
    X : np.ndarray
        The data.
    X_mid : float
        The mean of the data.
    ls_lower : float
        Lower bound for the lengthscale. Default is 1.0.
    ls_upper : float, optional
        Upper bound for the lengthscale. Defaults to ``2 * X_mid`` when None.
    cov_func : CovFunc
        The covariance function. Default is CovFunc.ExpQuad.

    Returns
    -------
    tuple[int, float]
        The number of basis functions and the boundary of the approximation.

    """
    # Default the upper lengthscale bound to the full centered data extent.
    # (Previously there was a redundant `else: ls_upper = ls_upper` branch.)
    if ls_upper is None:
        ls_upper = 2 * X_mid

    m, c = approx_hsgp_hyperparams(
        X,
        X_mid,
        lengthscale_range=(ls_lower, ls_upper),
        cov_func=cast(Literal["expquad", "matern32", "matern52"], cov_func),
    )
    # Boundary of the approximation, L = c * S.
    # NOTE(review): this assumes S == X_mid, i.e. X spans [0, 2 * X_mid]
    # after centering — TODO confirm for non-zero-based inputs.
    L = c * X_mid

    return m, L
class HSGPBase(BaseModel):
    """Shared logic between HSGP and HSGPPeriodic."""

    # Number of basis vectors in the Hilbert space approximation.
    m: int = Field(..., description="Number of basis functions")
    # Training data; excluded from serialization since it may be symbolic.
    X: InstanceOf[TensorVariable] | InstanceOf[np.ndarray] | None = Field(
        None,
        description="The data to be used in the model",
        exclude=True,
    )
    X_mid: float | None = Field(None, description="The mean of the training data")
    dims: Dims = Field(..., description="The dimensions of the variable")
    transform: str | None = Field(
        None,
        description=(
            "Optional transformation for the variable. "
            "Must be registered or from either pytensor.tensor "
            "or pymc.math namespaces."
        ),
    )
    demeaned_basis: bool = Field(
        False,
        description="Whether each basis has its mean subtracted from it.",
    )

    @model_validator(mode="after")
    def _transform_is_valid(self) -> Self:
        """Fail fast at construction if the transform name cannot be resolved."""
        if self.transform is None:
            return self

        # Not storing the result here; it is looked up again in create_variable.
        _get_transform(self.transform)
        return self

    @staticmethod
    def deterministics_to_replace(name: str) -> list[str]:
        """Name of the Deterministic variables that are replaced with pm.Flat for out-of-sample.

        This is required for out-of-sample predictions for some HSGP variables.
        Replacing with pm.Flat will sample from the values learned in the
        training data.
        """
        # Base implementation: nothing needs replacing. Subclasses
        # (e.g. SoftPlusHSGP) override this.
        return []

    def register_data(self, X: TensorLike) -> Self:
        """Register the data to be used in the model.

        To be used before creating a variable but not for out-of-sample
        prediction. For out-of-sample prediction, use `pm.Data` and
        `pm.set_data`.

        Parameters
        ----------
        X : tensor_like
            The data to be used in the model.

        Returns
        -------
        Self
            The object with the data registered.

        """
        # Coerce to a tensor variable so downstream code can rely on
        # symbolic operations (slicing, mean, etc.).
        self.X = pt.as_tensor_variable(X)
        return self

    @model_validator(mode="after")
    def _register_user_input_X(self) -> Self:
        """Normalize X passed at construction time into a tensor variable."""
        if self.X is None:
            return self

        return self.register_data(self.X)

    @model_validator(mode="after")
    def _dim_is_at_least_one(self) -> Self:
        """Validate `dims`, normalizing a bare string to a 1-tuple."""
        if isinstance(self.dims, str):
            self.dims = (self.dims,)
            return self

        if isinstance(self.dims, tuple) and len(self.dims) < 1:
            raise ValueError("At least one dimension is required")

        if any(not isinstance(dim, str) for dim in self.dims):
            raise ValueError("All dimensions must be strings")

        return self

    def create_variable(self, name: str) -> TensorVariable:
        """Create a variable from configuration."""
        # Subclasses implement the actual variable construction.
        raise NotImplementedError

    def to_dict(self) -> dict:
        """Convert the object to a dictionary.

        Returns
        -------
        dict
            The object as a dictionary.

        """
        data = self.model_dump()

        def handle_prior(value):
            # Prior instances know how to serialize themselves; everything
            # else passes through unchanged.
            return value if not hasattr(value, "to_dict") else value.to_dict()

        return {key: handle_prior(value) for key, value in data.items()}

    def sample_prior(
        self,
        coords: dict | None = None,
        **sample_prior_predictive_kwargs,
    ) -> xr.Dataset:
        """Sample from the prior distribution.

        Parameters
        ----------
        coords : dict, optional
            The coordinates for the prior. By default it is None.
        sample_prior_predictive_kwargs
            Additional keyword arguments for `pm.sample_prior_predictive`.

        Returns
        -------
        xr.Dataset
            The prior distribution.

        """
        coords = coords or {}
        # Build a throwaway model containing only this component, named "f".
        with pm.Model(coords=coords) as model:
            self.create_variable("f")

        return pm.sample_prior_predictive(
            model=model,
            **sample_prior_predictive_kwargs,
        ).prior

    def plot_curve(
        self,
        curve: xr.DataArray,
        n_samples: int = 10,
        hdi_probs: float | list[float] | None = None,
        random_seed: np.random.Generator | None = None,
        subplot_kwargs: dict | None = None,
        sample_kwargs: dict | None = None,
        hdi_kwargs: dict | None = None,
        axes: npt.NDArray[Axes] | None = None,
        same_axes: bool = False,
        colors: Iterable[str] | None = None,
        legend: bool | None = None,
        sel_to_string: SelToString | None = None,
    ) -> tuple[Figure, npt.NDArray[Axes]]:
        """Plot the curve.

        Parameters
        ----------
        curve : xr.DataArray
            Curve to plot
        n_samples : int, optional
            Number of samples
        hdi_probs : float | list[float], optional
            HDI probabilities. Defaults to None which uses arviz default for
            stats.ci_prob which is 94%
        random_seed : int | random number generator, optional
            Random number generator. Defaults to None
        subplot_kwargs : dict, optional
            Additional kwargs to while creating the fig and axes
        sample_kwargs : dict, optional
            Kwargs for the :func:`plot_samples` function
        hdi_kwargs : dict, optional
            Kwargs for the :func:`plot_hdi` function
        same_axes : bool
            If all of the plots are on the same axis
        colors : Iterable[str], optional
            Colors for the plots
        legend : bool, optional
            If to include a legend. Defaults to True if same_axes
        sel_to_string : Callable[[Selection], str], optional
            Function to convert selection to a string. Defaults to
            ", ".join(f"{key}={value}" for key, value in sel.items())

        Returns
        -------
        tuple[plt.Figure, npt.NDArray[plt.Axes]]
            Figure and the axes

        """
        # The first dim indexes the GP input (e.g. time); the remaining dims
        # define the grid of subplots.
        first_dim: str = cast(tuple[str, ...], self.dims)[0]
        return plot_curve(
            curve,
            non_grid_names={first_dim},
            n_samples=n_samples,
            hdi_probs=hdi_probs,
            random_seed=random_seed,
            subplot_kwargs=subplot_kwargs,
            sample_kwargs=sample_kwargs,
            hdi_kwargs=hdi_kwargs,
            axes=axes,
            same_axes=same_axes,
            colors=colors,
            legend=legend,
            sel_to_string=sel_to_string,
        )
class HSGP(HSGPBase):
    """HSGP component.

    Examples
    --------
    Literature recommended HSGP configuration:

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP

        seed = sum(map(ord, "Out of the box GP"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        hsgp = HSGP.parameterize_from_data(
            X=X,
            dims="time",
        )

        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")
        coords = {
            "time": dates,
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        hsgp.plot_curve(curve, random_seed=rng)
        plt.show()

    Using a demeaned basis to remove "intercept" effect of first basis:

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd
        import xarray as xr

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP
        from pymc_marketing.plot import plot_curve

        seed = sum(map(ord, "Out of the box GP"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        kwargs = dict(X=X, ls=25, eta=1, dims="time", m=200, L=150, drop_first=False)
        hsgp = HSGP(demeaned_basis=False, **kwargs)
        hsgp_demeaned = HSGP(demeaned_basis=True, **kwargs)

        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")
        coords = {"time": dates}

        def sample_curve(hsgp):
            return hsgp.sample_prior(coords=coords, random_seed=rng)["f"]

        non_demeaned = sample_curve(hsgp).rename("False")
        demeaned = sample_curve(hsgp_demeaned).rename("True")

        combined = xr.merge([non_demeaned, demeaned]).to_array("demeaned")

        _, axes = combined.pipe(plot_curve, "time", same_axes=True)
        axes[0].set(title="Demeaned the intercepty first basis")
        plt.show()

    HSGP with different covariance function

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP

        seed = sum(map(ord, "Change of covariance function"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        hsgp = HSGP.parameterize_from_data(
            X=X,
            cov_func="matern32",
            dims="time",
        )

        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")
        coords = {
            "time": dates,
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        hsgp.plot_curve(curve, random_seed=rng)
        plt.show()

    HSGP with different link function via `transform` argument

    .. note::

        The `transform` parameter must be registered or from either
        `pytensor.tensor` or `pymc.math` namespaces.

        See the :func:`pymc_marketing.prior.register_tensor_transform`

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP

        seed = sum(map(ord, "Change of covariance function"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        hsgp = HSGP.parameterize_from_data(
            X=X,
            dims="time",
            transform="sigmoid",
        )

        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")
        coords = {
            "time": dates,
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        hsgp.plot_curve(curve, random_seed=rng)
        plt.show()

    New data predictions with HSGP

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd
        import pymc as pm

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP
        from pymc_marketing.prior import Prior

        seed = sum(map(ord, "New data predictions"))
        rng = np.random.default_rng(seed)

        eta = Prior("Exponential", lam=1)
        ls = Prior("InverseGamma", alpha=2, beta=1)
        hsgp = HSGP(
            eta=eta,
            ls=ls,
            m=20,
            L=150,
            dims=("time", "channel"),
        )

        n = 52
        X = np.arange(n)
        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")

        coords = {"time": dates, "channel": ["A", "B"]}
        with pm.Model(coords=coords) as model:
            data = pm.Data("data", X, dims="time")
            hsgp.register_data(data).create_variable("f")

            idata = pm.sample_prior_predictive(random_seed=rng)
            prior = idata.prior

        n_new = 10
        X_new = np.arange(n, n + n_new)
        new_dates = pd.date_range("2023-01-01", periods=n_new, freq="W-MON")

        with model:
            pm.set_data(
                new_data={
                    "data": X_new,
                },
                coords={"time": new_dates},
            )

            post = pm.sample_posterior_predictive(
                prior,
                var_names=["f"],
                random_seed=rng,
            )

        chain, draw = 0, 50
        colors = ["C0", "C1"]

        def get_sample(curve):
            return curve.loc[chain, draw].to_series().unstack()

        ax = prior["f"].pipe(get_sample).plot(color=colors)
        post.posterior_predictive["f"].pipe(get_sample).plot(
            ax=ax, color=colors, linestyle="--", legend=False
        )
        ax.set(xlabel="time", ylabel="f", title="New data predictions")
        plt.show()

    Higher dimensional HSGP

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pymc as pm

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGP

        seed = sum(map(ord, "Higher dimensional HSGP"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        hsgp = HSGP.parameterize_from_data(
            X=X,
            dims=("time", "channel", "product"),
        )

        coords = {
            "time": range(n),
            "channel": ["A", "B"],
            "product": ["X", "Y", "Z"],
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]

        fig, _ = hsgp.plot_curve(
            curve,
            random_seed=rng,
            subplot_kwargs={"figsize": (12, 8), "ncols": 3},
        )
        fig.suptitle("Higher dimensional HSGP prior")
        plt.show()

    """

    # Scalar lengthscale: either a fixed float or a scalar Prior.
    ls: InstanceOf[Prior] | float = Field(..., description="Prior for the lengthscales")
    # Scalar amplitude (eta**2 scales the covariance).
    eta: InstanceOf[Prior] | float = Field(..., description="Prior for the variance")
    L: float = Field(..., gt=0, description="Extent of basis functions")
    centered: bool = Field(False, description="Whether the model is centered or not")
    drop_first: bool = Field(
        True, description="Whether to drop the first basis function"
    )
    cov_func: CovFunc = Field(CovFunc.ExpQuad, description="The covariance function")

    @model_validator(mode="after")
    def _ls_is_scalar_prior(self) -> Self:
        """Require a scalar (dimension-less) lengthscale prior."""
        if not isinstance(self.ls, Prior):
            return self

        if self.ls.dims != ():
            raise ValueError("The lengthscale prior must be scalar random variable.")

        return self

    @model_validator(mode="after")
    def _eta_is_scalar_prior(self) -> Self:
        """Require a scalar (dimension-less) eta prior."""
        if not isinstance(self.eta, Prior):
            return self

        if self.eta.dims != ():
            raise ValueError("The eta prior must be scalar random variable.")

        return self

    @classmethod
    def parameterize_from_data(
        cls,
        X: TensorLike | np.ndarray,
        dims: Dims,
        X_mid: float | None = None,
        eta_mass: float = 0.05,
        eta_upper: float = 1.0,
        ls_lower: float = 1.0,
        ls_upper: float | None = None,
        ls_mass: float = 0.9,
        cov_func: CovFunc = CovFunc.ExpQuad,
        centered: bool = False,
        drop_first: bool = True,
        transform: str | None = None,
        demeaned_basis: bool = False,
    ) -> HSGP:
        """Create a HSGP informed by the data with literature-based recommendations.

        Parameters
        ----------
        X : tensor_like | np.ndarray
            The data the GP is evaluated over. Must be a NumPy array or a
            shared TensorVariable for m and L to be derived automatically.
        dims : Dims
            The dimensions of the variable.
        X_mid : float, optional
            Center of the data. Defaults to the mean of X.
        eta_mass, eta_upper : float
            Tail specification for the eta (variance) prior.
        ls_lower, ls_upper, ls_mass : float
            Bounds and mass specification for the lengthscale prior.
        cov_func : CovFunc
            The covariance function. Default is CovFunc.ExpQuad.
        centered : bool
            Whether the coefficient parameterization is centered.
        drop_first : bool
            Whether to drop the first basis function.
        transform : str, optional
            Optional link transform name.
        demeaned_basis : bool
            Whether each basis has its mean subtracted from it.

        Returns
        -------
        HSGP
            Configured instance with recommended m and L.

        """
        eta = create_eta_prior(mass=eta_mass, upper=eta_upper)

        if ls_upper is None:
            # NOTE(review): `alpha` here is the tail probability P[ls < lower]
            # of the PC prior; passing ls_mass (default 0.9) puts 90% of the
            # mass *below* ls_lower — confirm this is intended rather than
            # alpha=1 - ls_mass.
            ls = create_complexity_penalizing_prior(
                lower=ls_lower,
                alpha=ls_mass,
            )
        else:
            ls = create_constrained_inverse_gamma_prior(
                lower=ls_lower,
                upper=ls_upper,
                mass=ls_mass,
            )

        numeric_X = None
        if isinstance(X, np.ndarray):
            numeric_X = np.asarray(X)
        elif isinstance(X, TensorVariable):
            # NOTE(review): get_value exists only on *shared* variables; a
            # plain symbolic TensorVariable would raise AttributeError here
            # rather than the ValueError below — TODO confirm.
            numeric_X = X.get_value(borrow=False)
        else:
            raise ValueError(
                "X must be a NumPy array (or list) or a TensorVariable. "
                "If it's a plain symbolic tensor, you must manually specify m, L."
            )

        numeric_X = np.asarray(numeric_X)

        if X_mid is None:
            X_mid = float(numeric_X.mean())

        m, L = create_m_and_L_recommendations(
            numeric_X,
            X_mid,
            ls_lower=ls_lower,
            ls_upper=ls_upper,
            cov_func=cov_func,
        )

        return cls(
            ls=ls,
            eta=eta,
            m=m,
            L=L,
            X=X,  # store the original reference (even if symbolic)
            X_mid=X_mid,
            cov_func=cov_func,
            dims=dims,
            centered=centered,
            drop_first=drop_first,
            transform=transform,
            demeaned_basis=demeaned_basis,
        )

    def create_variable(self, name: str) -> TensorVariable:
        """Create a variable from HSGP configuration.

        Parameters
        ----------
        name : str
            The name of the variable.

        Returns
        -------
        pt.TensorVariable
            The variable created from the HSGP configuration.

        """

        def add_suffix(suffix: str) -> str:
            # When a transform is applied, intermediate variables are named
            # after the pre-transform ("raw") variable.
            if self.transform is None:
                return f"{name}_{suffix}"

            return f"{name}_raw_{suffix}"

        if self.X is None:
            raise ValueError("The data must be registered before creating a variable.")

        if self.X_mid is None:
            # Derive the center from the registered data when not provided.
            self.X_mid = float(self.X.mean().eval())

        model = pm.modelcontext(None)
        coord_name: str = add_suffix("m")
        # One coordinate per retained basis function.
        model.add_coord(
            coord_name,
            np.arange(self.m - 1 if self.drop_first else self.m),
        )

        cov_funcs = {
            "expquad": pm.gp.cov.ExpQuad,
            "matern52": pm.gp.cov.Matern52,
            "matern32": pm.gp.cov.Matern32,
        }

        # Fixed floats are used as-is; Prior objects create model variables.
        eta = (
            self.eta
            if not hasattr(self.eta, "create_variable")
            else self.eta.create_variable(add_suffix("eta"))
        )
        ls = (
            self.ls
            if not hasattr(self.ls, "create_variable")
            else self.ls.create_variable(add_suffix("ls"))
        )

        cov_func = eta**2 * cov_funcs[self.cov_func.lower()](input_dim=1, ls=ls)
        gp = pm.gp.HSGP(
            m=[self.m],
            L=[self.L],
            cov_func=cov_func,
            drop_first=self.drop_first,
        )
        # Basis functions evaluated at the centered inputs and the square
        # root of the power spectral density.
        phi, sqrt_psd = gp.prior_linearized(
            self.X[:, None] - self.X_mid,
        )

        if self.demeaned_basis:
            # Subtract each basis column's mean (evaluated eagerly) to remove
            # the "intercept" effect of the first basis.
            phi = phi - phi.mean(axis=0).eval()

        first_dim, *rest_dims = self.dims
        hsgp_dims: Dims = (*rest_dims, coord_name)
        hsgp_coefs = Prior(
            "Normal",
            mu=0,
            sigma=sqrt_psd,
            dims=hsgp_dims,
            centered=self.centered,
        ).create_variable(add_suffix("hsgp_coefs"))

        # (date, m-1) and (*rest_dims, m-1) -> (date, *rest_dims)
        if len(rest_dims) <= 1:
            f = phi @ hsgp_coefs.T
        else:
            # Broadcast-multiply over the basis axis, then contract it.
            result_dims = (first_dim, coord_name, *rest_dims)
            dim_handler = create_dim_handler(desired_dims=result_dims)
            f = (
                dim_handler(phi, (first_dim, coord_name))
                * dim_handler(
                    hsgp_coefs,
                    hsgp_dims,
                )
            ).sum(axis=1)

        if self.transform is not None:
            # Expose both raw and transformed versions as Deterministics.
            raw_name = f"{name}_raw"
            f = pm.Deterministic(raw_name, f, dims=self.dims)
            transform = _get_transform(self.transform)
            f = pm.Deterministic(name, transform(f), dims=self.dims)
        else:
            f = pm.Deterministic(name, f, dims=self.dims)

        return f

    @classmethod
    def from_dict(cls, data) -> HSGP:
        """Create an object from a dictionary.

        Parameters
        ----------
        data : dict
            The data to create the object from.

        Returns
        -------
        HSGP
            The object created from the data.

        """
        # Rehydrate serialized Prior configurations.
        for key in ["eta", "ls"]:
            if isinstance(data[key], dict):
                data[key] = Prior.from_dict(data[key])

        return cls(**data)
class PeriodicCovFunc(str, Enum):
    """Supported covariance functions for the HSGP model.

    Only the periodic kernel is supported for :class:`HSGPPeriodic`;
    inheriting from ``str`` lets configuration use the bare string
    ``"periodic"``.
    """

    Periodic = "periodic"
class HSGPPeriodic(HSGPBase):
    """HSGP component for periodic data.

    Examples
    --------
    HSGPPeriodic with default configuration:

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGPPeriodic
        from pymc_marketing.prior import Prior

        seed = sum(map(ord, "Periodic GP"))
        rng = np.random.default_rng(seed)

        n = 52 * 3
        dates = pd.date_range("2023-01-01", periods=n, freq="W-MON")
        X = np.arange(n)

        coords = {
            "time": dates,
        }

        scale = Prior("HalfNormal", sigma=1)
        ls = Prior("InverseGamma", alpha=2, beta=1)
        hsgp = HSGPPeriodic(
            scale=scale,
            m=20,
            cov_func="periodic",
            ls=ls,
            period=52,
            dims="time",
        )
        hsgp.register_data(X)

        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        fig, axes = hsgp.plot_curve(
            curve,
            n_samples=3,
            random_seed=rng,
        )
        ax = axes[0]
        ax.set(xlabel="Date", ylabel="f", title="HSGP with period of 52 weeks")
        plt.show()

    HSGPPeriodic with link function via `transform` argument

    .. note::

        The `transform` parameter must be registered or from either
        `pytensor.tensor` or `pymc.math` namespaces.

        See the :func:`pymc_marketing.prior.register_tensor_transform`

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGPPeriodic
        from pymc_marketing.prior import Prior

        seed = sum(map(ord, "Periodic GP"))
        rng = np.random.default_rng(seed)

        n = 52 * 3
        dates = pd.date_range("2023-01-01", periods=n, freq="W-MON")
        X = np.arange(n)

        coords = {
            "time": dates,
        }

        scale = Prior("Gamma", mu=0.25, sigma=0.1)
        ls = Prior("InverseGamma", alpha=2, beta=1)
        hsgp = HSGPPeriodic(
            scale=scale,
            m=20,
            cov_func="periodic",
            ls=ls,
            period=52,
            dims="time",
            transform="exp",
        )
        hsgp.register_data(X)

        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        fig, axes = hsgp.plot_curve(
            curve,
            n_samples=3,
            random_seed=rng,
        )
        ax = axes[0]
        ax.set(xlabel="Date", ylabel="f", title="HSGP with period of 52 weeks")
        plt.show()

    Demeaned basis for HSGPPeriodic

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd
        import xarray as xr

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGPPeriodic
        from pymc_marketing.plot import plot_curve

        seed = sum(map(ord, "Periodic GP"))
        rng = np.random.default_rng(seed)

        scale = 0.25
        ls = 1

        kwargs = dict(ls=ls, scale=scale, period=52, cov_func="periodic", dims="time", m=20)

        n = 52 * 3
        dates = pd.date_range("2023-01-01", periods=n, freq="W-MON")
        X = np.arange(n)

        coords = {"time": dates}

        hsgp = HSGPPeriodic(demeaned_basis=False, **kwargs).register_data(X)
        hsgp_demeaned = HSGPPeriodic(demeaned_basis=True, **kwargs).register_data(X)

        def sample_curve(hsgp):
            return hsgp.sample_prior(coords=coords, random_seed=rng)["f"]

        non_demeaned = sample_curve(hsgp).rename("False")
        demeaned = sample_curve(hsgp_demeaned).rename("True")

        combined = xr.merge([non_demeaned, demeaned]).to_array("demeaned")

        _, axes = combined.pipe(plot_curve, "time", same_axes=True)
        axes[0].set(title="Demeaned the intercepty first basis")
        plt.show()

    Higher dimensional HSGPPeriodic with periodic data

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd
        import pymc as pm

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import HSGPPeriodic
        from pymc_marketing.prior import Prior

        seed = sum(map(ord, "Higher dimensional HSGP with periodic data"))
        rng = np.random.default_rng(seed)

        n = 52 * 3
        dates = pd.date_range("2023-01-01", periods=n, freq="W-MON")
        X = np.arange(n)

        scale = Prior("HalfNormal", sigma=1)
        ls = Prior("InverseGamma", alpha=2, beta=1)
        hsgp = HSGPPeriodic(
            X=X,
            scale=scale,
            ls=ls,
            m=20,
            cov_func="periodic",
            period=52,
            dims=("time", "channel", "product"),
        )

        coords = {
            "time": dates,
            "channel": ["A", "B"],
            "product": ["X", "Y", "Z"],
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]

        fig, axes = hsgp.plot_curve(
            curve,
            n_samples=3,
            random_seed=rng,
            subplot_kwargs={"figsize": (12, 8), "ncols": 3},
        )
        plt.show()

    """

    # Scalar lengthscale: either a fixed float or a scalar Prior.
    ls: InstanceOf[Prior] | float = Field(..., description="Prior for the lengthscale")
    # Scalar amplitude of the periodic GP.
    scale: InstanceOf[Prior] | float = Field(..., description="Prior for the scale")
    cov_func: PeriodicCovFunc = Field(
        PeriodicCovFunc.Periodic,
        description="The covariance function",
    )
    period: float = Field(..., description="The period of the function")

    @model_validator(mode="after")
    def _ls_is_scalar_prior(self) -> Self:
        """Require a scalar (dimension-less) lengthscale prior."""
        if not isinstance(self.ls, Prior):
            return self

        if self.ls.dims != ():
            raise ValueError("The lengthscale prior must be scalar random variable.")

        return self

    @model_validator(mode="after")
    def _scale_is_scalar_prior(self) -> Self:
        """Require a scalar (dimension-less) scale prior."""
        if not isinstance(self.scale, Prior):
            return self

        if self.scale.dims != ():
            raise ValueError("The scale prior must be scalar random variable.")

        return self

    def create_variable(self, name: str) -> TensorVariable:
        """Create HSGP variable.

        Parameters
        ----------
        name : str
            The name of the variable.

        Returns
        -------
        pt.TensorVariable
            The variable created from the HSGP configuration.

        """

        def add_suffix(suffix: str) -> str:
            # When a transform is applied, intermediate variables are named
            # after the pre-transform ("raw") variable.
            if self.transform is None:
                return f"{name}_{suffix}"

            return f"{name}_raw_{suffix}"

        if self.X is None:
            raise ValueError("The data must be registered before creating a variable.")

        if self.X_mid is None:
            # Derive the center from the registered data when not provided.
            self.X_mid = float(self.X.mean().eval())

        scale = (
            self.scale
            if not hasattr(self.scale, "create_variable")
            else self.scale.create_variable(add_suffix("scale"))
        )
        ls = (
            self.ls
            if not hasattr(self.ls, "create_variable")
            else self.ls.create_variable(add_suffix("ls"))
        )

        cov_func = pm.gp.cov.Periodic(1, period=self.period, ls=ls)
        gp = pm.gp.HSGPPeriodic(m=self.m, scale=scale, cov_func=cov_func)
        # Cosine and sine bases plus the power spectral density.
        (phi_cos, phi_sin), psd = gp.prior_linearized(
            self.X[:, None] - self.X_mid,
        )

        if self.demeaned_basis:
            # Subtract each basis column's mean (evaluated eagerly).
            phi_cos = phi_cos - phi_cos.mean(axis=0).eval()
            phi_sin = phi_sin - phi_sin.mean(axis=0).eval()

        model = pm.modelcontext(None)
        coord_name: str = add_suffix("m")
        # 2m - 1 coefficients: m cosine terms plus m - 1 sine terms (the
        # first sine basis is dropped below).
        model.add_coord(coord_name, np.arange((self.m * 2) - 1))

        first_dim, *rest_dims = self.dims
        hsgp_dims: Dims = (*rest_dims, coord_name)
        sigma = pt.concatenate([psd, psd[..., 1:]])
        hsgp_coefs = Prior(
            "Normal",
            mu=0,
            sigma=sigma,
            dims=hsgp_dims,
            centered=False,
        ).create_variable(add_suffix("hsgp_coefs"))
        phi = pt.concatenate([phi_cos, phi_sin[..., 1:]], axis=1)

        if len(rest_dims) <= 1:
            f = phi @ hsgp_coefs.T
        else:
            # Broadcast-multiply over the basis axis, then contract it.
            result_dims = (first_dim, coord_name, *rest_dims)
            dim_handler = create_dim_handler(desired_dims=result_dims)
            f = (
                dim_handler(phi, (first_dim, coord_name))
                * dim_handler(
                    hsgp_coefs,
                    hsgp_dims,
                )
            ).sum(axis=1)

        if self.transform is not None:
            # Expose both raw and transformed versions as Deterministics.
            raw_name = f"{name}_raw"
            f = pm.Deterministic(raw_name, f, dims=self.dims)
            transform = _get_transform(self.transform)
            f = pm.Deterministic(name, transform(f), dims=self.dims)
        else:
            f = pm.Deterministic(name, f, dims=self.dims)

        return f

    @classmethod
    def from_dict(cls, data) -> HSGPPeriodic:
        """Create an object from a dictionary.

        Parameters
        ----------
        data : dict
            The data to create the object from.

        Returns
        -------
        HSGPPeriodic
            The object created from the data.

        """
        # Rehydrate serialized Prior configurations.
        for key in ["scale", "ls"]:
            if isinstance(data[key], dict):
                data[key] = Prior.from_dict(data[key])

        return cls(**data)
class SoftPlusHSGP(HSGP):
    """HSGP with softplus transformation.

    The use of the softplus transformation centers the data around 1
    and keeps the values positive.

    Examples
    --------
    Literature recommended SoftPlusHSGP configuration:

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import SoftPlusHSGP

        seed = sum(map(ord, "Out of the box GP"))
        rng = np.random.default_rng(seed)

        n = 52
        X = np.arange(n)

        hsgp = SoftPlusHSGP.parameterize_from_data(
            X=X,
            dims="time",
        )

        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")
        coords = {
            "time": dates,
        }
        prior = hsgp.sample_prior(coords=coords, random_seed=rng)
        curve = prior["f"]
        hsgp.plot_curve(curve, random_seed=rng)
        plt.show()

    New data predictions with SoftPlusHSGP

    .. note::

        In making the predictions, the model needs to be transformed in
        order to keep the data centered around 1. There is a helper
        function :func:`pymc_marketing.model_graph.deterministics_to_flat`
        that can be used to transform the model upon out of sample
        predictions.

        This transformation is automatically handled in the provided MMMs.

    .. plot::
        :include-source: True
        :context: reset

        import numpy as np
        import pandas as pd
        import pymc as pm

        import matplotlib.pyplot as plt

        from pymc_marketing.mmm import SoftPlusHSGP
        from pymc_marketing.model_graph import deterministics_to_flat
        from pymc_marketing.prior import Prior

        seed = sum(map(ord, "New data predictions with SoftPlusHSGP"))
        rng = np.random.default_rng(seed)

        eta = Prior("Exponential", lam=1)
        ls = Prior("InverseGamma", alpha=2, beta=1)
        hsgp = SoftPlusHSGP(
            eta=eta,
            ls=ls,
            m=20,
            L=150,
            dims=("time", "channel"),
        )

        n = 52
        X = np.arange(n)
        channels = ["A", "B", "C"]
        dates = pd.date_range("2022-01-01", periods=n, freq="W-MON")

        coords = {"time": dates, "channel": channels}
        with pm.Model(coords=coords) as model:
            data = pm.Data("data", X, dims="time")
            hsgp.register_data(data).create_variable("f")

            idata = pm.sample_prior_predictive(random_seed=rng)
            prior = idata.prior

        n_new = 10
        X_new = np.arange(n - 1, n + n_new)
        last_date = dates[-1]
        new_dates = pd.date_range(last_date, periods=n_new + 1, freq="W-MON")

        with deterministics_to_flat(model, hsgp.deterministics_to_replace("f")):
            pm.set_data(
                new_data={
                    "data": X_new,
                },
                coords={"time": new_dates},
            )

            post = pm.sample_posterior_predictive(
                prior,
                var_names=["f"],
                random_seed=rng,
            )

        chain, draw = 0, rng.choice(prior.sizes["draw"])
        colors = [f"C{i}" for i in range(len(channels))]

        def get_sample(curve):
            return curve.loc[chain, draw].to_series().unstack()

        ax = prior["f"].pipe(get_sample).plot(color=colors)
        post.posterior_predictive["f"].pipe(get_sample).plot(
            ax=ax, color=colors, linestyle="--", legend=False
        )
        ax.set(xlabel="time", ylabel="f", title="New data predictions")
        plt.show()

    """

    @staticmethod
    def deterministics_to_replace(name: str) -> list[str]:
        """Name of the Deterministic variables that are replaced with pm.Flat for out-of-sample.

        This is required for in order to keep out-of-sample predictions use
        mean of 1.0 from the training set. Without this, the training and
        test data will not be continuous, showing a jump from the training
        data to the test data.
        """
        # Must match the Deterministic name created in create_variable below.
        return [f"{name}_f_mean"]

    def create_variable(self, name: str) -> TensorVariable:
        """Create the variable."""
        # Build the underlying (untransformed) HSGP under the "_raw" name.
        f = super().create_variable(f"{name}_raw")
        f = pt.softplus(f)
        # Mean over the first dim (e.g. time); stored as a Deterministic so
        # it can be swapped for pm.Flat in out-of-sample prediction.
        _, *f_mean_dims = self.dims
        f_mean_name = f"{name}_f_mean"
        f_mean = pm.Deterministic(f_mean_name, f.mean(axis=0), dims=f_mean_dims)
        # Re-center the positive curve around 1.
        centered_f = f - f_mean + 1
        return pm.Deterministic(name, centered_f, dims=self.dims)