Source code for pymc_marketing.mmm.components.base

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Base class for adstock and saturation functions used in MMM.

Use the subclasses directly for custom transformations:

- Adstock Transformations: :class:`pymc_marketing.mmm.components.adstock.AdstockTransformation`
- Saturation Transformations: :class:`pymc_marketing.mmm.components.saturation.SaturationTransformation`

"""

import warnings
from collections.abc import Iterable
from copy import deepcopy
from inspect import signature
from typing import Any

import numpy as np
import numpy.typing as npt
import pymc as pm
import xarray as xr
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from pydantic import InstanceOf
from pymc.distributions.shape_utils import Dims
from pytensor import tensor as pt
from pytensor.tensor.variable import TensorVariable

from pymc_marketing.model_config import parse_model_config
from pymc_marketing.plot import (
    SelToString,
    plot_curve,
    plot_hdi,
    plot_samples,
)
from pymc_marketing.prior import Prior, VariableFactory, create_dim_handler

# "x" for saturation, "time since exposure" for adstock
NON_GRID_NAMES: frozenset[str] = frozenset({"x", "time since exposure"})

SupportedPrior = (
    InstanceOf[Prior] | float | InstanceOf[TensorVariable] | InstanceOf[VariableFactory]
)


class ParameterPriorException(Exception):
    """Error when the functions and specified priors don't match up."""

    def __init__(self, priors: set[str], parameters: set[str]) -> None:
        self.priors = priors
        self.parameters = parameters

        msg = "The function parameters and priors don't line up."

        if self.priors:
            msg = f"{msg} Missing default prior: {self.priors}."

        if self.parameters:
            msg = f"{msg} Missing function parameter: {self.parameters}."

        super().__init__(msg)


RESERVED_DATA_PARAMETER_NAMES = {"x", "data"}


class MissingDataParameter(Exception):
    """Error if the function doesn't have a data parameter."""

    def __init__(self) -> None:
        msg = (
            f"The function must have a data parameter."
            " The first parameter is assumed to be the data"
            f" with name being one of: {RESERVED_DATA_PARAMETER_NAMES}"
        )

        super().__init__(msg)


