# Copyright 2022 - 2025 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pareto NBD Model."""
import warnings
from collections.abc import Sequence
from typing import Literal, cast
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
import xarray
from numpy import exp, log
from pymc.util import RandomState
from pytensor.compile import Mode, get_default_mode
from pytensor.graph import Constant, node_rewriter
from pytensor.scalar import Grad2F1Loop
from pytensor.tensor.elemwise import Elemwise
from scipy.special import betaln, gammaln, hyp2f1
from xarray_einstats.stats import logsumexp as xr_logsumexp
from pymc_marketing.clv.distributions import ParetoNBD
from pymc_marketing.clv.models.basic import CLVModel
from pymc_marketing.clv.utils import to_xarray
from pymc_marketing.model_config import ModelConfig
from pymc_marketing.prior import Prior
@node_rewriter([Elemwise])
def local_reduce_max_num_iters_hyp2f1_grad(fgraph, node):
"""Rewrite that reduces the maximum number of iterations in the hyp2f1 grad scalar loop.
This is critical for getting NUTS to converge during early tuning; otherwise the
gradient's scalar loop can run for up to 1e6 iterations and sampling takes a long
time to get started.
"""
if not isinstance(node.op.scalar_op, Grad2F1Loop):
return
max_steps, *other_args = node.inputs
# max_steps = switch(skip_loop, 0, 1e6) by default
if max_steps.owner and max_steps.owner.op == pt.switch:
cond, zero, max_steps_const = max_steps.owner.inputs
if (isinstance(zero, Constant) and np.all(zero.data == 0)) and (
isinstance(max_steps_const, Constant)
and np.all(max_steps_const.data == 1e6)
):
new_max_steps = pt.switch(cond, zero, np.array(int(1e5), dtype="int32"))
return node.op.make_node(new_max_steps, *other_args).outputs
pytensor.compile.optdb["specialize"].register(
"local_reduce_max_num_iters_hyp2f1_grad",
local_reduce_max_num_iters_hyp2f1_grad,
use_db_name_as_tag=False, # Not included by default
)
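# A minimal sketch of how this rewrite is enabled at compile time (it mirrors what
# `ParetoNBDModel.fit` does below for the "mcmc" method); `mode` here is a pytensor
# compile mode:
#
#   mode = get_default_mode()
#   opt_qry = mode.provided_optimizer.including("local_reduce_max_num_iters_hyp2f1_grad")
#   mode = Mode(linker=mode.linker, optimizer=opt_qry)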
class ParetoNBDModel(CLVModel):
"""Pareto Negative Binomial Model (Pareto/NBD).
Model for continuous, non-contractual customers, first introduced by Schmittlein et al. [1]_,
with additional derivations and predictive methods by Hardie & Fader [2]_ [3]_ [4]_ [5]_.

The Pareto/NBD model assumes the duration of time a customer remains active follows a Gamma distribution,
and that the time between purchases is also Gamma-distributed while the customer is active.

This model requires data to be summarized by *recency*, *frequency*, and *T* for each customer,
using `clv.rfm_summary()` or equivalent. Covariates impacting customer dropout and transaction rates are optional.
Parameters
----------
data : ~pandas.DataFrame
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* `frequency`: Number of repeat purchases
* `recency`: Time between the first and the last purchase
* `T`: Time between the first purchase and the end of the observation period.
Model assumptions require *T >= recency*
Along with optional covariate columns.
model_config : dict, optional
Dictionary containing model parameters and covariate column names:
* `r`: Shape parameter of time between purchases; defaults to `Weibull(alpha=2, beta=1)`
* `alpha`: Scale parameter of time between purchases; defaults to `Weibull(alpha=2, beta=10)`
* `s`: Shape parameter of time until dropout; defaults to `Weibull(alpha=2, beta=1)`
* `beta`: Scale parameter of time until dropout; defaults to `Weibull(alpha=2, beta=10)`
* `purchase_coefficient`: Coefficients for purchase rate covariates; defaults to `Normal(0, 1)`
* `dropout_coefficient`: Coefficients for dropout covariates; defaults to `Normal(0, 1)`
* `purchase_covariate_cols`: List containing column names of covariates for customer purchase rates.
* `dropout_covariate_cols`: List containing column names of covariates for customer dropouts.
If not provided, the model will use default priors specified in the `default_model_config` class attribute.
sampler_config : dict, optional
Dictionary of sampler parameters. Defaults to None.
Examples
--------
.. code-block:: python
import pandas as pd

from pymc_marketing.clv import ParetoNBDModel, rfm_summary
from pymc_marketing.prior import Prior

rfm_df = rfm_summary(raw_data, "id_col_name", "date_col_name")
# Initialize model with customer data; `model_config` parameter is optional
model = ParetoNBDModel(
data=rfm_df,
model_config={
    "r": Prior("Weibull", alpha=2, beta=1),
    "alpha": Prior("Weibull", alpha=2, beta=10),
    "s": Prior("Weibull", alpha=2, beta=1),
    "beta": Prior("Weibull", alpha=2, beta=10),
},
)
# Fit model quickly to large datasets via the default Maximum a Posteriori method
model.fit(method='map')
print(model.fit_summary())
# Use 'demz' for more informative predictions and reliable performance on smaller datasets
model.fit(method='demz')
print(model.fit_summary())
# Predict number of purchases for customers over the next 10 time periods
expected_purchases = model.expected_purchases(
data=rfm_df,
future_t=10,
)
# Predict the probability of customers making exactly 'n' purchases over 't' time periods
# The data argument is omitted here because predictions are run on the original dataset
purchase_prob = model.expected_purchase_probability(
    n_purchases=3,
    future_t=10,
)
new_data = pd.DataFrame(
    data={
        "customer_id": [0, 1, 2, 3],
        "frequency": [5, 2, 1, 8],
        "recency": [7, 4, 2.5, 11],
        "T": [10, 8, 10, 22],
    }
)
# Predict probability customers will still be active in 'future_t' time periods
probability_alive = model.expected_probability_alive(
data=new_data,
future_t=[0, 3, 6, 9],
)
# Predict number of purchases for new customers over 't' time periods
expected_purchases_new_customer = model.expected_purchases_new_customer(
    data=new_data,
    t=[2, 5, 7, 10],
)
References
----------
.. [1] David C. Schmittlein, Donald G. Morrison and Richard Colombo.
"Counting Your Customers: Who Are They and What Will They Do Next".
Management Science, Vol. 33, No. 1 (Jan., 1987), pp. 1-24.
.. [2] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions".
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
.. [3] Fader, Peter & G. S. Hardie, Bruce (2014).
"Additional Results for the Pareto/NBD Model".
https://www.brucehardie.com/notes/015/additional_pareto_nbd_results.pdf
.. [4] Fader, Peter & G. S. Hardie, Bruce (2014).
"Deriving the Conditional PMF of the Pareto/NBD Model".
https://www.brucehardie.com/notes/028/pareto_nbd_conditional_pmf.pdf
.. [5] Fader, Peter & G. S. Hardie, Bruce (2007).
"Incorporating Time-Invariant Covariates into the Pareto/NBD and BG/NBD Models".
https://www.brucehardie.com/notes/019/time_invariant_covariates.pdf
"""
_model_type = "Pareto/NBD" # Pareto Negative-Binomial Distribution
def __init__(
self,
data: pd.DataFrame,
*,
model_config: ModelConfig | None = None,
sampler_config: dict | None = None,
):
super().__init__(
data=data,
model_config=model_config,
sampler_config=sampler_config,
non_distributions=["purchase_covariate_cols", "dropout_covariate_cols"],
)
self.purchase_covariate_cols = list(
self.model_config["purchase_covariate_cols"]
)
self.dropout_covariate_cols = list(self.model_config["dropout_covariate_cols"])
self.covariate_cols = self.purchase_covariate_cols + self.dropout_covariate_cols
self._validate_cols(
data,
required_cols=[
"customer_id",
"frequency",
"recency",
"T",
*self.covariate_cols,
],
must_be_unique=["customer_id"],
)
@property
def default_model_config(self) -> ModelConfig:
"""Default model configuration."""
return {
"r": Prior("Weibull", alpha=2, beta=1),
"alpha": Prior("Weibull", alpha=2, beta=10),
"s": Prior("Weibull", alpha=2, beta=1),
"beta": Prior("Weibull", alpha=2, beta=10),
"purchase_coefficient": Prior("Normal", mu=0, sigma=1),
"dropout_coefficient": Prior("Normal", mu=0, sigma=1),
"purchase_covariate_cols": [],
"dropout_covariate_cols": [],
}
def build_model(self) -> None: # type: ignore[override]
"""Build the model."""
coords = {
"purchase_covariate": self.purchase_covariate_cols,
"dropout_covariate": self.dropout_covariate_cols,
"obs_var": ["recency", "frequency"],
"customer_id": self.data["customer_id"],
}
with pm.Model(coords=coords) as self.model:
if self.purchase_covariate_cols:
purchase_data = pm.Data(
"purchase_data",
self.data[self.purchase_covariate_cols],
dims=["customer_id", "purchase_covariate"],
)
self.model_config["purchase_coefficient"].dims = "purchase_covariate"
purchase_coefficient = self.model_config[
"purchase_coefficient"
].create_variable("purchase_coefficient")
alpha_scale = self.model_config["alpha"].create_variable("alpha_scale")
alpha = pm.Deterministic(
"alpha",
(
alpha_scale
* pm.math.exp(-pm.math.dot(purchase_data, purchase_coefficient))
),
dims="customer_id",
)
else:
alpha = self.model_config["alpha"].create_variable("alpha")
# churn priors
if self.dropout_covariate_cols:
dropout_data = pm.Data(
"dropout_data",
self.data[self.dropout_covariate_cols],
dims=["customer_id", "dropout_covariate"],
)
self.model_config["dropout_coefficient"].dims = "dropout_covariate"
dropout_coefficient = self.model_config[
"dropout_coefficient"
].create_variable(
"dropout_coefficient",
)
beta_scale = self.model_config["beta"].create_variable("beta_scale")
beta = pm.Deterministic(
"beta",
(
beta_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient))
),
dims="customer_id",
)
else:
beta = self.model_config["beta"].create_variable("beta")
r = self.model_config["r"].create_variable("r")
s = self.model_config["s"].create_variable("s")
ParetoNBD(
name="recency_frequency",
r=r,
alpha=alpha,
s=s,
beta=beta,
T=self.data["T"],
observed=np.stack(
(self.data["recency"], self.data["frequency"]), axis=1
),
dims=["customer_id", "obs_var"],
)
def fit(self, method: str = "map", fit_method: str | None = None, **kwargs): # type: ignore
"""Infer posteriors of model parameters to run predictions.
Parameters
----------
method : str
Method used to fit the model. Options are:
* "map": Posterior point estimates via Maximum a Posteriori (default).
* "demz": Full posterior distributions via DEMetropolisZ.
* "mcmc": Full posterior distributions via No U-Turn Sampler (NUTS). This can be very slow.
fit_method : str, optional
Deprecated alias for `method`; it will be removed in a future release.
kwargs : dict
Other keyword arguments passed to the underlying PyMC routines.
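
Examples
--------
A minimal usage sketch, assuming a model initialized with an RFM summary
DataFrame; the sampler keyword arguments shown here are illustrative:

.. code-block:: python

    # Fast point estimates via Maximum a Posteriori (the default)
    model.fit(method="map")

    # Full posteriors via DEMetropolisZ; extra kwargs are forwarded to the sampler
    model.fit(method="demz", draws=2000)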
"""
mode = get_default_mode()
if fit_method:
warnings.warn(
"'fit_method' is deprecated and will be removed in a future release. "
"Use 'method' instead.",
DeprecationWarning,
stacklevel=1,
)
method = fit_method
if method == "mcmc":
# Include rewrite in mode
opt_qry = mode.provided_optimizer.including(
"local_reduce_max_num_iters_hyp2f1_grad"
)
mode = Mode(linker=mode.linker, optimizer=opt_qry)
with pytensor.config.change_flags(mode=mode, on_opt_error="raise"):
# Suppress annoying warning
with warnings.catch_warnings():
warnings.simplefilter(
action="ignore",
category=UserWarning,
)
return super().fit(method, **kwargs)
@staticmethod
def _logp(
r: xarray.DataArray,
alpha: xarray.DataArray,
s: xarray.DataArray,
beta: xarray.DataArray,
x: xarray.DataArray,
t_x: xarray.DataArray,
T: xarray.DataArray,
) -> xarray.DataArray:
"""Log-likelihood of the Pareto/NBD model.
Utility function for using ParetoNBD log-likelihood in predictive methods.
"""
# Add one dummy dimension to the right of the scalar parameters, so they broadcast with the `T` vector
pareto_dist = ParetoNBD.dist(
alpha=alpha.values[..., None]
if "customer_id" not in alpha.dims
else alpha.values,
r=r.values[..., None],
beta=beta.values[..., None]
if "customer_id" not in beta.dims
else beta.values,
s=s.values[..., None],
T=T.values,
)
values = np.vstack((t_x.values, x.values)).T
# TODO: Instead of compiling this function every time this method is called,
# we could compile it once (with mutable inputs) and cache it for reuse with new inputs.
loglike = pm.logp(pareto_dist, values).eval()
return xarray.DataArray(data=loglike, dims=("chain", "draw", "customer_id"))
def _extract_predictive_variables(
self,
data: pd.DataFrame,
customer_varnames: Sequence[str] = (),
) -> xarray.Dataset:
"""
Extract predictive variables from the data.
Utility function assigning default customer arguments
for predictive methods and converting to xarrays.
"""
self._validate_cols(
data,
required_cols=[
"customer_id",
*customer_varnames,
*self.purchase_covariate_cols,
*self.dropout_covariate_cols,
],
must_be_unique=["customer_id"],
)
customer_id = data["customer_id"]
model_coords = self.model.coords # type: ignore
if self.purchase_covariate_cols:
purchase_xarray = xarray.DataArray(
data[self.purchase_covariate_cols],
dims=["customer_id", "purchase_covariate"],
coords=[customer_id, list(model_coords["purchase_covariate"])],
)
alpha_scale = self.fit_result["alpha_scale"]
purchase_coefficient = self.fit_result["purchase_coefficient"]
alpha = alpha_scale * np.exp(
-xarray.dot(
purchase_coefficient, purchase_xarray, dim="purchase_covariate"
)
)
alpha.name = "alpha"
else:
alpha = self.fit_result["alpha"]
if self.dropout_covariate_cols:
dropout_xarray = xarray.DataArray(
data[self.dropout_covariate_cols],
dims=["customer_id", "dropout_covariate"],
coords=[customer_id, list(model_coords["dropout_covariate"])],
)
beta_scale = self.fit_result["beta_scale"]
dropout_coefficient = self.fit_result["dropout_coefficient"]
beta = beta_scale * np.exp(
-xarray.dot(
dropout_coefficient, dropout_xarray, dim="dropout_covariate"
)
)
beta.name = "beta"
else:
beta = self.fit_result["beta"]
r = self.fit_result["r"]
s = self.fit_result["s"]
customer_vars = to_xarray(
data["customer_id"],
*[data[customer_varname] for customer_varname in customer_varnames],
)
if len(customer_varnames) == 1:
customer_vars = [customer_vars]
return xarray.combine_by_coords(
(
r,
alpha,
s,
beta,
*customer_vars,
)
)
def expected_purchases(
self,
data: pd.DataFrame | None = None,
*,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Compute expected number of future purchases.
Given *recency*, *frequency*, and *T* for an individual customer, this method predicts the
expected number of future purchases across *future_t* time periods.
Adapted from equation (41) in Bruce Hardie's notes [1]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L242
Parameters
----------
data : ~pandas.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: Unique customer identifier
* `frequency`: Number of repeat purchases
* `recency`: Time between the first and the last purchase
* `T`: Time between the first purchase and the end of the observation period.
Model assumptions require *T >= recency*
* `future_t`: Optional column for *future_t* parametrization.
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
future_t : array_like
Number of time periods over which to predict expected purchases.
Not required if the `data` DataFrame contains a *future_t* column.
References
----------
.. [1] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions."
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
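
Examples
--------
A minimal sketch, assuming a fitted model; `rfm_df` stands in for the summary
frame used at fit time:

.. code-block:: python

    # Expected purchases per customer over the next 5 time periods
    expected = model.expected_purchases(data=rfm_df, future_t=5)

    # Posterior mean per customer
    expected.mean(("chain", "draw"))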
"""
if data is None:
data = self.data
if future_t is not None:
data = data.assign(future_t=future_t)
dataset = self._extract_predictive_variables(
data, customer_varnames=["frequency", "recency", "T", "future_t"]
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = dataset["future_t"]
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
first_term = (
gammaln(r + x)
- gammaln(r)
+ r * log(alpha)
+ s * log(beta)
- (r + x) * log(alpha + T)
- s * log(beta + T)
)
second_term = log(r + x) + log(beta + T) - log(alpha + T)
third_term = log(
(1 - ((beta + T) / (beta + T + future_t)) ** (s - 1)) / (s - 1)
)
exp_purchases = exp(first_term + second_term + third_term - loglike)
return exp_purchases.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
def expected_probability_alive(
self,
data: pd.DataFrame | None = None,
*,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Compute expected probability of being alive.
Compute the probability that a customer with history *frequency*, *recency*, and *T*
is currently active.
Can also estimate alive probability for *future_t* periods into the future.
Adapted from equation (18) in Bruce Hardie's notes [1]_.
Parameters
----------
data : ~pandas.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: Unique customer identifier
* `frequency`: Number of repeat purchases
* `recency`: Time between the first and the last purchase
* `T`: Time between the first purchase and the end of the observation period.
Model assumptions require *T >= recency*
* `future_t`: Optional column for *future_t* parametrization.
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
future_t : array_like
Number of future time periods for which to estimate the probability of being alive.
Not required if the `data` DataFrame contains a *future_t* column.
References
----------
.. [1] Fader, Peter & G. S. Hardie, Bruce (2014).
"Additional Results for the Pareto/NBD Model."
https://www.brucehardie.com/notes/015/additional_pareto_nbd_results.pdf
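
Examples
--------
A minimal sketch, assuming a fitted model; `rfm_df` stands in for the summary
frame used at fit time:

.. code-block:: python

    # Probability each customer is still active at the end of the observation period
    p_alive = model.expected_probability_alive(data=rfm_df)

    # Probability of still being active 10 periods from now
    p_alive_future = model.expected_probability_alive(data=rfm_df, future_t=10)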
"""
if data is None:
data = self.data
if future_t is not None:
data = data.assign(future_t=future_t)
if "future_t" not in data:
data = data.assign(future_t=0)
dataset = self._extract_predictive_variables(
data, customer_varnames=["frequency", "recency", "T", "future_t"]
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = dataset["future_t"]
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
term1 = gammaln(r + x) - gammaln(r)
term2 = r * log(alpha / (alpha + T))
term3 = -x * log(alpha + T)
term4 = s * log(beta / (beta + T + future_t))
prob_alive = exp(term1 + term2 + term3 + term4 - loglike)
return prob_alive.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
def expected_purchase_probability(
self,
data: pd.DataFrame | None = None,
*,
n_purchases: int | None = None,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Compute expected probability of *n_purchases* over *future_t* time periods.
Estimate probability of *n_purchases* over *future_t* time periods,
given an individual customer's current *frequency*, *recency*, and *T*.
Adapted from equation (16) in Bruce Hardie's notes [1]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L388
Parameters
----------
data : ~pandas.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* `frequency`: Number of repeat purchases
* `recency`: Time between the first and the last purchase
* `T`: Time between the first purchase and the end of the observation period.
Model assumptions require *T >= recency*
* `future_t`: Optional column for *future_t* parametrization.
* `n_purchases`: Optional column for *n_purchases* parametrization.
Currently restricted to the same number for all customers.
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
n_purchases : int
Number of purchases predicted.
Not required if the `data` DataFrame contains an *n_purchases* column.
future_t : array_like
Time periods over which the purchase probability is estimated.
Not required if the `data` DataFrame contains a *future_t* column.
References
----------
.. [1] Fader, Peter & G. S. Hardie, Bruce (2014).
"Deriving the Conditional PMF of the Pareto/NBD Model."
https://www.brucehardie.com/notes/028/pareto_nbd_conditional_pmf.pdf
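
Examples
--------
A minimal sketch, assuming a fitted model. Note that `n_purchases` must
currently be a single value shared by all customers:

.. code-block:: python

    # Probability each customer makes exactly 2 purchases over the next 10 periods
    prob_two = model.expected_purchase_probability(n_purchases=2, future_t=10)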
"""
if data is None:
data = self.data
if n_purchases is not None:
data = data.assign(n_purchases=n_purchases)
if future_t is not None:
data = data.assign(future_t=future_t)
dataset = self._extract_predictive_variables(
data,
customer_varnames=["frequency", "recency", "T", "future_t", "n_purchases"],
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = cast(xarray.DataArray, dataset["future_t"])
n_purchases = cast(int, dataset["n_purchases"].values[0].item())
if not np.all(n_purchases == dataset["n_purchases"].values):
raise NotImplementedError(
"expected_purchase_probability with distinct numbers of `n_purchases` not implemented"
)
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
_alpha_less_than_beta = alpha < beta
min_of_alpha_beta = xarray.where(_alpha_less_than_beta, alpha, beta)
max_of_alpha_beta = xarray.where(_alpha_less_than_beta, beta, alpha)
p = xarray.where(_alpha_less_than_beta, r + x + n_purchases, s + 1)
abs_alpha_beta = max_of_alpha_beta - min_of_alpha_beta
log_p_zero = (
gammaln(r + x)
+ r * log(alpha)
+ s * log(beta)
- (gammaln(r) + (r + x) * log(alpha + T) + s * log(beta + T) + loglike)
)
log_B_one = (
gammaln(r + x + n_purchases)
+ r * log(alpha)
+ s * log(beta)
- (
gammaln(r)
+ (r + x + n_purchases) * log(alpha + T + future_t)
+ s * log(beta + T + future_t)
)
)
log_B_two = (
r * log(alpha)
+ s * log(beta)
+ gammaln(r + s + x)
+ betaln(r + x + n_purchases, s + 1)
+ log(
hyp2f1(
r + s + x,
p,
r + s + x + n_purchases + 1,
abs_alpha_beta / (max_of_alpha_beta + T),
)
)
- (gammaln(r) + gammaln(s) + (r + s + x) * log(max_of_alpha_beta + T))
)
def _log_B_three(i):
return (
r * log(alpha)
+ s * log(beta)
+ gammaln(r + s + x + i)
+ betaln(r + x + n_purchases, s + 1)
+ log(
hyp2f1(
r + s + x + i,
p,
r + s + x + n_purchases + 1,
abs_alpha_beta / (max_of_alpha_beta + T + future_t),
)
)
- (
gammaln(r)
+ gammaln(s)
+ (r + s + x + i) * log(max_of_alpha_beta + T + future_t)
)
)
zeroth_term = (n_purchases == 0) * (1 - exp(log_p_zero))
# ignore numerical errors when future_t <= 0,
# this is an unusual edge case in practice, so refactoring is unwarranted
with np.errstate(divide="ignore", invalid="ignore"):
first_term = (
n_purchases * log(xarray.DataArray(future_t))
- gammaln(n_purchases + 1)
+ log_B_one
- loglike
)
second_term = log_B_two - loglike
third_term = xr_logsumexp(
xarray.concat(
[
i * log(cast(xarray.DataArray, future_t))
- gammaln(i + 1)
+ _log_B_three(i)
- loglike
for i in range(n_purchases + 1)
],
dim="concat_dim_",
),
dims="concat_dim_",
)
purchase_prob = zeroth_term + exp(
xr_logsumexp(
xarray.concat([first_term, second_term, third_term], dim="_concat_dim"),
b=xarray.DataArray(data=[1, 1, -1], dims="_concat_dim"),
dims="_concat_dim",
)
)
purchase_prob = xarray.where(
cast(xarray.DataArray, future_t) <= 0,
purchase_prob.fillna(0),
purchase_prob,
)
return purchase_prob.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
def expected_purchases_new_customer(
self,
data: pd.DataFrame | None = None,
*,
t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""Compute the expected number of purchases for a new customer across *t* time periods.
In a model with covariates, if `data` is not specified, the dataset used for fitting will be used and
a prediction will be computed for a *new customer* with each set of covariates.
*This is not a conditional prediction for observed customers!*
Adapted from equation (27) in Bruce Hardie's notes [1]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L359
Parameters
----------
data : ~pandas.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: Unique customer identifier
* `t`: Optional column for *t* parametrization.
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
t : array_like, optional
Number of time periods over which to estimate purchases.
Not required if the `data` DataFrame contains a *t* column.
References
----------
.. [1] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions."
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
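
Examples
--------
A minimal sketch, assuming a fitted model; a scalar *t* applies to every row
of the data:

.. code-block:: python

    # Expected purchases for a brand-new customer over the next 10 time periods
    new_customer_purchases = model.expected_purchases_new_customer(t=10)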
"""
if data is None:
data = self.data
if t is not None:
data = data.assign(t=t)
dataset = self._extract_predictive_variables(data, customer_varnames=["t"])
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
t = dataset["t"]
first_term = r * beta / alpha / (s - 1)
second_term = 1 - (beta / (beta + t)) ** (s - 1)
return (first_term * second_term).transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
def distribution_new_customer(
self,
data: pd.DataFrame | None = None,
*,
T: int | np.ndarray | pd.Series | None = None,
random_seed: RandomState | None = None,
var_names: Sequence[
Literal["dropout", "purchase_rate", "recency_frequency"]
] = (
"dropout",
"purchase_rate",
"recency_frequency",
),
n_samples: int = 1000,
) -> xarray.Dataset:
"""Compute posterior predictive samples of dropout, purchase rate and frequency/recency of new customers.
In a model with covariates, if `data` is not specified, the dataset used for fitting will be used and
a prediction will be computed for a *new customer* with each set of covariates.
*This is not a conditional prediction for observed customers!*
Parameters
----------
data : ~pandas.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* `T`: Time between the first purchase and the end of the observation period
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
T : array_like, optional
Time between the first purchase and the end of the observation period.
Not needed if the `data` parameter is provided with a `T` column.
random_seed : ~numpy.random.RandomState, optional
Random state to use for sampling.
var_names : sequence of str, optional
Names of the variables to sample from. Defaults to ["dropout", "purchase_rate", "recency_frequency"].
n_samples : int, optional
Number of samples to generate. Defaults to 1000.
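
Examples
--------
A minimal sketch, assuming a fitted model; the seed is illustrative:

.. code-block:: python

    samples = model.distribution_new_customer(
        var_names=["dropout", "purchase_rate"],
        random_seed=42,
    )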
"""
if data is None:
data = self.data
if T is not None:
data = data.assign(T=T)
dataset = self._extract_predictive_variables(data, customer_varnames=["T"])
T = dataset["T"].values
# Delete "T" so we can pass dataset directly to `sample_posterior_predictive`
del dataset["T"]
if dataset.sizes["chain"] == 1 and dataset.sizes["draw"] == 1:
# For MAP fits, add a dummy draw dimension
dataset = dataset.squeeze("draw").expand_dims(draw=range(n_samples))
coords = self.model.coords.copy() # type: ignore
coords["customer_id"] = data["customer_id"]
with pm.Model(coords=coords) as pred_model:
if self.purchase_covariate_cols:
alpha = pm.Flat("alpha", dims=["customer_id"])
else:
alpha = pm.Flat("alpha")
if self.dropout_covariate_cols:
beta = pm.Flat("beta", dims=["customer_id"])
else:
beta = pm.Flat("beta")
r = pm.Flat("r")
s = pm.Flat("s")
pm.Gamma(
"purchase_rate",
alpha=r,
beta=alpha,
dims=pred_model.named_vars_to_dims.get("alpha"),
)
pm.Gamma(
"dropout",
alpha=s,
beta=beta,
dims=pred_model.named_vars_to_dims.get("beta"),
)
ParetoNBD(
name="recency_frequency",
r=r,
alpha=alpha,
s=s,
beta=beta,
T=T,
dims=["customer_id", "obs_var"],
)
return pm.sample_posterior_predictive(
dataset,
var_names=var_names,
random_seed=random_seed,
predictions=True,
).predictions
def distribution_new_customer_dropout(
self,
data: pd.DataFrame | None = None,
*,
random_seed: RandomState | None = None,
) -> xarray.Dataset:
"""Sample from the Gamma distribution representing dropout times for new customers.
This is the duration of time a new customer is active before churning, or dropping out.
Parameters
----------
data : ~pandas.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
random_seed : ~numpy.random.RandomState, optional
Random state to use for sampling.
Returns
-------
~xarray.Dataset
Dataset containing the posterior samples for the population-level dropout rate.
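
Examples
--------
A minimal sketch, assuming a fitted model; the seed is illustrative:

.. code-block:: python

    dropout = model.distribution_new_customer_dropout(random_seed=42)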
"""
return self.distribution_new_customer(
data=data,
random_seed=random_seed,
var_names=["dropout"],
)["dropout"]
def distribution_new_customer_purchase_rate(
self,
data: pd.DataFrame | None = None,
*,
random_seed: RandomState | None = None,
) -> xarray.Dataset:
"""Sample from the Gamma distribution representing purchase rates for new customers.
This is the purchase rate for a new customer and determines the time between
purchases for any new customer.
Parameters
----------
data : ~pandas.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* All covariate columns specified when the model was initialized.
If not provided, predictions will be run on the data used to fit the model.
random_seed : ~numpy.random.RandomState, optional
Random state to use for sampling.
Returns
-------
~xarray.Dataset
Dataset containing the posterior samples for the population-level purchase rate.
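
Examples
--------
A minimal sketch, assuming a fitted model; the seed is illustrative:

.. code-block:: python

    purchase_rate = model.distribution_new_customer_purchase_rate(random_seed=42)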
"""
return self.distribution_new_customer(
data=data,
random_seed=random_seed,
var_names=["purchase_rate"],
)["purchase_rate"]
def distribution_new_customer_recency_frequency(
self,
data: pd.DataFrame | None = None,
*,
T: int | np.ndarray | pd.Series | None = None,
random_seed: RandomState | None = None,
n_samples: int = 1000,
) -> xarray.Dataset:
"""Pareto/NBD process representing purchases across the customer population.
This is the distribution of recency and purchase frequency given `T` observation periods for each customer.
Parameters
----------
data : ~pandas.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* `T`: Time between the first purchase and the end of the observation period.
* All covariate columns specified when model was initialized.
If not provided, the method will use the fit dataset.
T : array_like, optional
Number of observation periods for each customer. If not provided, `T` values from the fit dataset will be used.
Not required if the `data` DataFrame contains a `T` column.
random_seed : ~numpy.random.RandomState, optional
Random state to use for sampling.
n_samples : int, optional
Number of samples to generate. Defaults to 1000.
Returns
-------
~xarray.Dataset
Dataset containing the posterior samples for the customer population.
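
Examples
--------
A minimal sketch, assuming a fitted model; a scalar *T* applies to every
customer, and the sample count is illustrative:

.. code-block:: python

    rec_freq = model.distribution_new_customer_recency_frequency(
        T=365,
        n_samples=100,
    )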
"""
return self.distribution_new_customer(
data=data,
T=T,
random_seed=random_seed,
var_names=["recency_frequency"],
n_samples=n_samples,
)["recency_frequency"]