Source code for pymc_marketing.clv.distributions

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Distributions for the CLV module."""

from functools import reduce

import numpy as np
import pytensor.tensor as pt
from pymc.distributions.continuous import PositiveContinuous
from pymc.distributions.dist_math import betaln, check_parameters
from pymc.distributions.distribution import Discrete
from pytensor import scan
from pytensor.graph import vectorize_graph
from pytensor.tensor.random.op import RandomVariable

__all__ = [
    "BetaGeoBetaBinom",
    "BetaGeoNBD",
    "ContContract",
    "ContNonContract",
    "ModifiedBetaGeoNBD",
    "ParetoNBD",
]


class ContNonContractRV(RandomVariable):
    name = "continuous_non_contractual"
    signature = "(),(),()->(2)"
    dtype = "floatX"
    _print_name = ("ContNonContract", "\\operatorname{ContNonContract}")

    def __call__(self, lam, p, T, size=None, **kwargs):
        return super().__call__(lam, p, T, size=size, **kwargs)

    @classmethod
    def rng_fn(cls, rng, lam, p, T, size):
        if size is None:
            size = np.broadcast_shapes(lam.shape, p.shape, T.shape)

        lam = np.broadcast_to(lam, size)
        p = np.broadcast_to(p, size)
        T = np.broadcast_to(T, size)

        x_1 = rng.poisson(lam * T)
        x_2 = rng.geometric(p)
        x = np.minimum(x_1, x_2)

        nzp = x == 0  # nzp = non-zero purchases

        if x.shape == ():
            if nzp:
                return np.array([0, 0])
            else:
                return np.array([rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T, x])

        x[nzp] = 1.0  # temporary to avoid errors in rng.beta below
        t_x = rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T

        x[nzp] = 0.0
        t_x[nzp] = 0.0

        return np.stack([t_x, x], axis=-1)


continuous_non_contractual = ContNonContractRV()


[docs] class ContNonContract(PositiveContinuous): r"""Individual-level model for the customer lifetime value. See equation (3) from Fader et al. (2005) [1]_. .. math:: f(\lambda, p | x, t_1, \dots, t_x, T) = f(\lambda, p | t_x, T) = (1 - p)^x \lambda^x \exp(-\lambda T) + \delta_{x > 0} p (1 - p)^{x-1} \lambda^x \exp(-\lambda t_x) ======== =============================================== Support :math:`t_j > 0` for :math:`j = 1, \dots, x` Mean :math:`\mathbb{E}[X(t) | \lambda, p] = \frac{1}{p} - \frac{1}{p}\exp\left(-\lambda p \min(t, T)\right)` ======== =============================================== References ---------- .. [1] Fader, Peter S., Bruce GS Hardie, and Ka Lok Lee. "“Counting your customers” the easy way: An alternative to the Pareto/NBD model." Marketing science 24.2 (2005): 275-284. """ rv_op = continuous_non_contractual
[docs] @classmethod def dist(cls, lam, p, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([lam, p, T], **kwargs)
[docs] def logp(value, lam, p, T): """Log-likelihood of the distribution.""" t_x = value[..., 0] x = value[..., 1] zero_observations = pt.eq(x, 0) A = x * pt.log(1 - p) + x * pt.log(lam) - lam * T B = pt.log(p) + (x - 1) * pt.log(1 - p) + x * pt.log(lam) - lam * t_x logp = pt.switch( zero_observations, A, pt.logaddexp(A, B), ) logp = pt.switch( reduce( pt.bitwise_or, [ pt.and_(pt.ge(t_x, 0), zero_observations), pt.lt(t_x, 0), pt.lt(x, 0), pt.gt(t_x, T), ], ), -np.inf, logp, ) return check_parameters( logp, lam > 0, 0 <= p, p <= 1, msg="lam > 0, 0 <= p <= 1", )
class ContContractRV(RandomVariable): name = "continuous_contractual" signature = "(),(),()->(3)" dtype = "floatX" _print_name = ("ContinuousContractual", "\\operatorname{ContinuousContractual}") def __call__(self, lam, p, T, size=None, **kwargs): return super().__call__(lam, p, T, size=size, **kwargs) @classmethod def rng_fn(cls, rng, lam, p, T, size): if size is None: size = np.broadcast_shapes(lam.shape, p.shape, T.shape) lam = np.broadcast_to(lam, size) p = np.broadcast_to(p, size) T = np.broadcast_to(T, size) x_1 = rng.poisson(lam * T) x_2 = rng.geometric(p) x = np.minimum(x_1, x_2) nzp = x == 0 # nzp = non-zero purchases if x.shape == (): if nzp: return np.array([0, 0, float(x_1 > x_2)]) else: return np.array( [rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T, x, float(x_1 > x_2)] ) x[nzp] = 1.0 # temporary to avoid errors in rng.beta below t_x = rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T x[nzp] = 0.0 t_x[nzp] = 0.0 return np.stack([t_x, x, (x_1 > x_2).astype(float)], axis=-1) def _supp_shape_from_params(*args, **kwargs): return (3,) continuous_contractual = ContContractRV()
[docs] class ContContract(PositiveContinuous): r"""Distribution class of a continuous contractual data-generating process. That is where purchases can occur at any time point (continuous) and churning/dropping out is explicit (contractual). .. math:: f(\lambda, p | d, x, t_1, \dots, t_x, T) = f(\lambda, p | t_x, T) = (1 - p)^{x-1} \lambda^x \exp(-\lambda t_x) p^d \left\{(1-p)\exp(-\lambda*(T - t_x))\right\}^{1 - d} ======== =============================================== Support :math:`t_j > 0` for :math:`j = 1, \dots, x` Mean :math:`\mathbb{E}[X(t) | \lambda, p, d] = \frac{1}{p} - \frac{1}{p}\exp\left(-\lambda p \min(t, T)\right)` ======== =============================================== """ rv_op = continuous_contractual
[docs] @classmethod def dist(cls, lam, p, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([lam, p, T], **kwargs)
[docs] def logp(value, lam, p, T): """Log-likelihood of the distribution.""" t_x = value[..., 0] x = value[..., 1] churn = value[..., 2] zero_observations = pt.eq(x, 0) logp = (x - 1) * pt.log(1 - p) + x * pt.log(lam) - lam * t_x logp += churn * pt.log(p) + (1 - churn) * (pt.log(1 - p) - lam * (T - t_x)) logp = pt.switch( zero_observations, -lam * T, logp, ) logp = pt.switch( reduce( pt.bitwise_or, [ zero_observations, pt.lt(t_x, 0), pt.lt(x, 0), pt.gt(t_x, T), pt.bitwise_not(pt.bitwise_or(pt.eq(churn, 0), pt.eq(churn, 1))), ], ), -np.inf, logp, ) return check_parameters( logp, lam > 0, 0 <= p, p <= 1, pt.all(0 < T), msg="lam > 0, 0 <= p <= 1", )
class ParetoNBDRV(RandomVariable): name = "pareto_nbd" signature = "(),(),(),(),()->(2)" dtype = "floatX" _print_name = ("ParetoNBD", "\\operatorname{ParetoNBD}") def __call__(self, r, alpha, s, beta, T, size=None, **kwargs): return super().__call__(r, alpha, s, beta, T, size=size, **kwargs) @classmethod def rng_fn(cls, rng, r, alpha, s, beta, T, size): if size is None: size = np.broadcast_shapes( r.shape, alpha.shape, s.shape, beta.shape, T.shape ) r = np.broadcast_to(r, size) alpha = np.broadcast_to(alpha, size) s = np.broadcast_to(s, size) beta = np.broadcast_to(beta, size) T = np.broadcast_to(T, size) output = np.zeros(shape=size + (2,)) # noqa:RUF005 lam = rng.gamma(shape=r, scale=1 / alpha, size=size) mu = rng.gamma(shape=s, scale=1 / beta, size=size) def sim_data(lam, mu, T): t = 0 n = 0 dropout_time = rng.exponential(scale=1 / mu) wait = rng.exponential(scale=1 / lam) final_t = min(dropout_time, T) while (t + wait) < final_t: t += wait n += 1 wait = rng.exponential(scale=1 / lam) return np.array( [ t, n, ], ) for index in np.ndindex(*size): output[index] = sim_data(lam[index], mu[index], T[index]) return output pareto_nbd = ParetoNBDRV()
[docs] class ParetoNBD(PositiveContinuous): r"""Population-level distribution class for a continuous, non-contractual, Pareto/NBD process. It is based on Schmittlein, et al. in [2]_. The likelihood function is derived from equations (22) and (23) of [3]_, with terms rearranged for numerical stability. The modified expression is provided below: .. math:: \begin{align} \text{if }\alpha > \beta: \\ \\ \mathbb{L}(r, \alpha, s, \beta | x, t_x, T) &= \frac{\Gamma(r+x)\alpha^r\beta}{\Gamma(r)+(\alpha +t_x)^{r+s+x}} [(\frac{s}{r+s+x})_2F_1(r+s+x,s+1;r+s+x+1;\frac{\alpha-\beta}{\alpha+t_x}) \\ &+ (\frac{r+x}{r+s+x}) \frac{_2F_1(r+s+x,s;r+s+x+1;\frac{\alpha-\beta}{\alpha+T})(\alpha +t_x)^{r+s+x}} {(\alpha +T)^{r+s+x}}] \\ \\ \text{if }\beta >= \alpha: \\ \\ \mathbb{L}(r, \alpha, s, \beta | x, t_x, T) &= \frac{\Gamma(r+x)\alpha^r\beta}{\Gamma(r)+(\beta +t_x)^{r+s+x}} [(\frac{s}{r+s+x})_2F_1(r+s+x,r+x;r+s+x+1;\frac{\beta-\alpha}{\beta+t_x}) \\ &+ (\frac{r+x}{r+s+x}) \frac{_2F_1(r+s+x,r+x+1;r+s+x+1;\frac{\beta-\alpha}{\beta+T})(\beta +t_x)^{r+s+x}} {(\beta +T)^{r+s+x}}] \end{align} ======== =============================================== Support :math:`t_j >= 0` for :math:`j = 1, \dots, x` Mean :math:`\mathbb{E}[X(t) | r, \alpha, s, \beta] = \frac{r\beta}{\alpha(s-1)}[1-(\frac{\beta}{\beta + t})^{s-1}]` ======== =============================================== References ---------- .. [2] David C. Schmittlein, Donald G. Morrison and Richard Colombo. "Counting Your Customers: Who Are They and What Will They Do Next." Management Science,Vol. 33, No. 1 (Jan., 1987), pp. 1-24. .. [3] Fader, Peter & G. S. Hardie, Bruce (2005). "A Note on Deriving the Pareto/NBD Model and Related Expressions." http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf """ # noqa: E501 rv_op = pareto_nbd
[docs] @classmethod def dist(cls, r, alpha, s, beta, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([r, alpha, s, beta, T], **kwargs)
[docs] def logp(value, r, alpha, s, beta, T): """Log-likelihood of the distribution.""" t_x = value[..., 0] x = value[..., 1] rsx = r + s + x rx = r + x cond = alpha >= beta larger_param = pt.switch(cond, alpha, beta) smaller_param = pt.switch(cond, beta, alpha) param_diff = larger_param - smaller_param hyp2f1_t1_2nd_param = pt.switch(cond, s + 1, rx) hyp2f1_t2_2nd_param = pt.switch(cond, s, rx + 1) # This term is factored out of the denominator of hyp2f_t1 for numerical stability refactored = rsx * pt.log(larger_param + t_x) hyp2f1_t1 = pt.log( pt.hyp2f1( rsx, hyp2f1_t1_2nd_param, rsx + 1, param_diff / (larger_param + t_x) ) ) hyp2f1_t2 = ( pt.log( pt.hyp2f1( rsx, hyp2f1_t2_2nd_param, rsx + 1, param_diff / (larger_param + T) ) ) - rsx * pt.log(larger_param + T) + refactored ) A1 = ( pt.gammaln(rx) - pt.gammaln(r) + r * pt.log(alpha) + s * pt.log(beta) - refactored ) A2 = pt.log(s) - pt.log(rsx) + hyp2f1_t1 A3 = pt.log(rx) - pt.log(rsx) + hyp2f1_t2 logp = A1 + pt.logaddexp(A2, A3) logp = pt.switch( pt.or_( pt.or_( pt.lt(t_x, 0), pt.lt(x, 0), ), pt.gt(t_x, T), ), -np.inf, logp, ) return check_parameters( logp, r > 0, alpha > 0, s > 0, beta > 0, msg="r > 0, alpha > 0, s > 0, beta > 0", )
class BetaGeoBetaBinomRV(RandomVariable): name = "beta_geo_beta_binom" signature = "(),(),(),(),()->(2)" dtype = "floatX" _print_name = ("BetaGeoBetaBinom", "\\operatorname{BetaGeoBetaBinom}") def __call__(self, alpha, beta, gamma, delta, T, size=None, **kwargs): return super().__call__(alpha, beta, gamma, delta, T, size=size, **kwargs) @classmethod def rng_fn(cls, rng, alpha, beta, gamma, delta, T, size) -> np.ndarray: if size is None: size = np.broadcast_shapes( alpha.shape, beta.shape, gamma.shape, delta.shape, T.shape ) alpha = np.broadcast_to(alpha, size) beta = np.broadcast_to(beta, size) gamma = np.broadcast_to(gamma, size) delta = np.broadcast_to(delta, size) T = np.broadcast_to(T, size) output = np.zeros(shape=(*size, 2)) purchase_prob = rng.beta(a=alpha, b=beta, size=size) churn_prob = rng.beta(a=delta, b=gamma, size=size) def sim_data(purchase_prob, churn_prob, T): t_x = 0 x = 0 active = True recency = 0 while t_x <= T and active: t_x += 1 active = rng.binomial(1, churn_prob) purchase = rng.binomial(1, purchase_prob) if active and purchase: recency = t_x x += 1 return np.array( [ recency if x > 0 else T, x, ], ) for index in np.ndindex(*size): output[index] = sim_data(purchase_prob[index], churn_prob[index], T[index]) return output beta_geo_beta_binom = BetaGeoBetaBinomRV()
[docs] class BetaGeoBetaBinom(Discrete): r"""Population-level distribution class for a discrete, non-contractual, Beta-Geometric/Beta-Binomial process. It is based on equation(5) from Fader, et al. in [1]_. .. math:: \mathbb{L}(\alpha, \beta, \gamma, \delta | x, t_x, n) &= \frac{B(\alpha+x,\beta+n-x)}{B(\alpha,\beta)} \frac{B(\gamma,\delta+n)}{B(\gamma,\delta)} \\ &+ \sum_{i=0}^{n-t_x-1}\frac{B(\alpha+x,\beta+t_x-x+i)}{B(\alpha,\beta)} \\ &\cdot \frac{B(\gamma+1,\delta+t_x+i)}{B(\gamma,\delta)} ======== =============================================== Support :math:`t_j >= 0` for :math:`j = 1, \dots,x` Mean :math:`\mathbb{E}[X(n) | \alpha, \beta, \gamma, \delta] = (\frac{\alpha}{\alpha+\beta})(\frac{\delta}{\gamma-1}) \cdot{1-\frac{\Gamma(\gamma+\delta)}{\Gamma(\gamma+\delta+n)}\frac{\Gamma(1+\delta+n)}{\Gamma(1+ \delta)}}` ======== =============================================== References ---------- .. [1] Fader, Peter S., Bruce G.S. Hardie, and Jen Shang (2010), "Customer-Base Analysis in a Discrete-Time Noncontractual Setting," Marketing Science, 29 (6), 1086-1108. https://www.brucehardie.com/papers/020/fader_et_al_mksc_10.pdf """ # noqa: E501 rv_op = beta_geo_beta_binom
[docs] @classmethod def dist(cls, alpha, beta, gamma, delta, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([alpha, beta, gamma, delta, T], **kwargs)
[docs] def logp(value, alpha, beta, gamma, delta, T): """Log-likelihood of the distribution.""" t_x = pt.atleast_1d(value[..., 0]) x = pt.atleast_1d(value[..., 1]) for param in (t_x, x, alpha, beta, gamma, delta, T): if param.type.ndim > 1: raise NotImplementedError( f"BetaGeoBetaBinom logp only implemented for vector parameters, got ndim={param.type.ndim}" ) # Broadcast all the parameters so they are sequences. # Potentially inefficient, but otherwise ugly logic needed to unpack arguments in the scan function, # since sequences always precede non-sequences. t_x, alpha, beta, gamma, delta, T = pt.broadcast_arrays( t_x, alpha, beta, gamma, delta, T ) def logp_customer_died(t_x_i, x_i, alpha_i, beta_i, gamma_i, delta_i, T_i): i = pt.scalar("i", dtype=int) died = pt.lt(t_x_i + i, T_i) unnorm_logprob_customer_died_at_tx_plus_i = betaln( alpha_i + x_i, beta_i + t_x_i - x_i + i ) + betaln(gamma_i + died, delta_i + t_x_i + i) # Maximum prevents invalid T - t_x values from crashing logp i_vec = pt.arange(pt.maximum(T_i - t_x_i, 0) + 1) unnorm_logprob_customer_died_at_tx_plus_i_vec = vectorize_graph( unnorm_logprob_customer_died_at_tx_plus_i, replace={i: i_vec} ) return pt.logsumexp(unnorm_logprob_customer_died_at_tx_plus_i_vec) unnorm_logp, _ = scan( fn=logp_customer_died, outputs_info=[None], sequences=[t_x, x, alpha, beta, gamma, delta, T], ) logp = unnorm_logp - betaln(alpha, beta) - betaln(gamma, delta) logp = pt.switch( pt.or_( pt.or_( pt.lt(t_x, 0), pt.lt(x, 0), ), pt.gt(t_x, T), ), -np.inf, logp, ) if value.ndim == 1: logp = pt.specify_shape(logp, 1).squeeze(0) return check_parameters( logp, alpha > 0, beta > 0, gamma > 0, delta > 0, msg="alpha > 0, beta > 0, gamma > 0, delta > 0", )
class BetaGeoNBDRV(RandomVariable): name = "bg_nbd" signature = "(),(),(),(),()->(2)" dtype = "floatX" _print_name = ("BetaGeoNBD", "\\operatorname{BetaGeoNBD}") def __call__(self, a, b, r, alpha, T, size=None, **kwargs): return super().__call__(a, b, r, alpha, T, size=size, **kwargs) @classmethod def rng_fn(cls, rng, a, b, r, alpha, T, size): if size is None: size = np.broadcast_shapes(a.shape, b.shape, r.shape, alpha.shape, T.shape) a = np.asarray(a) b = np.asarray(b) r = np.asarray(r) alpha = np.asarray(alpha) T = np.asarray(T) if size == (): size = np.broadcast_shapes(a.shape, b.shape, r.shape, alpha.shape, T.shape) a = np.broadcast_to(a, size) b = np.broadcast_to(b, size) r = np.broadcast_to(r, size) alpha = np.broadcast_to(alpha, size) T = np.broadcast_to(T, size) output = np.zeros(shape=size + (2,)) # noqa:RUF005 lam = rng.gamma(shape=r, scale=1 / alpha, size=size) p = rng.beta(a=a, b=b, size=size) def sim_data(lam, p, T): t = 0 # recency n = 0 # frequency churn = 0 # BG/NBD assumes all non-repeat customers are active wait = rng.exponential(scale=1 / lam) while t + wait < T and not churn: churn = rng.random() < p n += 1 t += wait wait = rng.exponential(scale=1 / lam) return np.array([t, n]) for index in np.ndindex(*size): output[index] = sim_data(lam[index], p[index], T[index]) return output bg_nbd = BetaGeoNBDRV()
[docs] class BetaGeoNBD(PositiveContinuous): r"""Population-level distribution class for a discrete, non-contractual, Beta-Geometric/Negative-Binomial process. Based on Fader, et al. in [1]_, [2]_ and enhancements for numerical stability in [3]_. .. math:: \mathbb{LL}(r, \alpha, a, b | x, t_x, T) = D_1 + D_2 + \ln(C_3 + \delta_{x>0} C_4) \text{, where:} \\ \begin{align} D_1 &= \ln \left[ \Gamma(r+x) \right] - \ln \left[ \Gamma(r) \right] + \ln \left[ \Gamma(a+b) \right] + \ln \left[ \Gamma(b+x) \right] \\ D_2 &= r \ln(\alpha) - (r+x) \ln(\alpha + t_x) \\ C_3 &= \left(\frac{\alpha + t_x}{\alpha + T} \right)^{r+x} \\ C_4 &= \left(\frac{a}{b+x-1} \right) \\ \end{align} ======== =============================================== Support :math:`t_j >= 0` for :math:`j = 1, \dots,x` Mean :math:`\mathbb{E}[X(n) | r, \alpha, a, b] = \frac{a+b-1}{a-1} \left[ 1 - \left(\frac{\alpha}{\alpha + T}\right)^r {_2}{F}{_1}(r,b;a+b-1;\frac{t}{\alpha + t}) \right]` ======== =============================================== References ---------- .. [1] Fader, Peter S., Bruce G.S. Hardie, and Jen Shang (2010), "Counting Your Customers" the Easy Way: An Alternative to the Pareto/NBD Model Marketing Science, 24 (Spring), 275-284 .. [2] Implementing the BG/NBD Model for Customer Base Analysis in Excel http://brucehardie.com/notes/004/bgnbd_spreadsheet_note.pdf .. [3] Overcoming the BG/NBD Model's #NUM! Error Problem https://brucehardie.com/notes/027/bgnbd_num_error.pdf """ # noqa: E501 rv_op = bg_nbd
[docs] @classmethod def dist(cls, a, b, r, alpha, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([a, b, r, alpha, T], **kwargs)
[docs] def logp(value, a, b, r, alpha, T): """Log-likelihood of the distribution.""" t_x = pt.atleast_1d(value[..., 0]) x = pt.atleast_1d(value[..., 1]) for param in (t_x, x, a, b, r, alpha, T): if param.type.ndim > 1: raise NotImplementedError( f"BetaGeoNBD logp only implemented for vector parameters, got ndim={param.type.ndim}" ) x_non_zero = x > 0 d1 = ( pt.gammaln(r + x) - pt.gammaln(r) + pt.gammaln(a + b) + pt.gammaln(b + x) - pt.gammaln(b) - pt.gammaln(a + b + x) ) d2 = r * pt.log(alpha) - (r + x) * pt.log(alpha + t_x) c3 = ((alpha + t_x) / (alpha + T)) ** (r + x) c4 = a / (b + x - 1) logp = d1 + d2 + pt.log(c3 + pt.switch(x_non_zero, c4, 0)) logp = pt.switch( pt.or_( pt.or_( pt.lt(t_x, 0), pt.lt(x, 0), ), pt.gt(t_x, T), ), -np.inf, logp, ) return check_parameters( logp, a > 0, b > 0, alpha > 0, r > 0, msg="a > 0, b > 0, alpha > 0, r > 0", )
class ModifiedBetaGeoNBDRV(RandomVariable): name = "mbg_nbd" signature = "(),(),(),(),()->(2)" dtype = "floatX" _print_name = ("ModifiedBetaGeoNBD", "\\operatorname{ModifiedBetaGeoNBD}") def __call__(self, a, b, r, alpha, T, size=None, **kwargs): return super().__call__(a, b, r, alpha, T, size=size, **kwargs) @classmethod def rng_fn(cls, rng, a, b, r, alpha, T, size): if size is None: size = np.broadcast_shapes(a.shape, b.shape, r.shape, alpha.shape, T.shape) a = np.asarray(a) b = np.asarray(b) r = np.asarray(r) alpha = np.asarray(alpha) T = np.asarray(T) if size == (): size = np.broadcast_shapes(a.shape, b.shape, r.shape, alpha.shape, T.shape) a = np.broadcast_to(a, size) b = np.broadcast_to(b, size) r = np.broadcast_to(r, size) alpha = np.broadcast_to(alpha, size) T = np.broadcast_to(T, size) output = np.zeros(shape=size + (2,)) # noqa:RUF005 lam = rng.gamma(shape=r, scale=1 / alpha, size=size) p = rng.beta(a=a, b=b, size=size) def sim_data(lam, p, T): t = 0 # recency n = 0 # frequency churn = ( rng.random() < p ) # MBG/NBD customer active with probability p at time 0 wait = rng.exponential(scale=1 / lam) while t + wait < T and not churn: churn = rng.random() < p n += 1 t += wait wait = rng.exponential(scale=1 / lam) return np.array([t, n]) for index in np.ndindex(*size): output[index] = sim_data(lam[index], p[index], T[index]) return output mbg_nbd = ModifiedBetaGeoNBDRV()
[docs] class ModifiedBetaGeoNBD(PositiveContinuous): r"""Population-level distribution for a discrete, non-contractual Modified-Beta-Geometric/Negative-Binomial process. In MBG/NBD, a customer may drop out at time zero. This is in contrast with the BG/NBD model, which assumes all non-repeat customers are still active. Based on Batislam, et al. in [1]_, and Wagner & Hopper in [2]_ . .. math:: \mathbb{LL}(a, b, \alpha, r | x, t_x, T) = \ln \left[ A_1 * A_2 * (A_3 + \delta_{x>0} A_4) \right] \text{, where:} \\ \begin{align} A_1 &= \frac{\Gamma(r+x) \alpha^r)}{\Gamma(x)} \\ A_2 &= \frac{\Gamma(a+b) \Gamma(b+x+1)}{\Gamma(b) \Gamma(a+b+x+1)} \\ A_3 &= \left( \frac{1}{\alpha + T} \right)^(r+x) \\ A_4 &= \left( \frac{a}{b+x} \right) \left( \frac{1}{\alpha + t_x} \right)^(r+x) \\ \end{align} ======== =============================================== Support :math:`t_j >= 0` for :math:`j = 1, \dots,x` Mean :math:`\mathbb{E}[X(n) | r, \alpha, a, b] = \frac{a+b-1}{a-1} \left[ 1 - \left(\frac{\alpha}{\alpha + T}\right)^r {_2}{F}{_1}(r,b;a+b-1;\frac{t}{\alpha + t}) \right]` ======== =============================================== References ---------- .. [1] Batislam, E.P., M. Denizel, A. Filiztekin (2007), "Empirical validation and comparison of models for customer base analysis," International Journal of Research in Marketing, 24 (3), 201-209. .. [2] Wagner, U. and Hoppe D. (2008), "Erratum on the MBG/NBD Model," International Journal of Research in Marketing, 25 (3), 225-226. """ # noqa: E501 rv_op = mbg_nbd
[docs] @classmethod def dist(cls, a, b, r, alpha, T, **kwargs): """Get the distribution from the parameters.""" return super().dist([a, b, r, alpha, T], **kwargs)
[docs] def logp(value, a, b, r, alpha, T): """Log-likelihood of the distribution.""" t_x = pt.atleast_1d(value[..., 0]) x = pt.atleast_1d(value[..., 1]) for param in (t_x, x, a, b, r, alpha, T): if param.type.ndim > 1: raise NotImplementedError( f"ModifiedBetaGeoNBD logp only implemented for vector parameters, got ndim={param.type.ndim}" ) a1 = pt.gammaln(r + x) - pt.gammaln(r) + r * pt.log(alpha) a2 = ( pt.gammaln(a + b) + pt.gammaln(b + x + 1) - pt.gammaln(b) - pt.gammaln(a + b + x + 1) ) a3 = -(r + x) * pt.log(alpha + T) a4 = ( pt.log(a) - pt.log(b + x) + (r + x) * (pt.log(alpha + T) - pt.log(alpha + t_x)) ) logp = a1 + a2 + a3 + pt.logaddexp(a4, 0) logp = pt.switch( pt.or_( pt.or_( pt.lt(t_x, 0), pt.lt(x, 0), ), pt.gt(t_x, T), ), -np.inf, logp, ) return check_parameters( logp, a > 0, b > 0, alpha > 0, r > 0, msg="a > 0, b > 0, alpha > 0, r > 0", )