Source code for pymc_marketing.mmm.tvp

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Time Varying Gaussian Process Multiplier for Marketing Mix Modeling (MMM).

Designed to model time-varying effects in marketing mix models (MMM).

This module provides a time-varying Gaussian Process (GP) multiplier,
using the Hilbert Space Gaussian Process (HSGP) approximation.

Examples
--------
Create a basic PyMC model using the time-varying GP multiplier:

.. code-block:: python

    import numpy as np
    import pymc as pm
    import pandas as pd

    from pymc_marketing.hsgp_kwargs import HSGPKwargs
    from pymc_marketing.mmm.tvp import (
        create_time_varying_gp_multiplier,
        infer_time_index,
    )

    # Generate example data
    np.random.seed(0)
    dates = pd.Series(pd.date_range(start="2020-01-01", periods=365))
    sales = np.random.normal(100, 10, size=len(dates))

    # Infer time index
    time_index = infer_time_index(dates, dates, time_resolution=5)

    # Define model configuration
    hsgp_kwargs = HSGPKwargs(
        m=200,
        L=None,
        eta_lam=1,
        ls_mu=10,
        ls_sigma=5,
        cov_func=None,
    )

    coords = {"time": dates}
    with pm.Model(coords=coords) as model:
        # Shared time index variable
        time_index_shared = pm.Data("time_index", time_index)

        # Base parameter
        base_sales = pm.Normal("base_sales", mu=100, sigma=10)

        # Time-varying GP multiplier
        varying_coefficient = create_time_varying_gp_multiplier(
            name="sales",
            dims="time",
            time_index=time_index_shared,
            time_index_mid=int(len(dates) / 2),
            time_resolution=5,
            hsgp_kwargs=hsgp_kwargs,
        )

        # Final sales parameter
        sales_estimated = base_sales * varying_coefficient

        # Likelihood
        pm.Normal("obs", mu=sales_estimated, sigma=10, observed=sales)

    # Sample from the model
    with model:
        trace = pm.sample()

    # Plot results
    import matplotlib.pyplot as plt

    pm.plot_trace(trace, var_names=["base_sales"])
    plt.show()

"""

import numpy as np
import numpy.typing as npt
import pandas as pd
import pytensor.tensor as pt
from pymc.distributions.shape_utils import Dims

from pymc_marketing.constants import DAYS_IN_YEAR
from pymc_marketing.hsgp_kwargs import HSGPKwargs
from pymc_marketing.mmm.hsgp import CovFunc, SoftPlusHSGP
from pymc_marketing.prior import Prior


def _create_hsgp_instance(
    X,
    X_mid,
    dims: Dims,
    hsgp_kwargs: HSGPKwargs,
) -> SoftPlusHSGP:
    X = pt.as_tensor_variable(X)
    eta = Prior("Exponential", lam=hsgp_kwargs.eta_lam)
    ls = Prior("InverseGamma", mu=hsgp_kwargs.ls_mu, sigma=hsgp_kwargs.ls_sigma)
    cov_func = (
        hsgp_kwargs.cov_func
        if isinstance(hsgp_kwargs.cov_func, CovFunc)
        else CovFunc.Matern52
    )

    if X_mid is None:
        X_mid = float(X.mean().eval())

    if hsgp_kwargs.L is None:
        L = X_mid * 2
    else:
        L = hsgp_kwargs.L

    return SoftPlusHSGP(
        eta=eta,
        ls=ls,
        m=hsgp_kwargs.m,
        L=L,
        cov_func=cov_func,
        X=X,
        X_mid=X_mid,
        centered=False,
        dims=dims,
        drop_first=False,
    )


[docs] def time_varying_prior( name: str, X: pt.sharedvar.TensorSharedVariable, dims: Dims, X_mid: int | float | None = None, hsgp_kwargs: HSGPKwargs | None = None, ) -> pt.TensorVariable: """Time varying prior, based on the Hilbert Space Gaussian Process (HSGP). For more information see `pymc.gp.HSGP <https://www.pymc.io/projects/docs/en/stable/api/gp/generated/pymc.gp.HSGP.html>`_. Parameters ---------- name : str Name of the prior and associated variables. X : 1d array-like of int or float Time points. X_mid : int or float Midpoint of the time points. dims : tuple of str or str Dimensions of the prior. If a tuple, the first element is the name of the time dimension, and the second may be any other dimension, across which independent time varying priors for each coordinate are desired (e.g. channels). hsgp_kwargs : HSGPKwargs Keyword arguments for the Hilbert Space Gaussian Process. By default it is None, in which case the default parameters are used. See `HSGPKwargs` for more information. Returns ------- pt.TensorVariable Time-varying prior. References ---------- - Ruitort-Mayol, G., and Anderson, M., and Solin, A., and Vehtari, A. (2022). Practical Hilbert Space Approximate Bayesian Gaussian Processes for Probabilistic Programming - Solin, A., Sarkka, S. (2019) Hilbert Space Methods for Reduced-Rank Gaussian Process Regression. """ if hsgp_kwargs is None: hsgp_kwargs = HSGPKwargs() hsgp = _create_hsgp_instance( X=X, X_mid=X_mid, dims=dims, hsgp_kwargs=hsgp_kwargs, ) return hsgp.create_variable(name)
[docs] def create_time_varying_gp_multiplier( name: str, dims: Dims, time_index: pt.sharedvar.TensorSharedVariable, time_index_mid: int, time_resolution: int, hsgp_kwargs: HSGPKwargs, ) -> pt.TensorVariable: """Create a time-varying Gaussian Process multiplier. Create a time-varying Gaussian Process multiplier based on the provided parameters. Parameters ---------- name : str Name of the Gaussian Process multiplier. dims : tuple[str, str] | str Dimensions for the multiplier. time_index : pt.sharedvar.TensorSharedVariable Shared variable containing time points. time_index_mid : int Midpoint of the time points. time_resolution : int Resolution of time points. hsgp_kwargs : HSGPKwargs Keyword arguments for the Hilbert Space Gaussian Process (HSGP) component. Returns ------- pt.TensorVariable Time-varying Gaussian Process multiplier for a given variable. """ if hsgp_kwargs.L is None: hsgp_kwargs.L = time_index_mid + DAYS_IN_YEAR / time_resolution if hsgp_kwargs.ls_mu is None: hsgp_kwargs.ls_mu = DAYS_IN_YEAR / time_resolution * 2 return time_varying_prior( name=f"{name}_temporal_latent_multiplier", X=time_index, X_mid=time_index_mid, dims=dims, hsgp_kwargs=hsgp_kwargs, )
[docs] def infer_time_index( date_series_new: pd.Series, date_series: pd.Series, time_resolution: int, ) -> npt.NDArray[np.int_]: """Infer the time-index given a new dataset. Infers the time-indices by calculating the number of days since the first date in the dataset. Parameters ---------- date_series_new : pd.Series New date series. date_series : pd.Series Original date series. time_resolution : int Resolution of time points in days. Returns ------- np.ndarray Time index. """ return (date_series_new - date_series.iloc[0]).dt.days.values // time_resolution