[docs] class Transformation: """Base class for adstock and saturation functions. The subclasses will need to implement the following attributes: - function: The function that will be applied to the data. - prefix: The prefix for the variables that will be created. - default_priors: The default priors for the parameters of the function. In order to make a new saturation or adstock function, use the specific subclasses: - :class:`pymc_marketing.mmm.components.saturation.SaturationTransformation` - :class:`pymc_marketing.mmm.components.adstock.AdstockTransformation` View the documentation for those classes for more information. Parameters ---------- priors : dict[str, Prior | float | TensorVariable | VariableFactory], optional Dictionary with the priors for the parameters of the function. The keys should be the parameter names and the values the priors. If not provided, it will use the default priors from the subclass. prefix : str, optional The prefix for the variables that will be created. If not provided, it will use the prefix from the subclass. """ prefix: str default_priors: dict[str, Prior] function: Any lookup_name: str
[docs] def __init__( self, priors: dict[str, Prior | float | TensorVariable | VariableFactory] | None = None, prefix: str | None = None, ) -> None: self._checks() self.function_priors = priors # type: ignore self.prefix = prefix or self.prefix
def __repr__(self) -> str: """Representation of the transformation.""" return ( f"{self.__class__.__name__}(" f"prefix={self.prefix!r}, " f"priors={self.function_priors}" ")" )
[docs] def set_dims_for_all_priors(self, dims: Dims): """Set the dims for all priors. Convenience method to loop through all the priors and set the dims. Parameters ---------- dims : Dims The dims for the priors. Returns ------- Transformation """ for prior in self.function_priors.values(): prior.dims = dims return self
[docs] def to_dict(self) -> dict[str, Any]: """Convert the transformation to a dictionary. Returns ------- dict The dictionary defining the transformation. """ return { "lookup_name": self.lookup_name, "prefix": self.prefix, "priors": { key: _serialize_value(value) for key, value in self.function_priors.items() }, }
def __eq__(self, other: Any) -> bool: """Check if two transformations are equal.""" if not isinstance(other, self.__class__): return False return self.to_dict() == other.to_dict() @property def function_priors(self) -> dict[str, Prior]: """Get the priors for the function.""" return self._function_priors @function_priors.setter def function_priors(self, priors: dict[str, Any | Prior] | None) -> None: priors = priors or {} non_distributions = [ key for key, value in priors.items() if not isinstance(value, Prior) and not isinstance(value, dict) ] priors = parse_model_config(priors, non_distributions=non_distributions) self._function_priors = {**deepcopy(self.default_priors), **priors}
[docs] def update_priors(self, priors: dict[str, Prior]) -> None: """Update the priors for a function after initialization. Uses {prefix}_{parameter_name} as the key for the priors instead of the parameter name in order to be used in the larger MMM. Parameters ---------- priors : dict[str, Prior] Dictionary with the new priors for the parameters of the function. Examples -------- Update the priors for a transformation after initialization. .. code-block:: python from pymc_marketing.mmm.components.base import Transformation from pymc_marketing.prior import Prior class MyTransformation(Transformation): lookup_name: str = "my_transformation" prefix: str = "transformation" function = lambda x, lam: x * lam default_priors = {"lam": Prior("Gamma", alpha=3, beta=1)} transformation = MyTransformation() transformation.update_priors( {"transformation_lam": Prior("HalfNormal", sigma=1)}, ) """ new_priors = { parameter_name: priors[variable_name] for parameter_name, variable_name in self.variable_mapping.items() if variable_name in priors } if not new_priors: available_priors = list(self.variable_mapping.values()) warnings.warn( f"No priors were updated. Available parameters are {available_priors}", UserWarning, stacklevel=2, ) self.function_priors.update(new_priors)
@property def model_config(self) -> dict[str, Any]: """Mapping from variable name to prior for the model.""" return { variable_name: self.function_priors[parameter_name] for parameter_name, variable_name in self.variable_mapping.items() } def _checks(self) -> None: self._has_all_attributes() self._function_works_on_instances() self._has_defaults_for_all_arguments() def _has_all_attributes(self) -> None: if not hasattr(self, "prefix"): raise NotImplementedError("prefix must be implemented in the subclass") if not hasattr(self, "default_priors"): raise NotImplementedError( "default_priors must be implemented in the subclass" ) if not hasattr(self, "function"): raise NotImplementedError("function must be implemented in the subclass") if not hasattr(self, "lookup_name"): raise NotImplementedError("lookup_name must be implemented in the subclass") def _has_defaults_for_all_arguments(self) -> None: function_signature = signature(self.function) # Remove the first one as assumed to be the data parameters_that_need_priors = set( list(function_signature.parameters.keys())[1:] ) parameters_with_priors = set(self.default_priors.keys()) missing_priors = parameters_that_need_priors - parameters_with_priors missing_parameters = parameters_with_priors - parameters_that_need_priors if missing_priors or missing_parameters: raise ParameterPriorException(missing_priors, missing_parameters) def _function_works_on_instances(self) -> None: class_function = self.__class__.function function_parameters = list(signature(class_function).parameters) is_method = function_parameters[0] == "self" data_parameter_idx = 1 if is_method else 0 has_data_parameter = ( function_parameters[data_parameter_idx] in RESERVED_DATA_PARAMETER_NAMES ) if not has_data_parameter: raise MissingDataParameter() if is_method: return self.function = class_function @property def variable_mapping(self) -> dict[str, str]: """Mapping from parameter name to variable name in the model.""" return { parameter: f"{self.prefix}_{parameter}" for parameter in self.default_priors.keys() } @property def combined_dims(self) -> tuple[str, ...]: """Get the combined dims for all the parameters.""" return tuple(self._infer_output_core_dims()) def _infer_output_core_dims(self) -> tuple[str, ...]: parameter_dims = sorted( [ (dims,) if isinstance(dims, str) else dims for dist in self.function_priors.values() if (dims := getattr(dist, "dims", None)) is not None ], key=len, reverse=True, ) return tuple(list({str(dim): None for dims in parameter_dims for dim in dims})) def _create_distributions( self, dims: Dims | None = None ) -> dict[str, TensorVariable]: dim_handler = create_dim_handler(dims or self._infer_output_core_dims()) def create_variable(parameter_name: str, variable_name: str) -> TensorVariable: dist = self.function_priors[parameter_name] if not hasattr(dist, "create_variable"): return dist var = dist.create_variable(variable_name) return dim_handler(var, dist.dims) return { parameter_name: create_variable(parameter_name, variable_name) for parameter_name, variable_name in self.variable_mapping.items() }
[docs] def sample_prior( self, coords: dict | None = None, **sample_prior_predictive_kwargs ) -> xr.Dataset: """Sample the priors for the transformation. Parameters ---------- coords : dict, optional The coordinates for the associated with dims **sample_prior_predictive_kwargs Keyword arguments for the pm.sample_prior_predictive function. Returns ------- xr.Dataset The dataset with the sampled priors. """ coords = coords or {} dims = tuple(coords.keys()) with pm.Model(coords=coords): self._create_distributions(dims=dims) return pm.sample_prior_predictive(**sample_prior_predictive_kwargs).prior
[docs] def plot_curve( self, curve: xr.DataArray, n_samples: int = 10, hdi_probs: float | list[float] | None = None, random_seed: np.random.Generator | None = None, subplot_kwargs: dict | None = None, sample_kwargs: dict | None = None, hdi_kwargs: dict | None = None, axes: npt.NDArray[Axes] | None = None, same_axes: bool = False, colors: Iterable[str] | None = None, legend: bool | None = None, sel_to_string: SelToString | None = None, ) -> tuple[Figure, npt.NDArray[Axes]]: """Plot curve HDI and samples. Parameters ---------- curve : xr.DataArray The curve to plot. n_samples : int, optional Number of samples hdi_probs : float | list[float], optional HDI probabilities. Defaults to None which uses arviz default for stats.ci_prob which is 94% random_seed : int | random number generator, optional Random number generator. Defaults to None subplot_kwargs : dict, optional Keyword arguments for plt.subplots sample_kwargs : dict, optional Keyword arguments for the plot_curve_sample function. Defaults to None. hdi_kwargs : dict, optional Keyword arguments for the plot_curve_hdi function. Defaults to None. axes : npt.NDArray[plt.Axes], optional The exact axes to plot on. Overrides any subplot_kwargs same_axes : bool, optional If the axes should be the same for all plots. Defaults to False. colors : Iterable[str], optional The colors to use for the plot. Defaults to None. legend : bool, optional If the legend should be shown. Defaults to None. sel_to_string : SelToString, optional The function to convert the selection to a string. Defaults to None. Returns ------- tuple[plt.Figure, npt.NDArray[plt.Axes]] """ return plot_curve( curve, non_grid_names=set(NON_GRID_NAMES), n_samples=n_samples, hdi_probs=hdi_probs, random_seed=random_seed, subplot_kwargs=subplot_kwargs, sample_kwargs=sample_kwargs, hdi_kwargs=hdi_kwargs, axes=axes, same_axes=same_axes, colors=colors, legend=legend, sel_to_string=sel_to_string, )
def _sample_curve( self, var_name: str, parameters: xr.Dataset, x: pt.TensorLike, coords: dict[str, Any], ) -> xr.DataArray: output_core_dims = self._infer_output_core_dims() keys = list(coords.keys()) if len(keys) != 1: msg = "The coords should only have one key." raise ValueError(msg) x_dim = keys[0] # Allow broadcasting x = np.expand_dims( x, axis=tuple(range(1, len(output_core_dims) + 1)), ) coords.update( { dim: np.asarray(coord) for dim, coord in parameters.coords.items() if dim not in ["chain", "draw"] } ) with pm.Model(coords=coords): pm.Deterministic( var_name, self.apply(x, dims=output_core_dims), dims=(x_dim, *output_core_dims), ) return pm.sample_posterior_predictive( parameters, var_names=[var_name], ).posterior_predictive[var_name]
[docs] def plot_curve_samples( self, curve: xr.DataArray, n: int = 10, rng: np.random.Generator | None = None, plot_kwargs: dict | None = None, subplot_kwargs: dict | None = None, axes: npt.NDArray[Axes] | None = None, ) -> tuple[Figure, npt.NDArray[Axes]]: """Plot samples from the curve. Parameters ---------- curve : xr.DataArray The curve to plot. n : int, optional The number of samples to plot. Defaults to 10. rng : np.random.Generator, optional The random number generator to use. Defaults to None. plot_kwargs : dict, optional Keyword arguments for the DataFrame plot function. Defaults to None. subplot_kwargs : dict, optional Keyword arguments for plt.subplots axes : npt.NDArray[plt.Axes], optional The exact axes to plot on. Overrides any subplot_kwargs Returns ------- tuple[plt.Figure, npt.NDArray[plt.Axes]] plt.Axes The axes with the plot. """ return plot_samples( curve, non_grid_names=set(NON_GRID_NAMES), n=n, rng=rng, axes=axes, subplot_kwargs=subplot_kwargs, plot_kwargs=plot_kwargs, )
[docs] def plot_curve_hdi( self, curve: xr.DataArray, hdi_kwargs: dict | None = None, plot_kwargs: dict | None = None, subplot_kwargs: dict | None = None, axes: npt.NDArray[Axes] | None = None, ) -> tuple[Figure, npt.NDArray[Axes]]: """Plot the HDI of the curve. Parameters ---------- curve : xr.DataArray The curve to plot. hdi_kwargs : dict, optional Keyword arguments for the az.hdi function. Defaults to None. plot_kwargs : dict, optional Keyword arguments for the fill_between function. Defaults to None. subplot_kwargs : dict, optional Keyword arguments for plt.subplots axes : npt.NDArray[plt.Axes], optional The exact axes to plot on. Overrides any subplot_kwargs Returns ------- tuple[plt.Figure, npt.NDArray[plt.Axes]] """ return plot_hdi( curve, non_grid_names=set(NON_GRID_NAMES), axes=axes, subplot_kwargs=subplot_kwargs, plot_kwargs=plot_kwargs, hdi_kwargs=hdi_kwargs, )
[docs] def apply(self, x: pt.TensorLike, dims: Dims | None = None) -> TensorVariable: """Call within a model context. Used internally of the MMM to apply the transformation to the data. Parameters ---------- x : pt.TensorLike The data to be transformed. dims : str, sequence[str], optional The dims of the parameters. Defaults to None. Not the dims of the data! Returns ------- pt.TensorVariable The transformed data. Examples -------- Call the function for custom use-case .. code-block:: python import pymc as pm transformation = ... coords = {"channel": ["TV", "Radio", "Digital"]} with pm.Model(coords=coords): transformed_data = transformation.apply(data, dims="channel") """ kwargs = self._create_distributions(dims=dims) return self.function(x, **kwargs)
def _serialize_value(value: Any) -> Any: if hasattr(value, "to_dict"): return value.to_dict() if isinstance(value, TensorVariable): value = value.eval() if isinstance(value, np.ndarray): return value.tolist() return value class DuplicatedTransformationError(Exception): """Exception when a transformation is duplicated.""" def __init__(self, name: str, lookup_name: str): self.name = name self.lookup_name = lookup_name super().__init__(f"Duplicate {name}. The name {lookup_name!r} already exists.")
[docs] def create_registration_meta(subclasses: dict[str, Any]) -> type[type]: """Create a metaclass for registering subclasses. Parameters ---------- subclasses : dict[str, type[Transformation]] The subclasses to register. Returns ------- type The metaclass for registering subclasses. """ class RegistrationMeta(type): def __new__(cls, name, bases, attrs): new_cls = super().__new__(cls, name, bases, attrs) if "lookup_name" not in attrs: return new_cls base_name = bases[0].__name__ lookup_name = attrs["lookup_name"] if lookup_name in subclasses: raise DuplicatedTransformationError(base_name, lookup_name) subclasses[lookup_name] = new_cls return new_cls return RegistrationMeta