# Copyright 2022 - 2025 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the CLV module."""
import warnings
from datetime import date, datetime
import numpy as np
import pandas
import xarray
from numpy import datetime64
__all__ = [
"customer_lifetime_value",
"rfm_segments",
"rfm_summary",
"rfm_train_test_split",
"to_xarray",
]
def to_xarray(customer_id, *arrays, dim: str = "customer_id"):
"""Convert vector arrays to xarray with a common dim (default "customer_id")."""
dims = (dim,)
coords = {dim: np.asarray(customer_id)}
res = tuple(
xarray.DataArray(data=array, coords=coords, dims=dims) for array in arrays
)
return res[0] if len(arrays) == 1 else res
def customer_lifetime_value(
transaction_model,
data: pandas.DataFrame,
future_t: int = 12,
discount_rate: float = 0.00,
time_unit: str = "D",
) -> xarray.DataArray:
"""
Compute customer lifetime value.
Compute the average lifetime value for a group of one or more customers
and apply a discount rate for net present value estimations.
    Note that `future_t` is measured in months regardless of the specified `time_unit`.
Adapted from lifetimes package
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L449
Parameters
----------
transaction_model : ~CLVModel
Predictive model for future transactions. `BetaGeoModel` and `ParetoNBDModel` are currently supported.
data : ~pandas.DataFrame
DataFrame containing the following columns:
* `customer_id`: Unique customer identifier
* `frequency`: Number of repeat purchases observed for each customer
* `recency`: Time between the first and the last purchase
* `T`: Time between the first purchase and the end of the observation period
* `future_spend`: Predicted monetary values for each customer
future_t : int, optional
        The expected customer lifetime in months. Default: 12
discount_rate : float, optional
The monthly adjusted discount rate. Default: 0.00
time_unit : string, optional
Unit of time of the purchase history. Defaults to "D" for daily.
Other options are "W" (weekly), "M" (monthly), and "H" (hourly).
Example: If your dataset contains information about weekly purchases,
you should use "W".
Returns
-------
    xarray.DataArray
        DataArray containing estimated customer lifetime values.
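
    Examples
    --------
    A minimal sketch, assuming a fitted ``BetaGeoModel`` named ``model`` and a summary
    DataFrame ``data`` with the required columns (``future_spend`` predicted beforehand,
    e.g. with a Gamma-Gamma spend model):

    >>> clv = customer_lifetime_value(
    ...     transaction_model=model,
    ...     data=data,
    ...     future_t=12,  # months, regardless of time_unit
    ...     discount_rate=0.01,  # monthly discount rate
    ...     time_unit="D",  # purchase history recorded in days
    ... )  # doctest: +SKIP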
"""
if "future_spend" not in data.columns:
raise ValueError("Required column future_spend missing")
def _squeeze_dims(x: xarray.DataArray):
"""
Squeeze dimensions for MAP-fitted model predictions.
This utility is required for MAP-fitted model predictions to broadcast properly.
Parameters
----------
x : xarray.DataArray
DataArray to squeeze dimensions for.
Returns
-------
xarray.DataArray
DataArray with squeezed dimensions.
"""
dims_to_squeeze: tuple[str, ...] = ()
if "chain" in x.dims and len(x.chain) == 1:
dims_to_squeeze += ("chain",)
if "draw" in x.dims and len(x.draw) == 1:
dims_to_squeeze += ("draw",)
x = x.squeeze(dims_to_squeeze)
return x
if discount_rate == 0.0:
        # no discounting: a single step from 0 to `future_t` suffices
steps = np.arange(future_t, future_t + 1)
else:
steps = np.arange(1, future_t + 1)
factor = {"W": 4.345, "M": 1.0, "D": 30, "H": 30 * 24}[time_unit]
monetary_value = to_xarray(data["customer_id"], data["future_spend"])
clv = xarray.DataArray(0.0)
# TODO: Add an IF block to support ShiftedBetaGeoModelIndividual
# initialize FOR loop with 0 purchases at future_t = 0
prev_expected_purchases = 0
for i in steps * factor:
# since the prediction of number of transactions is cumulative, we have to subtract off the previous periods
new_expected_purchases = _squeeze_dims(
transaction_model.expected_purchases(
data=data,
future_t=i,
)
)
expected_transactions = new_expected_purchases - prev_expected_purchases
prev_expected_purchases = new_expected_purchases
# sum up the CLV estimates of all the periods and apply discounted cash flow
clv = clv + (monetary_value * expected_transactions) / (1 + discount_rate) ** (
i / factor
)
    # add back chain/draw dims if they were squeezed out
if "draw" not in clv.dims:
clv = clv.expand_dims({"draw": 1})
if "chain" not in clv.dims:
clv = clv.expand_dims({"chain": 1})
return clv.transpose("chain", "draw", "customer_id")
def _find_first_transactions(
transactions: pandas.DataFrame,
customer_id_col: str,
datetime_col: str,
monetary_value_col: str | None = None,
datetime_format: str | None = None,
observation_period_end: str | pandas.Period | datetime | None = None,
time_unit: str = "D",
sort_transactions: bool | None = True,
) -> pandas.DataFrame:
"""Return dataframe with first transactions.
This takes a DataFrame of transaction data of the form:
*customer_id, datetime [, monetary_value]*
    and appends a boolean column named *first* to the transaction log to indicate
    which rows are the first transaction for each *customer_id*.
Adapted from lifetimes package
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L148
Parameters
----------
transactions : ~pandas.DataFrame
A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
customer_id_col : string
Column in the *transactions* DataFrame denoting the *customer_id*.
datetime_col : string
        Column in the *transactions* DataFrame denoting the datetimes purchases were made.
monetary_value_col : string, optional
Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
Optional; only needed for spend estimation models like the Gamma-Gamma model.
datetime_format : string, optional
A string that represents the timestamp format. Useful if Pandas can't understand
the provided format.
observation_period_end : Union[str, pandas.Period, datetime], optional
A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
time_unit : string, optional
Time granularity for study.
Default : 'D' for days. Possible values listed here:
https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
sort_transactions : bool, optional
Default: True
If raw data is already sorted in chronological order, set to `False` to improve computational efficiency.
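
    Examples
    --------
    A minimal sketch with hypothetical column names; the returned frame carries a
    boolean *first* column flagging each customer's first transaction:

    >>> log = pandas.DataFrame(
    ...     {"id": [1, 1, 2], "date": ["2024-01-01", "2024-02-01", "2024-01-15"]}
    ... )
    >>> _find_first_transactions(log, "id", "date")  # doctest: +SKIP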
"""
select_columns = [customer_id_col, datetime_col]
if observation_period_end is None:
observation_period_end = transactions[datetime_col].max()
if isinstance(observation_period_end, pandas.Period):
observation_period_end = observation_period_end.to_timestamp()
if isinstance(observation_period_end, str):
observation_period_end = pandas.to_datetime(observation_period_end)
if monetary_value_col:
select_columns.append(monetary_value_col)
if sort_transactions:
transactions = transactions[select_columns].sort_values(select_columns).copy()
# convert date column into a DateTimeIndex for time-wise grouping and truncating
transactions[datetime_col] = pandas.to_datetime(
transactions[datetime_col], format=datetime_format
)
transactions = (
transactions.set_index(datetime_col).to_period(time_unit).to_timestamp()
)
mask = pandas.to_datetime(transactions.index) <= pandas.to_datetime(
observation_period_end
)
transactions = transactions.loc[mask].reset_index()
period_groupby = transactions.groupby(
[datetime_col, customer_id_col], sort=False, as_index=False
)
if monetary_value_col:
# when processing a monetary column, make sure to sum together transactions made in the same period
period_transactions = period_groupby.sum()
else:
# by calling head() on the groupby object, the datetime and customer_id columns
# will be reduced to the first transaction of that time period
period_transactions = period_groupby.head(1)
# create a new column for flagging first transactions
period_transactions = period_transactions.copy()
period_transactions.loc[:, "first"] = False
# find all first transactions and store as an index
first_transactions = (
period_transactions.groupby(customer_id_col, sort=True, as_index=False)
.head(1)
.index
)
# flag first transactions as True
period_transactions.loc[first_transactions, "first"] = True
select_columns.append("first")
# reset datetime_col to period
period_transactions[datetime_col] = period_transactions[datetime_col].dt.to_period(
time_unit
)
return period_transactions[select_columns]
def rfm_summary(
transactions: pandas.DataFrame,
customer_id_col: str,
datetime_col: str,
monetary_value_col: str | None = None,
datetime_format: str | None = None,
observation_period_end: str | pandas.Period | datetime | None = None,
time_unit: str = "D",
time_scaler: float | None = 1,
include_first_transaction: bool | None = False,
sort_transactions: bool | None = True,
) -> pandas.DataFrame:
"""Summarize transaction data for use in CLV modeling or RFM segmentation.
This transforms a DataFrame of transaction data of the form:
*customer_id, datetime [, monetary_value]*
to a DataFrame for CLV modeling:
*customer_id, frequency, recency, T [, monetary_value]*
If the `include_first_transaction = True` argument is specified, a DataFrame for RFM segmentation is returned:
*customer_id, frequency, recency, monetary_value*
This function is not required if using the `clv.rfm_segments` utility.
Adapted from lifetimes package
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L230
Parameters
----------
transactions : ~pandas.DataFrame
A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
customer_id_col : string
Column in the *transactions* DataFrame denoting the *customer_id*.
datetime_col : string
        Column in the *transactions* DataFrame denoting the datetimes purchases were made.
monetary_value_col : string, optional
Column in the transactions DataFrame denoting the monetary value of the transaction.
Optional; only needed for RFM segmentation and spend estimation models like the Gamma-Gamma model.
observation_period_end : Union[str, pandas.Period, datetime], optional
A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
datetime_format : string, optional
A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
time_unit : string, optional
Time granularity for study.
Default: 'D' for days. Possible values listed here:
https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
time_scaler : int, optional
Default: 1. Scales *recency* & *T* to a different time granularity.
This is useful for datasets spanning many years, and running predictions in different time scales.
include_first_transaction : bool, optional
Default: *False*
For predictive CLV modeling, this should be *False*.
Set to *True* if performing RFM segmentation.
sort_transactions : bool, optional
Default: *True*
If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
Returns
-------
DataFrame
        DataFrame containing summarized RFM data: *customer_id*, *frequency*, *recency*,
        *T* (omitted when `include_first_transaction=True`), and *monetary_value* if specified.
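
    Examples
    --------
    A minimal sketch with hypothetical column names:

    >>> transactions = pandas.DataFrame(
    ...     {
    ...         "id": [1, 1, 2, 2],
    ...         "date": ["2024-01-01", "2024-03-01", "2024-01-10", "2024-02-10"],
    ...         "spend": [10.0, 20.0, 15.0, 5.0],
    ...     }
    ... )
    >>> rfm_summary(
    ...     transactions, "id", "date", monetary_value_col="spend"
    ... )  # doctest: +SKIP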
"""
if observation_period_end is None:
observation_period_end_ts = (
pandas.to_datetime(transactions[datetime_col].max(), format=datetime_format)
.to_period(time_unit)
.to_timestamp()
)
elif isinstance(observation_period_end, pandas.Period):
observation_period_end_ts = observation_period_end.to_timestamp()
else:
observation_period_end_ts = (
pandas.to_datetime(observation_period_end, format=datetime_format)
.to_period(time_unit)
.to_timestamp()
)
# label repeated transactions
repeated_transactions = _find_first_transactions( # type: ignore
transactions,
customer_id_col,
datetime_col,
monetary_value_col,
datetime_format,
observation_period_end_ts,
time_unit,
sort_transactions,
)
# reset datetime_col to timestamp
repeated_transactions[datetime_col] = repeated_transactions[
datetime_col
].dt.to_timestamp()
# count all orders by customer
customers = repeated_transactions.groupby(customer_id_col, sort=False)[
datetime_col
].agg(["min", "max", "count"])
# subtract 1 from count, as we ignore the first order.
customers["frequency"] = customers["count"] - 1
customers["recency"] = (
(pandas.to_datetime(customers["max"]) - pandas.to_datetime(customers["min"]))
/ np.timedelta64(1, time_unit) # type: ignore[call-overload]
/ time_scaler
)
customers["T"] = (
(observation_period_end_ts - customers["min"])
/ np.timedelta64(1, time_unit) # type: ignore[call-overload]
/ time_scaler
)
summary_columns = ["frequency", "recency", "T"]
if include_first_transaction:
# add the first order back to the frequency count
customers["frequency"] = customers["frequency"] + 1
# change recency to segmentation definition
customers["recency"] = customers["T"] - customers["recency"]
# T column is not used for segmentation
summary_columns = ["frequency", "recency"]
if monetary_value_col:
if not include_first_transaction:
# create an index of all the first purchases
first_purchases = repeated_transactions[
repeated_transactions["first"]
].index
# by setting the monetary_value cells of all the first purchases to NaN,
# those values will be excluded from the mean value calculation
repeated_transactions.loc[first_purchases, monetary_value_col] = np.nan
customers["monetary_value"] = (
repeated_transactions.groupby(customer_id_col)[monetary_value_col]
.mean()
.fillna(0)
)
summary_columns.append("monetary_value")
summary_df = customers[summary_columns].astype(float)
summary_df = summary_df.reset_index().rename(
columns={customer_id_col: "customer_id"}
)
return summary_df
def rfm_train_test_split(
transactions: pandas.DataFrame,
customer_id_col: str,
datetime_col: str,
train_period_end: float | str | datetime | datetime64 | date,
test_period_end: float | str | datetime | datetime64 | date | None = None,
time_unit: str = "D",
time_scaler: float | None = 1,
datetime_format: str | None = None,
monetary_value_col: str | None = None,
include_first_transaction: bool | None = False,
sort_transactions: bool | None = True,
) -> pandas.DataFrame:
"""Summarize transaction data and split into training and tests datasets for CLV modeling.
This can also be used to evaluate the impact of a time-based intervention like a marketing campaign.
This transforms a DataFrame of transaction data of the form:
*customer_id, datetime [, monetary_value]*
to a DataFrame of the form:
*customer_id, frequency, recency, T [, monetary_value], test_frequency [, test_monetary_value], test_T*
Note this function will exclude new customers whose first transactions occurred during the test period.
Adapted from lifetimes package
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L27
Parameters
----------
transactions : ~pandas.DataFrame
A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
customer_id_col : string
Column in the *transactions* DataFrame denoting the customer_id.
datetime_col : string
Column in the *transactions* DataFrame denoting datetimes purchases were made.
    train_period_end : Union[str, pandas.Period, datetime]
A string or datetime to denote the final time period for the training data.
Events after this time period are used for the test data.
test_period_end : Union[str, pandas.Period, datetime], optional
A string or datetime to denote the final time period of the study.
Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
time_unit : string, optional
Time granularity for study.
Default: 'D' for days. Possible values listed here:
https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
time_scaler : int, optional
Default: 1. Scales *recency* & *T* to a different time granularity.
This is useful for datasets spanning many years, and running predictions in different time scales.
datetime_format : string, optional
A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
monetary_value_col : string, optional
Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
Optional; only needed for spend estimation models like the Gamma-Gamma model.
include_first_transaction : bool, optional
Default: *False*
For predictive CLV modeling, this should be *False*.
Set to *True* if performing RFM segmentation.
sort_transactions : bool, optional
Default: *True*
If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
Returns
-------
DataFrame
        DataFrame containing summarized RFM data for the training period, plus
        *test_frequency* and *test_T* columns, and *test_monetary_value* if specified.
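
    Examples
    --------
    A minimal sketch, assuming a transaction log ``transactions`` with hypothetical
    column names; purchases after ``train_period_end`` populate the ``test_*`` columns:

    >>> train_test = rfm_train_test_split(
    ...     transactions,
    ...     customer_id_col="id",
    ...     datetime_col="date",
    ...     train_period_end="2024-06-30",
    ...     monetary_value_col="spend",
    ... )  # doctest: +SKIP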
"""
if test_period_end is None:
test_period_end = transactions[datetime_col].max()
transaction_cols = [customer_id_col, datetime_col]
if monetary_value_col:
transaction_cols.append(monetary_value_col)
transactions = transactions[transaction_cols].copy()
transactions[datetime_col] = pandas.to_datetime(
transactions[datetime_col], format=datetime_format
)
test_period_end = pandas.to_datetime(test_period_end, format=datetime_format)
train_period_end = pandas.to_datetime(train_period_end, format=datetime_format)
# create training dataset
training_transactions = transactions.loc[
transactions[datetime_col] <= train_period_end
]
if training_transactions.empty:
error_msg = """No data available. Check `test_transactions` and `train_period_end`
and confirm values in `transactions` occur prior to those time periods."""
raise ValueError(error_msg)
training_rfm_data = rfm_summary(
training_transactions,
customer_id_col,
datetime_col,
monetary_value_col=monetary_value_col,
datetime_format=datetime_format,
observation_period_end=train_period_end,
time_unit=time_unit,
time_scaler=time_scaler,
include_first_transaction=include_first_transaction,
sort_transactions=sort_transactions,
)
# create test dataset
test_transactions = transactions.loc[
(test_period_end >= transactions[datetime_col])
& (transactions[datetime_col] > train_period_end)
].copy()
if test_transactions.empty:
error_msg = """No data available. Check `test_transactions` and `train_period_end`
and confirm values in `transactions` occur prior to those time periods."""
raise ValueError(error_msg)
test_transactions[datetime_col] = test_transactions[datetime_col].dt.to_period(
time_unit
)
# create dataframe with customer_id and test_frequency columns
test_rfm_data = (
test_transactions.groupby([customer_id_col, datetime_col], sort=False)[
datetime_col
]
        .agg(lambda r: 1)  # each (customer, period) pair counts as one purchase
.groupby(level=customer_id_col)
.count()
).reset_index()
test_rfm_data = test_rfm_data.rename(
columns={customer_id_col: "customer_id", datetime_col: "test_frequency"}
)
if monetary_value_col:
test_monetary_value = (
test_transactions.groupby([customer_id_col, datetime_col])[
monetary_value_col
]
.sum()
.groupby(customer_id_col)
.mean()
)
test_rfm_data = test_rfm_data.merge(
test_monetary_value,
left_on="customer_id",
right_on=customer_id_col,
how="inner",
)
test_rfm_data = test_rfm_data.rename(
columns={monetary_value_col: "test_monetary_value"}
)
train_test_rfm_data = training_rfm_data.merge(
test_rfm_data, on="customer_id", how="left"
)
train_test_rfm_data.fillna(0, inplace=True)
time_delta = (
test_period_end.to_period(time_unit) - train_period_end.to_period(time_unit)
).n
train_test_rfm_data["test_T"] = time_delta / time_scaler # type: ignore
return train_test_rfm_data
def rfm_segments(
transactions: pandas.DataFrame,
customer_id_col: str,
datetime_col: str,
monetary_value_col: str,
segment_config: dict | None = None,
observation_period_end: str | pandas.Period | datetime | None = None,
datetime_format: str | None = None,
time_unit: str = "D",
time_scaler: float | None = 1,
sort_transactions: bool | None = True,
) -> pandas.DataFrame:
"""Assign customers to segments based on spending behavior derived from RFM scores.
This transforms a DataFrame of transaction data of the form:
*customer_id, datetime, monetary_value*
to a DataFrame of the form:
*customer_id, frequency, recency, monetary_value, rfm_score, segment*
Customer purchasing data is aggregated into three variables: `recency`, `frequency`, and `monetary_value`.
Quartiles are estimated for each variable, and a three-digit RFM score is then assigned to each customer.
For example, a customer with a score of '234' is in the second quartile for `recency`, third quartile for
`frequency`, and fourth quartile for `monetary_value`.
RFM scores corresponding to segments such as "Top Spender", "Frequent Buyer", or "At-Risk" are determined, and
customers are then segmented based on their RFM score.
By default, the following segments are created:
- "Premium Customer": Customers in top 2 quartiles for all variables.
- "Repeat Customer": Customers in top 2 quartiles for frequency, and either recency or monetary value.
- "Top Spender": Customers in top 2 quartiles for monetary value, and either frequency or recency.
- "At-Risk Customer": Customers in bottom 2 quartiles for two or more variables.
- "Inactive Customer": Customers in bottom quartile for two or more variables.
- Customers with unspecified RFM scores will be assigned to a segment named "Other".
If an alternative segmentation approach is desired, use
`rfm_summary(include_first_transaction=True, *args, **kwargs)` instead to preprocess data for segmentation.
In either case, the returned DataFrame cannot be used for modeling.
If assigning model predictions to RFM segments, create a separate DataFrame for modeling and join by Customer ID.
Parameters
----------
transactions : ~pandas.DataFrame
A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
customer_id_col : string
Column in the *transactions* DataFrame denoting the *customer_id*.
datetime_col : string
        Column in the *transactions* DataFrame denoting the datetimes purchases were made.
monetary_value_col : string
Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
segment_config : dict, optional
Dictionary containing segment names and list of RFM score assignments;
key/value pairs should be formatted as `{"segment": ['111', '123', '321'], ...}`.
If not provided, default segment names and definitions are applied.
observation_period_end : Union[str, pandas.Period, datetime, None], optional
A string or datetime to denote the final date of the study.
Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
datetime_format : string, optional
A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
time_unit : string, optional
Time granularity for study.
Default: 'D' for days. Possible values listed here:
https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
time_scaler : int, optional
Default: 1. Scales *recency* & *T* to a different time granularity.
This is useful for datasets spanning many years, and running predictions in different time scales.
sort_transactions : bool, optional
Default: *True*
If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
Returns
-------
DataFrame
Dataframe containing summarized RFM data, RFM scores, and segment assignments
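
    Examples
    --------
    A minimal sketch, assuming a transaction log ``transactions`` with hypothetical
    column names; ``segment_config`` maps segment names to the RFM scores they cover:

    >>> config = {"Whale": ["444", "443", "434"], "Lapsed": ["111", "112", "211"]}
    >>> segments = rfm_segments(
    ...     transactions,
    ...     customer_id_col="id",
    ...     datetime_col="date",
    ...     monetary_value_col="spend",
    ...     segment_config=config,
    ... )  # doctest: +SKIP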
"""
rfm_data = rfm_summary(
transactions,
customer_id_col=customer_id_col,
datetime_col=datetime_col,
monetary_value_col=monetary_value_col,
observation_period_end=observation_period_end,
datetime_format=datetime_format,
time_unit=time_unit,
time_scaler=time_scaler,
include_first_transaction=True,
sort_transactions=sort_transactions,
)
    # iteratively assign quartile labels for each variable
    for quartile_col, rfm_col in zip(
        ["r_quartile", "f_quartile", "m_quartile"],
        ["recency", "frequency", "monetary_value"],
        strict=False,
    ):
        # If data has many repeat values, qcut will return fewer than 4 bins.
        # These try blocks retry with progressively fewer labels to match.
        try:
            labels = _rfm_quartile_labels(quartile_col, 5)
            rfm_data[quartile_col] = pandas.qcut(
                rfm_data[rfm_col], q=4, labels=labels, duplicates="drop"
            ).astype(str)
        except ValueError:
            try:
                labels = _rfm_quartile_labels(quartile_col, 4)
                rfm_data[quartile_col] = pandas.qcut(
                    rfm_data[rfm_col], q=4, labels=labels, duplicates="drop"
                ).astype(str)
            except ValueError:
                labels = _rfm_quartile_labels(quartile_col, 3)
                rfm_data[quartile_col] = pandas.qcut(
                    rfm_data[rfm_col], q=4, labels=labels, duplicates="drop"
                ).astype(str)
                warnings.warn(
                    f"RFM score will not exceed 2 for {quartile_col}. Specify a custom segment_config",
                    UserWarning,
                    stacklevel=1,
                )
rfm_data = pandas.eval( # type: ignore
"rfm_score = rfm_data.r_quartile + rfm_data.f_quartile + rfm_data.m_quartile",
target=rfm_data,
)
if segment_config is None:
segment_config = _default_rfm_segment_config
segment_names = list(segment_config.keys())
# create catch-all "Other" segment and assign defined segments from config
rfm_data["segment"] = "Other"
for key in segment_names:
rfm_data.loc[rfm_data["rfm_score"].isin(segment_config[key]), "segment"] = key
# drop unnecessary columns
rfm_data = rfm_data.drop(columns=["r_quartile", "f_quartile", "m_quartile"])
return rfm_data
def _rfm_quartile_labels(column_name, max_label_range):
"""
    Build quartile labels for a variable.

    Called internally by `rfm_segments`.
Parameters
----------
column_name : str
The name of the column to label.
max_label_range : int
The maximum range of labels to create.
Returns
-------
list[int]
A list of labels for the column.
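
    Examples
    --------
    Recency labels are reversed because lower recency values are more desirable:

    >>> _rfm_quartile_labels("r_quartile", 5)
    [4, 3, 2, 1]
    >>> _rfm_quartile_labels("f_quartile", 5)
    [1, 2, 3, 4]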
"""
    # recency labels must be reversed because lower recency values are more desirable
    if column_name == "r_quartile":
        return list(range(max_label_range - 1, 0, -1))
    else:
        return list(range(1, max_label_range))
_default_rfm_segment_config = {
"Premium Customer": [
"334",
"443",
"444",
"344",
"434",
"433",
"343",
"333",
],
"Repeat Customer": ["244", "234", "232", "332", "143", "233", "243"],
"Top Spender": [
"424",
"414",
"144",
"314",
"324",
"124",
"224",
"423",
"413",
"133",
"323",
"313",
"134",
],
"At Risk Customer": [
"422",
"223",
"212",
"122",
"222",
"132",
"322",
"312",
"412",
"123",
"214",
],
"Inactive Customer": ["411", "111", "113", "114", "112", "211", "311"],
}
def _expected_cumulative_transactions(
model,
transactions: pandas.DataFrame,
customer_id_col: str,
datetime_col: str,
t: int,
datetime_format: str | None = None,
time_unit: str = "D",
time_scaler: float | None = 1,
sort_transactions: bool | None = True,
set_index_date: bool | None = False,
):
"""
Aggregate actual and expected cumulative transactions over time for a fitted ``BetaGeoModel`` or ``ParetoNBDModel``.
This function follows the formulation on page 8 of [1]_. Specifically, we take only customers who have made their
first transaction before the specified number of ``t`` time periods, run ``expected_purchases_new_customer()``
for all remaining time periods, then sum across the customer population.
Adapted from legacy ``lifetimes`` library:
https://github.com/CamDavidsonPilon/lifetimes/blob/master/lifetimes/utils.py#L506
Parameters
----------
    model : ~CLVModel
A fitted ``BetaGeoModel`` or ``ParetoNBDModel``.
transactions : ~pandas.DataFrame
A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
customer_id_col : string
Column in the *transactions* DataFrame denoting the *customer_id*.
datetime_col : string
        Column in the *transactions* DataFrame denoting the datetimes purchases were made.
    t : int
Number of time units since earliest transaction for which we want to aggregate cumulative transactions.
datetime_format : string, optional
A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
time_unit : string, optional
Time granularity for study.
Default: 'D' for days. Possible values listed here:
https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
time_scaler : int, optional
Default: 1. Scales *recency* & *T* to a different time granularity.
This is useful for datasets spanning many years, and running predictions in different time scales.
sort_transactions : bool, optional
Default: *True*
If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
    set_index_date : bool, optional
Set to True to return a dataframe with a datetime index.
Returns
-------
DataFrame
Dataframe containing columns for actual and predicted values
References
----------
.. [1] Fader, Peter S., Bruce G.S. Hardie, and Ka Lok Lee (2005),
A Note on Implementing the Pareto/NBD Model in MATLAB.
http://brucehardie.com/notes/008/
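
    Examples
    --------
    A minimal sketch, assuming a fitted ``BetaGeoModel`` named ``model`` and a raw
    transaction log ``transactions`` with hypothetical column names:

    >>> df = _expected_cumulative_transactions(
    ...     model,
    ...     transactions,
    ...     customer_id_col="id",
    ...     datetime_col="date",
    ...     t=52,
    ...     time_unit="W",
    ... )  # doctest: +SKIP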
"""
start_date = pandas.to_datetime(
transactions[datetime_col], format=datetime_format
).min()
start_period = start_date.to_period(time_unit)
observation_period_end = start_period + t
# Has an extra column (besides the id and the date)
# with a boolean for when it is a first transaction
repeated_and_first_transactions = _find_first_transactions( # type: ignore
transactions,
customer_id_col,
datetime_col,
datetime_format=datetime_format,
observation_period_end=observation_period_end,
time_unit=time_unit,
sort_transactions=sort_transactions,
)
    # split first and repeated transactions using the boolean "first" mask
first_trans_mask = repeated_and_first_transactions["first"]
repeated_transactions = repeated_and_first_transactions[~first_trans_mask]
first_transactions = repeated_and_first_transactions[first_trans_mask]
date_range = pandas.date_range(start_date, periods=t + 1, freq=time_unit)
date_periods = date_range.to_period(time_unit)
pred_cum_transactions = np.array([])
    # number of first transactions in each time period
first_trans_size = first_transactions.groupby(datetime_col).size()
# In the loop below, we calculate the expected number of purchases for customers
# who have made their first purchases on a date before the one being evaluated.
# Then we sum them to get the cumulative sum up to the specific period.
for i, period in enumerate(date_periods): # index of period and its date
if i % time_scaler == 0 and i > 0: # type: ignore
# Periods before the one being evaluated
times = np.array([d.n for d in period - first_trans_size.index])
times = times[times > 0].astype(float) / time_scaler
# create arbitrary dataframe from array of n time periods for predictions
pred_data = pandas.DataFrame(
{
"customer_id": times,
"t": times,
}
)
# Array of different expected number of purchases for different times
# TODO: This does not currently support a covariate model
expected_trans_array = model.expected_purchases_new_customer(
pred_data
).mean(dim=("chain", "draw"))
# Mask for the number of customers with 1st transactions up to the period
mask = first_trans_size.index < period
masked_first_trans = first_trans_size[mask].values # type: ignore
# ``expected_trans`` is an xarray with the cumulative sum of expected transactions
expected_trans = (expected_trans_array * masked_first_trans).sum()
pred_cum_transactions = np.append(
pred_cum_transactions, expected_trans.values
)
act_trans = repeated_transactions.groupby(datetime_col).size()
act_tracking_transactions = act_trans.reindex(date_periods, fill_value=0)
act_cum_transactions = []
for j in range(1, t // time_scaler + 1): # type: ignore
sum_trans = sum(act_tracking_transactions.iloc[: j * time_scaler]) # type: ignore
act_cum_transactions.append(sum_trans)
if set_index_date:
index = date_periods[time_scaler - 1 : -1 : time_scaler] # type: ignore
else:
index = range(0, t // time_scaler) # type: ignore
df_cum_transactions = pandas.DataFrame(
{"actual": act_cum_transactions, "predicted": pred_cum_transactions},
index=index,
)
return df_cum_transactions