Source code for pymc_marketing.clv.utils

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Utilities for the CLV module."""

import warnings
from datetime import date, datetime, timedelta

import narwhals as nw
import numpy as np
import pandas
import xarray
from narwhals.typing import IntoFrameT
from numpy import datetime64

__all__ = [
    "customer_lifetime_value",
    "rfm_segments",
    "rfm_summary",
    "rfm_train_test_split",
    "to_xarray",
]


def to_xarray(customer_id, *arrays, dim: str = "customer_id"):
    """Convert vector arrays to xarray with a common dim (default "customer_id")."""
    dims = (dim,)
    coords = {dim: np.asarray(customer_id)}

    res = tuple(
        xarray.DataArray(data=array, coords=coords, dims=dims) for array in arrays
    )

    return res[0] if len(arrays) == 1 else res
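
# Illustrative usage sketch (editor's addition, not part of the original module):
# `to_xarray` aligns one or more per-customer vectors on a shared "customer_id"
# coordinate. The helper name and the toy values below are made up for demonstration.
def _example_to_xarray():
    customer_ids = np.array([10, 11, 12])
    frequency = np.array([3, 0, 7])
    recency = np.array([28.0, 0.0, 90.0])

    # A single array returns a single DataArray ...
    freq_da = to_xarray(customer_ids, frequency)
    # ... while multiple arrays return a tuple of DataArrays sharing the same dim.
    freq_da, rec_da = to_xarray(customer_ids, frequency, recency)
    assert freq_da.dims == ("customer_id",) and rec_da.dims == ("customer_id",)
    return freq_da, rec_da
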
def customer_lifetime_value(
    transaction_model,
    data: pandas.DataFrame,
    future_t: int = 12,
    discount_rate: float = 0.00,
    time_unit: str = "D",
) -> xarray.DataArray:
    """
    Compute customer lifetime value.

    Compute the average lifetime value for a group of one or more customers
    and apply a discount rate for net present value estimations.
    Note `future_t` is measured in months regardless of `time_unit` specified.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L449

    Parameters
    ----------
    transaction_model : ~CLVModel
        Predictive model for future transactions.
        `BetaGeoModel` and `ParetoNBDModel` are currently supported.
    data : ~pandas.DataFrame
        DataFrame containing the following columns:

        * `customer_id`: Unique customer identifier
        * `frequency`: Number of repeat purchases observed for each customer
        * `recency`: Time between the first and the last purchase
        * `T`: Time between the first purchase and the end of the observation period
        * `future_spend`: Predicted monetary values for each customer
    future_t : int, optional
        The lifetime expected for the user in months. Default: 12
    discount_rate : float, optional
        The monthly adjusted discount rate. Default: 0.00
    time_unit : string, optional
        Unit of time of the purchase history. Defaults to "D" for daily.
        Other options are "W" (weekly), "M" (monthly), and "H" (hourly).
        Example: If your dataset contains information about weekly purchases, you should use "W".

    Returns
    -------
    xarray
        DataArray containing estimated customer lifetime values
    """
    if "future_spend" not in data.columns:
        raise ValueError("Required column future_spend missing")

    def _squeeze_dims(x: xarray.DataArray):
        """
        Squeeze dimensions for MAP-fitted model predictions.

        This utility is required for MAP-fitted model predictions to broadcast properly.

        Parameters
        ----------
        x : xarray.DataArray
            DataArray to squeeze dimensions for.

        Returns
        -------
        xarray.DataArray
            DataArray with squeezed dimensions.
        """
        dims_to_squeeze: tuple[str, ...] = ()
        if "chain" in x.dims and len(x.chain) == 1:
            dims_to_squeeze += ("chain",)
        if "draw" in x.dims and len(x.draw) == 1:
            dims_to_squeeze += ("draw",)
        x = x.squeeze(dims_to_squeeze)
        return x

    if discount_rate == 0.0:
        # no discount rate: just compute a single time step from 0 to `time`
        steps = np.arange(future_t, future_t + 1)
    else:
        steps = np.arange(1, future_t + 1)

    factor = {"W": 4.345, "M": 1.0, "D": 30, "H": 30 * 24}[time_unit]

    monetary_value = to_xarray(data["customer_id"], data["future_spend"])

    clv = xarray.DataArray(0.0)

    # TODO: Add an IF block to support ShiftedBetaGeoModelIndividual

    # initialize FOR loop with 0 purchases at future_t = 0
    prev_expected_purchases = 0

    for i in steps * factor:
        # since the prediction of number of transactions is cumulative,
        # we have to subtract off the previous periods
        new_expected_purchases = _squeeze_dims(
            transaction_model.expected_purchases(
                data=data,
                future_t=i,
            )
        )
        expected_transactions = new_expected_purchases - prev_expected_purchases
        prev_expected_purchases = new_expected_purchases

        # sum up the CLV estimates of all the periods and apply discounted cash flow
        clv = clv + (monetary_value * expected_transactions) / (1 + discount_rate) ** (
            i / factor
        )

    # Add squeezed chain/draw dims
    if "draw" not in clv.dims:
        clv = clv.expand_dims({"draw": 1})
    if "chain" not in clv.dims:
        clv = clv.expand_dims({"chain": 1})

    return clv.transpose("chain", "draw", "customer_id")
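
# Illustrative usage sketch (editor's addition, not part of the original module):
# how `customer_lifetime_value` is typically called with a fitted transaction model.
# The helper name, the toy RFM frame, and the constant `future_spend` are assumptions
# for demonstration; in practice `future_spend` would usually come from a spend model
# such as Gamma-Gamma rather than a constant.
def _example_customer_lifetime_value():
    from pymc_marketing.clv import BetaGeoModel

    rfm = pandas.DataFrame(
        {
            "customer_id": [0, 1, 2, 3],
            "frequency": [6, 0, 2, 1],
            "recency": [142.0, 0.0, 9.0, 19.0],
            "T": [298.0, 224.0, 292.0, 147.0],
        }
    )
    model = BetaGeoModel(data=rfm)
    model.fit()  # sampling is slow for a toy example; shown only for completeness

    clv_data = rfm.assign(future_spend=25.0)  # stand-in for predicted spend
    clv = customer_lifetime_value(
        transaction_model=model,
        data=clv_data,
        future_t=12,  # months
        discount_rate=0.01,  # monthly discount rate
        time_unit="D",
    )
    return clv.mean(("chain", "draw"))  # posterior-mean CLV per customer
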
def _find_first_transactions_alternative(
    transactions: IntoFrameT,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
) -> IntoFrameT:
    transactions = nw.from_native(transactions)

    first_date = transactions.group_by(customer_id_col).agg(
        first_date=nw.col(datetime_col).min()
    )

    agg_cols = [] if monetary_value_col is None else [nw.col(monetary_value_col).sum()]
    agg = transactions.group_by([customer_id_col, datetime_col]).agg(*agg_cols)

    return (
        agg.join(first_date, on=customer_id_col)
        .with_columns(first=nw.col(datetime_col) == nw.col("first_date"))
        .drop("first_date")
        .to_native()
    )


def _find_first_transactions(
    transactions: pandas.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
    observation_period_end: str | pandas.Period | datetime | None = None,
    time_unit: str = "D",
    sort_transactions: bool | None = True,
) -> pandas.DataFrame:
    """Return dataframe with first transactions.

    This takes a DataFrame of transaction data of the form:
        *customer_id, datetime [, monetary_value]*
    and appends a boolean column named *first* to the transaction log to indicate which
    rows are the first transaction for each *customer_id*.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L148

    Parameters
    ----------
    transactions : ~pandas.DataFrame
        A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
    customer_id_col : string
        Column in the *transactions* DataFrame denoting the *customer_id*.
    datetime_col : string
        Column in the *transactions* DataFrame denoting datetimes purchases were made.
    monetary_value_col : string, optional
        Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
        Optional; only needed for spend estimation models like the Gamma-Gamma model.
    datetime_format : string, optional
        A string that represents the timestamp format. Useful if Pandas can't understand
        the provided format.
    observation_period_end : Union[str, pandas.Period, datetime], optional
        A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max 'datetime_col'.
    time_unit : string, optional
        Time granularity for study.
        Default: 'D' for days. Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    sort_transactions : bool, optional
        Default: True
        If raw data is already sorted in chronological order, set to `False` to improve computational efficiency.
""" select_columns = [customer_id_col, datetime_col] if observation_period_end is None: observation_period_end = transactions[datetime_col].max() if isinstance(observation_period_end, pandas.Period): observation_period_end = observation_period_end.to_timestamp() if isinstance(observation_period_end, str): observation_period_end = pandas.to_datetime(observation_period_end) if monetary_value_col: select_columns.append(monetary_value_col) if sort_transactions: transactions = transactions[select_columns].sort_values(select_columns).copy() # convert date column into a DateTimeIndex for time-wise grouping and truncating transactions[datetime_col] = pandas.to_datetime( transactions[datetime_col], format=datetime_format ) transactions = ( transactions.set_index(datetime_col).to_period(time_unit).to_timestamp() ) mask = pandas.to_datetime(transactions.index) <= pandas.to_datetime( observation_period_end ) transactions = transactions.loc[mask].reset_index() period_groupby = transactions.groupby( [datetime_col, customer_id_col], sort=False, as_index=False ) if monetary_value_col: # when processing a monetary column, make sure to sum together transactions made in the same period period_transactions = period_groupby.sum() else: # by calling head() on the groupby object, the datetime and customer_id columns # will be reduced to the first transaction of that time period period_transactions = period_groupby.head(1) # create a new column for flagging first transactions period_transactions = period_transactions.copy() period_transactions.loc[:, "first"] = False # find all first transactions and store as an index first_transactions = ( period_transactions.groupby(customer_id_col, sort=True, as_index=False) .head(1) .index ) # flag first transactions as True period_transactions.loc[first_transactions, "first"] = True select_columns.append("first") # reset datetime_col to period period_transactions[datetime_col] = period_transactions[datetime_col].dt.to_period( time_unit ) return period_transactions[select_columns]
def rfm_summary_alternative(
    transactions: IntoFrameT,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
    observation_period_end: str | pandas.Period | datetime | None = None,
    time_scaler: float = 1.0,
) -> IntoFrameT:
    transactions = nw.from_native(transactions)
    date = nw.col(datetime_col).cast(nw.Datetime)

    if observation_period_end is None:
        observation_period_end = transactions[datetime_col].cast(nw.Datetime).max()

    repeated_transactions = _find_first_transactions_alternative(
        transactions,
        customer_id_col=customer_id_col,
        datetime_col=datetime_col,
        monetary_value_col=monetary_value_col,
        datetime_format=datetime_format,
    )

    # TODO: Support the various units
    divisor = timedelta(days=1) * time_scaler

    additional_cols = (
        [] if monetary_value_col is None else [nw.col(monetary_value_col).mean()]
    )
    customers = (
        nw.from_native(repeated_transactions)
        .group_by(customer_id_col)
        .agg(
            *additional_cols,
            min=date.min(),
            max=date.max(),
            count=date.len(),
        )
        .with_columns(
            frequency=nw.col("count") - 1,
            recency=(nw.col("max") - nw.col("min")) / divisor,
            T=(observation_period_end - nw.col("min")) / divisor,
        )
        .rename({customer_id_col: "customer_id"})
        # .select(["customer_id", "frequency", "recency"])
    )

    return customers.to_native()
def rfm_summary(
    transactions: pandas.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
    observation_period_end: str | pandas.Period | datetime | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    include_first_transaction: bool | None = False,
    sort_transactions: bool | None = True,
) -> pandas.DataFrame:
    """Summarize transaction data for use in CLV modeling or RFM segmentation.

    This transforms a DataFrame of transaction data of the form:
        *customer_id, datetime [, monetary_value]*
    to a DataFrame for CLV modeling:
        *customer_id, frequency, recency, T [, monetary_value]*

    If the `include_first_transaction = True` argument is specified,
    a DataFrame for RFM segmentation is returned:
        *customer_id, frequency, recency, monetary_value*

    This function is not required if using the `clv.rfm_segments` utility.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L230

    Parameters
    ----------
    transactions : ~pandas.DataFrame
        A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
    customer_id_col : string
        Column in the *transactions* DataFrame denoting the *customer_id*.
    datetime_col : string
        Column in the *transactions* DataFrame denoting datetimes purchases were made.
    monetary_value_col : string, optional
        Column in the *transactions* DataFrame denoting the monetary value of the transaction.
        Optional; only needed for RFM segmentation and spend estimation models like the Gamma-Gamma model.
    observation_period_end : Union[str, pandas.Period, datetime], optional
        A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max 'datetime_col'.
    datetime_format : string, optional
        A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
    time_unit : string, optional
        Time granularity for study.
        Default: 'D' for days. Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler : int, optional
        Default: 1. Scales *recency* & *T* to a different time granularity.
        This is useful for datasets spanning many years, and running predictions in different time scales.
    include_first_transaction : bool, optional
        Default: *False*
        For predictive CLV modeling, this should be *False*.
        Set to *True* if performing RFM segmentation.
    sort_transactions : bool, optional
        Default: *True*
        If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
    Returns
    -------
    DataFrame
        Dataframe containing summarized RFM data with columns for *frequency*, *recency*,
        *T*, and *monetary_value* if specified
    """
    if observation_period_end is None:
        observation_period_end_ts = (
            pandas.to_datetime(transactions[datetime_col].max(), format=datetime_format)
            .to_period(time_unit)
            .to_timestamp()
        )
    elif isinstance(observation_period_end, pandas.Period):
        observation_period_end_ts = observation_period_end.to_timestamp()
    else:
        observation_period_end_ts = (
            pandas.to_datetime(observation_period_end, format=datetime_format)
            .to_period(time_unit)
            .to_timestamp()
        )

    # label repeated transactions
    repeated_transactions = _find_first_transactions(  # type: ignore
        transactions,
        customer_id_col,
        datetime_col,
        monetary_value_col,
        datetime_format,
        observation_period_end_ts,
        time_unit,
        sort_transactions,
    )

    # reset datetime_col to timestamp
    repeated_transactions[datetime_col] = repeated_transactions[
        datetime_col
    ].dt.to_timestamp()

    # count all orders by customer
    customers = repeated_transactions.groupby(customer_id_col, sort=False)[
        datetime_col
    ].agg(["min", "max", "count"])

    # subtract 1 from count, as we ignore the first order.
    customers["frequency"] = customers["count"] - 1

    customers["recency"] = (
        (pandas.to_datetime(customers["max"]) - pandas.to_datetime(customers["min"]))
        / np.timedelta64(1, time_unit)  # type: ignore[call-overload]
        / time_scaler
    )

    customers["T"] = (
        (observation_period_end_ts - customers["min"])
        / np.timedelta64(1, time_unit)  # type: ignore[call-overload]
        / time_scaler
    )

    summary_columns = ["frequency", "recency", "T"]

    if include_first_transaction:
        # add the first order back to the frequency count
        customers["frequency"] = customers["frequency"] + 1
        # change recency to segmentation definition
        customers["recency"] = customers["T"] - customers["recency"]
        # T column is not used for segmentation
        summary_columns = ["frequency", "recency"]

    if monetary_value_col:
        if not include_first_transaction:
            # create an index of all the first purchases
            first_purchases = repeated_transactions[
                repeated_transactions["first"]
            ].index

            # by setting the monetary_value cells of all the first purchases to NaN,
            # those values will be excluded from the mean value calculation
            repeated_transactions.loc[first_purchases, monetary_value_col] = np.nan

        customers["monetary_value"] = (
            repeated_transactions.groupby(customer_id_col)[monetary_value_col]
            .mean()
            .fillna(0)
        )

        summary_columns.append("monetary_value")

    summary_df = customers[summary_columns].astype(float)
    summary_df = summary_df.reset_index().rename(
        columns={customer_id_col: "customer_id"}
    )

    return summary_df
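
# Illustrative usage sketch (editor's addition, not part of the original module):
# building an RFM summary from a small made-up transaction log. The column names
# "id", "date", and "spend" are arbitrary; any names can be passed via the *_col
# arguments.
def _example_rfm_summary():
    transactions = pandas.DataFrame(
        {
            "id": [1, 1, 1, 2, 2, 3],
            "date": [
                "2024-01-02",
                "2024-02-10",
                "2024-03-15",
                "2024-01-05",
                "2024-03-01",
                "2024-02-20",
            ],
            "spend": [10.0, 20.0, 15.0, 8.0, 12.0, 30.0],
        }
    )
    # For CLV modeling (first transaction excluded from frequency/monetary value):
    clv_input = rfm_summary(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        monetary_value_col="spend",
        observation_period_end="2024-03-31",
    )
    # For RFM segmentation, include the first transaction instead:
    segmentation_input = rfm_summary(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        monetary_value_col="spend",
        observation_period_end="2024-03-31",
        include_first_transaction=True,
    )
    return clv_input, segmentation_input
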
def rfm_train_test_split(
    transactions: pandas.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    train_period_end: float | str | datetime | datetime64 | date,
    test_period_end: float | str | datetime | datetime64 | date | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    datetime_format: str | None = None,
    monetary_value_col: str | None = None,
    include_first_transaction: bool | None = False,
    sort_transactions: bool | None = True,
) -> pandas.DataFrame:
    """Summarize transaction data and split into training and test datasets for CLV modeling.

    This can also be used to evaluate the impact of a time-based intervention like a marketing campaign.

    This transforms a DataFrame of transaction data of the form:
        *customer_id, datetime [, monetary_value]*
    to a DataFrame of the form:
        *customer_id, frequency, recency, T [, monetary_value], test_frequency [, test_monetary_value], test_T*

    Note this function will exclude new customers whose first transactions occurred during the test period.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L27

    Parameters
    ----------
    transactions : ~pandas.DataFrame
        A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
    customer_id_col : string
        Column in the *transactions* DataFrame denoting the customer_id.
    datetime_col : string
        Column in the *transactions* DataFrame denoting datetimes purchases were made.
    train_period_end : Union[str, pandas.Period, datetime]
        A string or datetime to denote the final time period for the training data.
        Events after this time period are used for the test data.
    test_period_end : Union[str, pandas.Period, datetime], optional
        A string or datetime to denote the final time period of the study.
        Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
    time_unit : string, optional
        Time granularity for study.
        Default: 'D' for days. Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler : int, optional
        Default: 1. Scales *recency* & *T* to a different time granularity.
        This is useful for datasets spanning many years, and running predictions in different time scales.
    datetime_format : string, optional
        A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
    monetary_value_col : string, optional
        Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
        Optional; only needed for spend estimation models like the Gamma-Gamma model.
    include_first_transaction : bool, optional
        Default: *False*
        For predictive CLV modeling, this should be *False*.
        Set to *True* if performing RFM segmentation.
    sort_transactions : bool, optional
        Default: *True*
        If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
    Returns
    -------
    DataFrame
        Dataframe containing summarized RFM data, and test columns for
        *frequency*, *T*, and *monetary_value* if specified
    """
    if test_period_end is None:
        test_period_end = transactions[datetime_col].max()

    transaction_cols = [customer_id_col, datetime_col]
    if monetary_value_col:
        transaction_cols.append(monetary_value_col)
    transactions = transactions[transaction_cols].copy()

    transactions[datetime_col] = pandas.to_datetime(
        transactions[datetime_col], format=datetime_format
    )
    test_period_end = pandas.to_datetime(test_period_end, format=datetime_format)
    train_period_end = pandas.to_datetime(train_period_end, format=datetime_format)

    # create training dataset
    training_transactions = transactions.loc[
        transactions[datetime_col] <= train_period_end
    ]

    if training_transactions.empty:
        error_msg = """No data available. Check `train_period_end` and
        confirm values in `transactions` occur prior to that time period."""
        raise ValueError(error_msg)

    training_rfm_data = rfm_summary(
        training_transactions,
        customer_id_col,
        datetime_col,
        monetary_value_col=monetary_value_col,
        datetime_format=datetime_format,
        observation_period_end=train_period_end,
        time_unit=time_unit,
        time_scaler=time_scaler,
        include_first_transaction=include_first_transaction,
        sort_transactions=sort_transactions,
    )

    # create test dataset
    test_transactions = transactions.loc[
        (test_period_end >= transactions[datetime_col])
        & (transactions[datetime_col] > train_period_end)
    ].copy()

    if test_transactions.empty:
        error_msg = """No data available. Check `train_period_end` and `test_period_end` and
        confirm values in `transactions` occur between those time periods."""
        raise ValueError(error_msg)

    test_transactions[datetime_col] = test_transactions[datetime_col].dt.to_period(
        time_unit
    )

    # create dataframe with customer_id and test_frequency columns
    test_rfm_data = (
        test_transactions.groupby([customer_id_col, datetime_col], sort=False)[
            datetime_col
        ]
        .agg(lambda r: 1)
        .groupby(level=customer_id_col)
        .count()
    ).reset_index()

    test_rfm_data = test_rfm_data.rename(
        columns={customer_id_col: "customer_id", datetime_col: "test_frequency"}
    )

    if monetary_value_col:
        test_monetary_value = (
            test_transactions.groupby([customer_id_col, datetime_col])[
                monetary_value_col
            ]
            .sum()
            .groupby(customer_id_col)
            .mean()
        )

        test_rfm_data = test_rfm_data.merge(
            test_monetary_value,
            left_on="customer_id",
            right_on=customer_id_col,
            how="inner",
        )
        test_rfm_data = test_rfm_data.rename(
            columns={monetary_value_col: "test_monetary_value"}
        )

    train_test_rfm_data = training_rfm_data.merge(
        test_rfm_data, on="customer_id", how="left"
    )
    train_test_rfm_data.fillna(0, inplace=True)

    time_delta = (
        test_period_end.to_period(time_unit) - train_period_end.to_period(time_unit)
    ).n
    train_test_rfm_data["test_T"] = time_delta / time_scaler  # type: ignore

    return train_test_rfm_data
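
# Illustrative usage sketch (editor's addition, not part of the original module):
# splitting a small made-up transaction log into train/test RFM frames around an
# assumed cutoff date. The helper name and all dates/values are arbitrary.
def _example_rfm_train_test_split():
    transactions = pandas.DataFrame(
        {
            "id": [1, 1, 1, 2, 2, 3],
            "date": [
                "2024-01-02",
                "2024-02-10",
                "2024-03-15",
                "2024-01-05",
                "2024-03-01",
                "2024-02-20",
            ],
            "spend": [10.0, 20.0, 15.0, 8.0, 12.0, 30.0],
        }
    )
    # Transactions on or before train_period_end form the training RFM data;
    # later transactions populate the test_* columns.
    return rfm_train_test_split(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        train_period_end="2024-02-28",
        test_period_end="2024-03-31",
        monetary_value_col="spend",
    )
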
def rfm_segments(
    transactions: pandas.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str,
    segment_config: dict | None = None,
    observation_period_end: str | pandas.Period | datetime | None = None,
    datetime_format: str | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    sort_transactions: bool | None = True,
) -> pandas.DataFrame:
    """Assign customers to segments based on spending behavior derived from RFM scores.

    This transforms a DataFrame of transaction data of the form:
        *customer_id, datetime, monetary_value*
    to a DataFrame of the form:
        *customer_id, frequency, recency, monetary_value, rfm_score, segment*

    Customer purchasing data is aggregated into three variables: `recency`, `frequency`, and `monetary_value`.
    Quartiles are estimated for each variable, and a three-digit RFM score is then assigned to each customer.
    For example, a customer with a score of '234' is in the second quartile for `recency`,
    third quartile for `frequency`, and fourth quartile for `monetary_value`.

    RFM scores corresponding to segments such as "Top Spender", "Frequent Buyer", or "At-Risk"
    are determined, and customers are then segmented based on their RFM score.

    By default, the following segments are created:

    - "Premium Customer": Customers in top 2 quartiles for all variables.
    - "Repeat Customer": Customers in top 2 quartiles for frequency, and either recency or monetary value.
    - "Top Spender": Customers in top 2 quartiles for monetary value, and either frequency or recency.
    - "At-Risk Customer": Customers in bottom 2 quartiles for two or more variables.
    - "Inactive Customer": Customers in bottom quartile for two or more variables.
    - Customers with unspecified RFM scores will be assigned to a segment named "Other".

    If an alternative segmentation approach is desired, use
    `rfm_summary(include_first_transaction=True, *args, **kwargs)` instead
    to preprocess data for segmentation.

    In either case, the returned DataFrame cannot be used for modeling. If assigning model predictions
    to RFM segments, create a separate DataFrame for modeling and join by Customer ID.

    Parameters
    ----------
    transactions : ~pandas.DataFrame
        A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
    customer_id_col : string
        Column in the *transactions* DataFrame denoting the *customer_id*.
    datetime_col : string
        Column in the *transactions* DataFrame denoting datetimes purchases were made.
    monetary_value_col : string
        Column in the *transactions* DataFrame that denotes the monetary value of the transaction.
    segment_config : dict, optional
        Dictionary containing segment names and list of RFM score assignments;
        key/value pairs should be formatted as `{"segment": ['111', '123', '321'], ...}`.
        If not provided, default segment names and definitions are applied.
    observation_period_end : Union[str, pandas.Period, datetime, None], optional
        A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max of *datetime_col*.
    datetime_format : string, optional
        A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
    time_unit : string, optional
        Time granularity for study.
        Default: 'D' for days. Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler : int, optional
        Default: 1. Scales *recency* & *T* to a different time granularity.
        This is useful for datasets spanning many years, and running predictions in different time scales.
    sort_transactions : bool, optional
        Default: *True*
        If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.

    Returns
    -------
    DataFrame
        Dataframe containing summarized RFM data, RFM scores, and segment assignments
    """
    rfm_data = rfm_summary(
        transactions,
        customer_id_col=customer_id_col,
        datetime_col=datetime_col,
        monetary_value_col=monetary_value_col,
        observation_period_end=observation_period_end,
        datetime_format=datetime_format,
        time_unit=time_unit,
        time_scaler=time_scaler,
        include_first_transaction=True,
        sort_transactions=sort_transactions,
    )

    # iteratively assign quartile labels for each row/variable
    for column_name in zip(
        ["r_quartile", "f_quartile", "m_quartile"],
        ["recency", "frequency", "monetary_value"],
        strict=False,
    ):
        # If data has many repeat values, fewer than 4 bins will be returned.
        # These try blocks will modify labelling for fewer bins.
        try:
            labels = _rfm_quartile_labels(column_name[0], 5)
            rfm_data[column_name[0]] = pandas.qcut(
                rfm_data[column_name[1]], q=4, labels=labels, duplicates="drop"
            ).astype(str)
        except ValueError:
            try:
                labels = _rfm_quartile_labels(column_name[0], 4)
                rfm_data[column_name[0]] = pandas.qcut(
                    rfm_data[column_name[1]], q=4, labels=labels, duplicates="drop"
                ).astype(str)
            except ValueError:
                labels = _rfm_quartile_labels(column_name[0], 3)
                rfm_data[column_name[0]] = pandas.qcut(
                    rfm_data[column_name[1]], q=4, labels=labels, duplicates="drop"
                ).astype(str)
                warnings.warn(
                    f"RFM score will not exceed 2 for {column_name[0]}. Specify a custom segment_config",
                    UserWarning,
                    stacklevel=1,
                )

    rfm_data = pandas.eval(  # type: ignore
        "rfm_score = rfm_data.r_quartile + rfm_data.f_quartile + rfm_data.m_quartile",
        target=rfm_data,
    )

    if segment_config is None:
        segment_config = _default_rfm_segment_config

    segment_names = list(segment_config.keys())

    # create catch-all "Other" segment and assign defined segments from config
    rfm_data["segment"] = "Other"
    for key in segment_names:
        rfm_data.loc[rfm_data["rfm_score"].isin(segment_config[key]), "segment"] = key

    # drop unnecessary columns
    rfm_data = rfm_data.drop(columns=["r_quartile", "f_quartile", "m_quartile"])

    return rfm_data
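
# Illustrative usage sketch (editor's addition, not part of the original module):
# assigning default RFM segments, then overriding them with a custom segment_config.
# The helper name, segment names, and the three-digit RFM scores in the custom config
# are arbitrary examples; the transaction frame is assumed to use "id"/"date"/"spend"
# column names.
def _example_rfm_segments(transactions: pandas.DataFrame):
    # Default segmentation ("Premium Customer", "Repeat Customer", ...):
    segments = rfm_segments(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        monetary_value_col="spend",
    )

    # Custom segmentation: any RFM score not listed falls into "Other".
    custom_config = {
        "Whales": ["444", "434", "344"],
        "Lapsed": ["111", "112", "121"],
    }
    custom_segments = rfm_segments(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        monetary_value_col="spend",
        segment_config=custom_config,
    )
    return segments, custom_segments
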
def _rfm_quartile_labels(column_name, max_label_range):
    """
    Label quartiles for each variable.

    Called internally by rfm_segments to label quartiles for each variable.

    Parameters
    ----------
    column_name : str
        The name of the column to label.
    max_label_range : int
        The maximum range of labels to create.

    Returns
    -------
    list[int]
        A list of labels for the column.
    """
    # recency labels must be reversed because lower values are more desirable
    if column_name == "r_quartile":
        return list(range(max_label_range - 1, 0, -1))
    else:
        return range(1, max_label_range)


_default_rfm_segment_config = {
    "Premium Customer": [
        "334",
        "443",
        "444",
        "344",
        "434",
        "433",
        "343",
        "333",
    ],
    "Repeat Customer": ["244", "234", "232", "332", "143", "233", "243"],
    "Top Spender": [
        "424",
        "414",
        "144",
        "314",
        "324",
        "124",
        "224",
        "423",
        "413",
        "133",
        "323",
        "313",
        "134",
    ],
    "At Risk Customer": [
        "422",
        "223",
        "212",
        "122",
        "222",
        "132",
        "322",
        "312",
        "412",
        "123",
        "214",
    ],
    "Inactive Customer": ["411", "111", "113", "114", "112", "211", "311"],
}


def _expected_cumulative_transactions(
    model,
    transactions: pandas.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    t: int,
    datetime_format: str | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    sort_transactions: bool | None = True,
    set_index_date: bool | None = False,
):
    """
    Aggregate actual and expected cumulative transactions over time for a fitted
    ``BetaGeoModel`` or ``ParetoNBDModel``.

    This function follows the formulation on page 8 of [1]_.

    Specifically, we take only customers who have made their first transaction before the specified number
    of ``t`` time periods, run ``expected_purchases_new_customer()`` for all remaining time periods, then
    sum across the customer population.

    Adapted from legacy ``lifetimes`` library:
    https://github.com/CamDavidsonPilon/lifetimes/blob/master/lifetimes/utils.py#L506

    Parameters
    ----------
    model :
        A fitted ``BetaGeoModel`` or ``ParetoNBDModel``.
    transactions : ~pandas.DataFrame
        A Pandas DataFrame containing *customer_id_col* and *datetime_col*.
    customer_id_col : string
        Column in the *transactions* DataFrame denoting the *customer_id*.
    datetime_col : string
        Column in the *transactions* DataFrame denoting datetimes purchases were made.
    t : int
        Number of time units since earliest transaction for which we want to aggregate cumulative transactions.
    datetime_format : string, optional
        A string that represents the timestamp format. Useful if Pandas doesn't recognize the provided format.
    time_unit : string, optional
        Time granularity for study.
        Default: 'D' for days. Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler : int, optional
        Default: 1. Scales *recency* & *T* to a different time granularity.
        This is useful for datasets spanning many years, and running predictions in different time scales.
    sort_transactions : bool, optional
        Default: *True*
        If raw data is already sorted in chronological order, set to *False* to improve computational efficiency.
    set_index_date : bool, optional
        Set to True to return a dataframe with a datetime index.

    Returns
    -------
    DataFrame
        Dataframe containing columns for actual and predicted values

    References
    ----------
    .. [1] Fader, Peter S., Bruce G.S. Hardie, and Ka Lok Lee (2005),
        A Note on Implementing the Pareto/NBD Model in MATLAB.
        http://brucehardie.com/notes/008/
    """
    start_date = pandas.to_datetime(
        transactions[datetime_col], format=datetime_format
    ).min()
    start_period = start_date.to_period(time_unit)
    observation_period_end = start_period + t

    # Has an extra column (besides the id and the date)
    # with a boolean for when it is a first transaction
    repeated_and_first_transactions = _find_first_transactions(  # type: ignore
        transactions,
        customer_id_col,
        datetime_col,
        datetime_format=datetime_format,
        observation_period_end=observation_period_end,
        time_unit=time_unit,
        sort_transactions=sort_transactions,
    )

    # Mask, first transactions and repeated transactions
    first_trans_mask = repeated_and_first_transactions["first"]
    repeated_transactions = repeated_and_first_transactions[~first_trans_mask]
    first_transactions = repeated_and_first_transactions[first_trans_mask]

    date_range = pandas.date_range(start_date, periods=t + 1, freq=time_unit)
    date_periods = date_range.to_period(time_unit)

    pred_cum_transactions = np.array([])

    # First Transactions on Each Day/Freq
    first_trans_size = first_transactions.groupby(datetime_col).size()

    # In the loop below, we calculate the expected number of purchases for customers
    # who have made their first purchases on a date before the one being evaluated.
    # Then we sum them to get the cumulative sum up to the specific period.
    for i, period in enumerate(date_periods):  # index of period and its date
        if i % time_scaler == 0 and i > 0:  # type: ignore
            # Periods before the one being evaluated
            times = np.array([d.n for d in period - first_trans_size.index])
            times = times[times > 0].astype(float) / time_scaler

            # create arbitrary dataframe from array of n time periods for predictions
            pred_data = pandas.DataFrame(
                {
                    "customer_id": times,
                    "t": times,
                }
            )
            # Array of different expected number of purchases for different times
            # TODO: This does not currently support a covariate model
            expected_trans_array = model.expected_purchases_new_customer(
                pred_data
            ).mean(dim=("chain", "draw"))

            # Mask for the number of customers with 1st transactions up to the period
            mask = first_trans_size.index < period
            masked_first_trans = first_trans_size[mask].values  # type: ignore

            # ``expected_trans`` is an xarray with the cumulative sum of expected transactions
            expected_trans = (expected_trans_array * masked_first_trans).sum()

            pred_cum_transactions = np.append(
                pred_cum_transactions, expected_trans.values
            )

    act_trans = repeated_transactions.groupby(datetime_col).size()
    act_tracking_transactions = act_trans.reindex(date_periods, fill_value=0)

    act_cum_transactions = []
    for j in range(1, t // time_scaler + 1):  # type: ignore
        sum_trans = sum(act_tracking_transactions.iloc[: j * time_scaler])  # type: ignore
        act_cum_transactions.append(sum_trans)

    if set_index_date:
        index = date_periods[time_scaler - 1 : -1 : time_scaler]  # type: ignore
    else:
        index = range(0, t // time_scaler)  # type: ignore

    df_cum_transactions = pandas.DataFrame(
        {"actual": act_cum_transactions, "predicted": pred_cum_transactions},
        index=index,
    )

    return df_cum_transactions
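
# Illustrative sketch (editor's addition, not part of the original module): how
# `_expected_cumulative_transactions` might be called to compare actual vs. predicted
# cumulative purchases. The helper name is hypothetical, the transaction frame is
# assumed to use "id"/"date" column names, and fitting a model inline is shown only
# to make the call signature concrete; real diagnostics need a realistically sized dataset.
def _example_expected_cumulative_transactions(transactions: pandas.DataFrame):
    from pymc_marketing.clv import BetaGeoModel

    rfm = rfm_summary(
        transactions,
        customer_id_col="id",
        datetime_col="date",
    )
    model = BetaGeoModel(data=rfm)
    model.fit()

    return _expected_cumulative_transactions(
        model=model,
        transactions=transactions,
        customer_id_col="id",
        datetime_col="date",
        t=60,  # aggregate over the first 60 days
        time_unit="D",
        set_index_date=True,
    )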