Skip to content

neighbors.models.NNMF_sgd

The non-negative matrix factorization algorithm tries to decompose a users x items matrix into two additional matrices: users x factors and factors x items.

Training is performed via stochastic-gradient-descent and continues until convergence or the maximum number of iterations has been reached. Unlike NNMF_mult errors during training are used to update latent factors separately for each user/item combination. Additionally this implementation is more flexible as it supports hyperparameters for various kinds of regularization at the cost of increased computation time.

The number of factors, convergence, and maximum iterations can be controlled with the n_factors, tol, and max_iterations arguments to the .fit method. By default the number of factors = the number items.

random_state does not control the sgd fit, only the initialization of the factor matrices

Important Note: model fitting can be highly sensitive to the regularization hyper-parameters passed to .fit. These hyper-parameters control the amount of regularization used when learning user and item factors and biases. By default no regularization is performed. For some combinations of hyper-parameters (e.g. large user_fact_reg and small item_fact_reg) latent vectors can blow up to infinity producing NaNs in model estimates. Model fitting will not fail in these cases so caution should be taken when making use of hyper-parameters.

Source code in neighbors/models.py
class NNMF_sgd(BaseNMF):
    """
    The non-negative matrix factorization algorithm tries to decompose a users x items matrix into two additional matrices: users x factors and factors x items.

    Training is performed via stochastic-gradient-descent and continues until convergence or the maximum number of iterations has been reached. Unlike `NNMF_mult` errors during training are used to update latent factors *separately* for each user/item combination. Additionally this implementation is more flexible as it supports hyperparameters for various kinds of regularization at the cost of increased computation time.

    The number of factors, convergence, and maximum iterations can be controlled with the `n_factors`, `tol`, and `max_iterations` arguments to the `.fit` method. By default the number of factors = the number items.

    `random_state` does not control the sgd fit, only the initialization of the factor matrices

    **Important Note**: model fitting can be highly sensitive to the regularization hyper-parameters passed to `.fit`. These hyper-parameters control the amount of regularization used when learning user and item factors and biases. By default *no regularization* is performed. For some combinations of hyper-parameters (e.g. large `user_fact_reg` and small `item_fact_reg`) latent vectors can blow up to infinity producing `NaNs` in model estimates. Model fitting will not fail in these cases so **caution** should be taken when making use of hyper-parameters.

    """

    def __init__(
        self, data, mask=None, n_mask_items=None, verbose=True, random_state=None
    ):
        """
        Args:
            data (pd.DataFrame): users x items dataframe
            mask (pd.DataFrame, optional): A boolean dataframe used to split the data into 'observed' and 'missing' datasets. Defaults to None.
            n_mask_items (int/float, optional): number of items to mask out, while the rest are treated as observed; Defaults to None.
            data_range (int/float, optional): max - min of the data; Default computed from the input data. This is useful to set manually in case the input data do not span the full range of possible values
            random_state (None, int, RandomState): a seed or random state used for all internal random operations (e.g. randomly mask half the data given n_mask_item = .05). Passing None will generate a new random seed. Default None.
            verbose (bool; optional): print any initialization warnings; Default True

        """
        super().__init__(
            data, mask, n_mask_items, random_state=random_state, verbose=verbose
        )
        self.n_factors = None

    def __repr__(self):
        return f"{super().__repr__()[:-1]}, n_factors={self.n_factors})"

    def fit(
        self,
        n_factors=None,
        item_fact_reg=0.0,
        user_fact_reg=0.0,
        item_bias_reg=0.0,
        user_bias_reg=0.0,
        learning_rate=0.001,
        n_iterations=1000,
        tol=1e-6,
        verbose=False,
        dilate_by_nsamples=None,
        **kwargs,
    ):
        """
        Fit NNMF collaborative filtering model using stochastic-gradient-descent. **Note:** Some combinations of fit parameters may lead to degenerate fits due to use and item vectors converging to infinity. Because no constraints are imposed on the values these parameters can take, please adjust them with caution. If you encounter NaNs in your predictions it's likely because of the specific combination of parameters you chose and you can try refitting with the default settings (i.e. no regularization and learning rate = 0.001). Use `verbose=True` to help determine at what iteration these degenerate fits occur.

        Args:
            n_factors (int, optional): number of factors to learn. Defaults to None which includes all factors.
            item_fact_reg (float, optional): item factor regularization to apply. Defaults to 0.0.
            user_fact_reg (float, optional): user factor regularization to apply. Defaults to 0.0.
            item_bias_reg (float, optional): item factor bias term to apply. Defaults to 0.0.
            user_bias_reg (float, optional): user factor bias term to apply. Defaults to 0.0.
            learning_rate (float, optional): how quickly to integrate errors during training. Defaults to 0.001.
            n_iterations (int, optional): total number of training iterations if convergence is not achieved. Defaults to 5000.
            tol (float, optional): Convergence criteria. Model is considered converged if the change in error during training < tol. Defaults to 0.001.
            verbose (bool, optional): print information about training. Defaults to False.
            dilate_by_nsamples (int, optional): How many items to dilate by prior to training. Defaults to None.
        """

        # Call parent fit which acts as a guard for non-masked data
        super().fit()

        # initialize variables
        n_users, n_items = self.data.shape

        if (
            isinstance(n_factors, int) and (n_factors > n_items and n_factors > n_users)
        ) or isinstance(n_factors, np.floating):
            raise TypeError("n_factors must be an integer < number of items and users")

        if n_factors is None:
            n_factors = min([n_users, n_items])

        self.n_factors = n_factors
        self.item_fact_reg = item_fact_reg
        self.user_fact_reg = user_fact_reg
        self.item_bias_reg = item_bias_reg
        self.user_bias_reg = user_bias_reg
        self.error_history = []

        # Perform dilation if requested
        self.dilate_mask(n_samples=dilate_by_nsamples)

        # Get indices of training data to compute; np.nonzero returns a tuple of row and column indices that when iterated over simultaneosly yield the [row_index, col_index] of each training observation
        if self.is_mask_dilated:
            row_indices, col_indices = self.dilated_mask.values.nonzero()
        else:
            row_indices, col_indices = self.mask.values.nonzero()

        # Convert tuples cause numba complains
        row_indices, col_indices = np.array(row_indices), np.array(col_indices)

        # Initialize global, user, and item biases and latent vectors
        self.global_bias = self.masked_data.mean().mean()
        self.user_bias = np.zeros(n_users)
        self.item_bias = np.zeros(n_items)

        # Initialize random values oriented these as user x factor, factor x item
        self.user_vecs = np.abs(
            self.random_state.normal(scale=1.0 / n_factors, size=(n_users, n_factors))
        )
        self.item_vecs = np.abs(
            self.random_state.normal(scale=1.0 / n_factors, size=(n_factors, n_items))
        )

        X = self.masked_data.to_numpy()

        # Generate seed for shuffling within sgd
        seed = self.random_state.randint(np.iinfo(np.int32).max)

        # Run SGD
        # Silence numba warning until this issue gets fixed: https://github.com/numba/numba/issues/4585
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)
            (
                error_history,
                converged,
                n_iter,
                delta,
                norm_rmse,
                user_bias,
                user_vecs,
                item_bias,
                item_vecs,
            ) = sgd(
                X,
                seed,
                self.global_bias,
                self.data_range,
                tol,
                self.user_bias,
                self.user_vecs,
                self.user_bias_reg,
                self.user_fact_reg,
                self.item_bias,
                self.item_vecs,
                self.item_bias_reg,
                self.item_fact_reg,
                n_iterations,
                row_indices,
                col_indices,
                learning_rate,
                verbose,
            )
        # Save outputs to model
        (
            self.error_history,
            self.user_bias,
            self.user_vecs,
            self.item_bias,
            self.item_vecs,
        ) = (
            error_history,
            user_bias,
            user_vecs,
            item_bias,
            item_vecs,
        )

        self._n_iter = n_iter
        self._delta = delta
        self._norm_rmse = norm_rmse
        self.converged = converged
        if verbose:
            if self.converged:
                print("\n\tCONVERGED!")
                print(f"\n\tFinal Iteration: {self._n_iter}")
                print(f"\tFinal Delta: {np.round(self._delta)}")
            else:
                print("\tFAILED TO CONVERGE (n_iter reached)")
                print(f"\n\tFinal Iteration: {self._n_iter}")
                print(f"\tFinal delta exceeds tol: {tol} <= {self._delta}")

            print(f"\tFinal Norm Error: {np.round(100*norm_rmse, 2)}%")

        self._predict()
        self.is_fit = True

    def _predict(self):

        """Predict User's missing items using NNMF with stochastic gradient descent"""

        # user x factor * factor item + biases
        predictions = self.user_vecs @ self.item_vecs
        predictions = (
            (predictions.T + self.user_bias).T + self.item_bias + self.global_bias
        )
        self.predictions = pd.DataFrame(
            predictions, index=self.data.index, columns=self.data.columns
        )

__init__(self, data, mask=None, n_mask_items=None, verbose=True, random_state=None) special

Parameters:

Name Type Description Default
data pd.DataFrame

users x items dataframe

required
mask pd.DataFrame

A boolean dataframe used to split the data into 'observed' and 'missing' datasets. Defaults to None.

None
n_mask_items int/float

number of items to mask out, while the rest are treated as observed; Defaults to None.

None
data_range int/float

max - min of the data; Default computed from the input data. This is useful to set manually in case the input data do not span the full range of possible values

required
random_state None, int, RandomState

a seed or random state used for all internal random operations (e.g. randomly mask half the data given n_mask_item = .05). Passing None will generate a new random seed. Default None.

None
verbose bool; optional

print any initialization warnings; Default True

True
Source code in neighbors/models.py
def __init__(
    self, data, mask=None, n_mask_items=None, verbose=True, random_state=None
):
    """
    Args:
        data (pd.DataFrame): users x items dataframe
        mask (pd.DataFrame, optional): A boolean dataframe used to split the data into 'observed' and 'missing' datasets. Defaults to None.
        n_mask_items (int/float, optional): number of items to mask out, while the rest are treated as observed; Defaults to None.
        data_range (int/float, optional): max - min of the data; Default computed from the input data. This is useful to set manually in case the input data do not span the full range of possible values
        random_state (None, int, RandomState): a seed or random state used for all internal random operations (e.g. randomly mask half the data given n_mask_item = .05). Passing None will generate a new random seed. Default None.
        verbose (bool; optional): print any initialization warnings; Default True

    """
    super().__init__(
        data, mask, n_mask_items, random_state=random_state, verbose=verbose
    )
    self.n_factors = None

create_masked_data(self, n_mask_items=0.2) inherited

Create a mask and apply it to data using number of items or % of items

Parameters:

Name Type Description Default
n_items int/float

if an integer is passed its raw value is used. Otherwise if a float is passed its taken to be a (rounded) percentage of the total items; Default 0.1 (10% of the data)

required
Source code in neighbors/models.py
def create_masked_data(self, n_mask_items=0.2):
    """
    Create a mask and apply it to data using number of items or % of items

    Args:
        n_items (int/float, optional): if an integer is passed its raw value is used. Otherwise if a float is passed its taken to be a (rounded) percentage of the total items; Default 0.1 (10% of the data)
    """

    if (
        isinstance(n_mask_items, np.floating)
        and (n_mask_items >= 1.0 or n_mask_items <= 0.0)
    ) or (
        isinstance(n_mask_items, int)
        and (n_mask_items >= self.data.shape[1] or n_mask_items <= 0)
    ):
        raise TypeError(
            "n_items should a float between 0-1 or an integer < the number of items"
        )
    self.mask = create_sparse_mask(
        self.data, n_mask_items, random_state=self.random_state
    )
    self.masked_data = self.data[self.mask]
    self.is_masked = True
    self.n_mask_items = n_mask_items

dilate_mask(self, n_samples=None) inherited

Dilate sparse time-series data by n_samples. Overlapping data will be averaged. This method computes and stores the dilated mask in .dilated_mask and internally updates the .masked_data. Repeated calls to this method on the same model instance do not stack, but rather perform a new dilation on the original masked data. Called this method with None will undo any dilation.

Parameters:

Name Type Description Default
nsamples int

Number of samples to dilate data

required
Source code in neighbors/models.py
def dilate_mask(self, n_samples=None):

    """Dilate sparse time-series data by n_samples.
    Overlapping data will be averaged. This method computes and stores the dilated mask in `.dilated_mask` and internally updates the `.masked_data`. Repeated calls to this method on the same model instance **do not** stack, but rather perform a new dilation on the original masked data. Called this method with `None` will undo any dilation.

    Args:
        nsamples (int):  Number of samples to dilate data

    """

    if self.mask is None:
        raise ValueError("Model has no mask and requires one to perform dilation")
    if not self.is_masked and n_samples is not None:
        raise ValueError("Make sure model instance has been masked.")

    if isinstance(n_samples, np.floating) or (
        n_samples is not None and n_samples >= self.data.shape[1]
    ):
        raise TypeError("nsamples should be an integer < the number of items")

    # Always reset to the undilated mask first
    self.masked_data = self.data[self.mask]

    if n_samples is not None:
        # After masking, perform dilation and save as the new masked data
        self.masked_data = self.masked_data.apply(
            lambda x: self._conv_ts_mean_overlap(x, n_samples=n_samples),
            axis=1,
            result_type="broadcast",
        )
        # Calculate and save dilated mask
        self.dilated_mask = ~self.masked_data.isnull()
        self.is_mask_dilated = True
        self.dilated_by_nsamples = n_samples
    else:
        self.dilated_mask = None
        self.is_mask_dilated = False
        self.dilated_by_nsamples = None

downsample(self, n_samples, sampling_freq=None, target_type='samples') inherited

Downsample a model's rating matrix to a new target frequency or number of samples using averaging. Also downsamples a model's mask and dilated mask if they exist as well as a model's predictions if it's already been fit.

If target_type = 'samples' and sampling_freq is None, the new user x item matrix will have shape users x items * (1 / n_samples).

If target_type = 'seconds', the new user x item matrix will have shape users x items * (1 / n_samples * sampling_freq).

If target_type = 'hz', the new user x item matrix will have shape users x items * (1 / sampling_freq / n_samples).

Parameters:

Name Type Description Default
n_samples int

number of samples

required
sampling_freq int/float

Sampling frequency of data; Default None

None
target_type str

how to downsample; must be one of "samples", "seconds" or "hz". Defaults to "samples".

'samples'
Source code in neighbors/models.py
def downsample(self, n_samples, sampling_freq=None, target_type="samples"):

    """
    Downsample a model's rating matrix to a new target frequency or number of samples using averaging. Also downsamples a model's mask and dilated mask if they exist as well as a model's predictions if it's already been fit.

    If target_type = 'samples' and sampling_freq is None, the new user x item matrix will have shape users x items * (1 / n_samples).

    If target_type = 'seconds', the new user x item matrix will have shape users x items * (1 / n_samples * sampling_freq).

    If target_type = 'hz', the new user x item matrix will have shape users x items * (1 / sampling_freq / n_samples).

    Args:
        n_samples (int): number of samples
        sampling_freq (int/float):  Sampling frequency of data; Default None
        target_type (str, optional): how to downsample; must be one of "samples", "seconds" or "hz". Defaults to "samples".

    """

    self.data = downsample_dataframe(
        self.data,
        sampling_freq=sampling_freq,
        n_samples=n_samples,
        target_type=target_type,
    )

    if self.is_masked:
        # Also downsample mask
        self.mask = downsample_dataframe(
            self.mask,
            sampling_freq=sampling_freq,
            n_samples=n_samples,
            target_type=target_type,
        )
        # Ensure mask stays boolean
        self.mask.loc[:, :] = self.mask > 0

        # Masked data
        self.masked_data = downsample_dataframe(
            self.masked_data,
            sampling_freq=sampling_freq,
            n_samples=n_samples,
            target_type=target_type,
        )
        # Dilated mask
        if self.is_mask_dilated:
            self.dilated_mask = downsample_dataframe(
                self.dilated_mask,
                sampling_freq=sampling_freq,
                n_samples=n_samples,
                target_type=target_type,
            )
            # Ensure mask stays boolean
            self.dilated_mask.loc[:, :] = self.dilated_mask > 0

    if self.is_fit:
        self.predictions = downsample_dataframe(
            self.predictions,
            sampling_freq=sampling_freq,
            n_samples=n_samples,
            target_type=target_type,
        )

fit(self, n_factors=None, item_fact_reg=0.0, user_fact_reg=0.0, item_bias_reg=0.0, user_bias_reg=0.0, learning_rate=0.001, n_iterations=1000, tol=1e-06, verbose=False, dilate_by_nsamples=None, **kwargs)

Fit NNMF collaborative filtering model using stochastic-gradient-descent. Note: Some combinations of fit parameters may lead to degenerate fits due to use and item vectors converging to infinity. Because no constraints are imposed on the values these parameters can take, please adjust them with caution. If you encounter NaNs in your predictions it's likely because of the specific combination of parameters you chose and you can try refitting with the default settings (i.e. no regularization and learning rate = 0.001). Use verbose=True to help determine at what iteration these degenerate fits occur.

Parameters:

Name Type Description Default
n_factors int

number of factors to learn. Defaults to None which includes all factors.

None
item_fact_reg float

item factor regularization to apply. Defaults to 0.0.

0.0
user_fact_reg float

user factor regularization to apply. Defaults to 0.0.

0.0
item_bias_reg float

item factor bias term to apply. Defaults to 0.0.

0.0
user_bias_reg float

user factor bias term to apply. Defaults to 0.0.

0.0
learning_rate float

how quickly to integrate errors during training. Defaults to 0.001.

0.001
n_iterations int

total number of training iterations if convergence is not achieved. Defaults to 5000.

1000
tol float

Convergence criteria. Model is considered converged if the change in error during training < tol. Defaults to 0.001.

1e-06
verbose bool

print information about training. Defaults to False.

False
dilate_by_nsamples int

How many items to dilate by prior to training. Defaults to None.

None
Source code in neighbors/models.py
def fit(
    self,
    n_factors=None,
    item_fact_reg=0.0,
    user_fact_reg=0.0,
    item_bias_reg=0.0,
    user_bias_reg=0.0,
    learning_rate=0.001,
    n_iterations=1000,
    tol=1e-6,
    verbose=False,
    dilate_by_nsamples=None,
    **kwargs,
):
    """
    Fit NNMF collaborative filtering model using stochastic-gradient-descent. **Note:** Some combinations of fit parameters may lead to degenerate fits due to use and item vectors converging to infinity. Because no constraints are imposed on the values these parameters can take, please adjust them with caution. If you encounter NaNs in your predictions it's likely because of the specific combination of parameters you chose and you can try refitting with the default settings (i.e. no regularization and learning rate = 0.001). Use `verbose=True` to help determine at what iteration these degenerate fits occur.

    Args:
        n_factors (int, optional): number of factors to learn. Defaults to None which includes all factors.
        item_fact_reg (float, optional): item factor regularization to apply. Defaults to 0.0.
        user_fact_reg (float, optional): user factor regularization to apply. Defaults to 0.0.
        item_bias_reg (float, optional): item factor bias term to apply. Defaults to 0.0.
        user_bias_reg (float, optional): user factor bias term to apply. Defaults to 0.0.
        learning_rate (float, optional): how quickly to integrate errors during training. Defaults to 0.001.
        n_iterations (int, optional): total number of training iterations if convergence is not achieved. Defaults to 5000.
        tol (float, optional): Convergence criteria. Model is considered converged if the change in error during training < tol. Defaults to 0.001.
        verbose (bool, optional): print information about training. Defaults to False.
        dilate_by_nsamples (int, optional): How many items to dilate by prior to training. Defaults to None.
    """

    # Call parent fit which acts as a guard for non-masked data
    super().fit()

    # initialize variables
    n_users, n_items = self.data.shape

    if (
        isinstance(n_factors, int) and (n_factors > n_items and n_factors > n_users)
    ) or isinstance(n_factors, np.floating):
        raise TypeError("n_factors must be an integer < number of items and users")

    if n_factors is None:
        n_factors = min([n_users, n_items])

    self.n_factors = n_factors
    self.item_fact_reg = item_fact_reg
    self.user_fact_reg = user_fact_reg
    self.item_bias_reg = item_bias_reg
    self.user_bias_reg = user_bias_reg
    self.error_history = []

    # Perform dilation if requested
    self.dilate_mask(n_samples=dilate_by_nsamples)

    # Get indices of training data to compute; np.nonzero returns a tuple of row and column indices that when iterated over simultaneosly yield the [row_index, col_index] of each training observation
    if self.is_mask_dilated:
        row_indices, col_indices = self.dilated_mask.values.nonzero()
    else:
        row_indices, col_indices = self.mask.values.nonzero()

    # Convert tuples cause numba complains
    row_indices, col_indices = np.array(row_indices), np.array(col_indices)

    # Initialize global, user, and item biases and latent vectors
    self.global_bias = self.masked_data.mean().mean()
    self.user_bias = np.zeros(n_users)
    self.item_bias = np.zeros(n_items)

    # Initialize random values oriented these as user x factor, factor x item
    self.user_vecs = np.abs(
        self.random_state.normal(scale=1.0 / n_factors, size=(n_users, n_factors))
    )
    self.item_vecs = np.abs(
        self.random_state.normal(scale=1.0 / n_factors, size=(n_factors, n_items))
    )

    X = self.masked_data.to_numpy()

    # Generate seed for shuffling within sgd
    seed = self.random_state.randint(np.iinfo(np.int32).max)

    # Run SGD
    # Silence numba warning until this issue gets fixed: https://github.com/numba/numba/issues/4585
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=NumbaPerformanceWarning)
        (
            error_history,
            converged,
            n_iter,
            delta,
            norm_rmse,
            user_bias,
            user_vecs,
            item_bias,
            item_vecs,
        ) = sgd(
            X,
            seed,
            self.global_bias,
            self.data_range,
            tol,
            self.user_bias,
            self.user_vecs,
            self.user_bias_reg,
            self.user_fact_reg,
            self.item_bias,
            self.item_vecs,
            self.item_bias_reg,
            self.item_fact_reg,
            n_iterations,
            row_indices,
            col_indices,
            learning_rate,
            verbose,
        )
    # Save outputs to model
    (
        self.error_history,
        self.user_bias,
        self.user_vecs,
        self.item_bias,
        self.item_vecs,
    ) = (
        error_history,
        user_bias,
        user_vecs,
        item_bias,
        item_vecs,
    )

    self._n_iter = n_iter
    self._delta = delta
    self._norm_rmse = norm_rmse
    self.converged = converged
    if verbose:
        if self.converged:
            print("\n\tCONVERGED!")
            print(f"\n\tFinal Iteration: {self._n_iter}")
            print(f"\tFinal Delta: {np.round(self._delta)}")
        else:
            print("\tFAILED TO CONVERGE (n_iter reached)")
            print(f"\n\tFinal Iteration: {self._n_iter}")
            print(f"\tFinal delta exceeds tol: {tol} <= {self._delta}")

        print(f"\tFinal Norm Error: {np.round(100*norm_rmse, 2)}%")

    self._predict()
    self.is_fit = True

plot_factors(self, save=False, **kwargs) inherited

Plot user x factor and item x factor matrices

Parameters:

Name Type Description Default
save bool/str/Path

if a string or path is provided will save the figure to that location. Defaults to False.

False
kwargs

additional arguments to seaborn.heatmap

{}

Returns:

Type Description
tuple

(figure handle, axes handle)

Source code in neighbors/models.py
def plot_factors(self, save=False, **kwargs):
    """
    Plot user x factor and item x factor matrices

    Args:
        save (bool/str/Path, optional): if a string or path is provided will save the figure to that location. Defaults to False.
        kwargs: additional arguments to seaborn.heatmap

    Returns:
        tuple: (figure handle, axes handle)
    """

    if self.is_fit:
        f, axs = plt.subplots(1, 2, figsize=(12, 6))
        if hasattr(self, "W"):
            _ = sns.heatmap(self.W, ax=axs[0], **kwargs)
            _ = sns.heatmap(self.H, ax=axs[1], **kwargs)
        else:
            _ = sns.heatmap(self.user_vecs, ax=axs[0], **kwargs)
            _ = sns.heatmap(self.item_vecs, ax=axs[1], **kwargs)
        axs[0].set_xlabel("Factor", fontsize=18)
        axs[0].set_ylabel("User", fontsize=18)
        axs[0].set_title("User Factors", fontsize=18)
        axs[1].set_xlabel("Item", fontsize=18)
        axs[1].set_ylabel("Factor", fontsize=18)
        axs[1].set_title("Item Factors", fontsize=18)
        if save:
            plt.savefig(save, bbox_inches="tight")
        return f, axs
    else:
        raise ValueError("Model has not been fit.")

plot_learning(self, save=False) inherited

Plot training error over iterations for diagnostic purposes

Parameters:

Name Type Description Default
save bool/str/Path

if a string or path is provided will save the figure to that location. Defaults to False.

False

Returns:

Type Description
tuple

(figure handle, axes handle)

Source code in neighbors/models.py
def plot_learning(self, save=False):
    """
    Plot training error over iterations for diagnostic purposes

    Args:
        save (bool/str/Path, optional): if a string or path is provided will save the figure to that location. Defaults to False.

    Returns:
        tuple: (figure handle, axes handle)
    """

    if self.is_fit:
        f, ax = plt.subplots(1, 1, figsize=(8, 6))
        _ = ax.plot(range(1, len(self.error_history) + 1), self.error_history)
        ax.set(
            xlabel="Iteration",
            ylabel="Normalized RMSE",
            title=f"Final Normalized RMSE: {np.round(self._norm_rmse, 3)}\nConverged: {self.converged}",
        )
        sns.despine()
        if save:
            plt.savefig(save, bbox_inches="tight")
        return f, ax
    else:
        raise ValueError("Model has not been fit.")

plot_predictions(self, dataset='missing', figsize=(16, 8), label_fontsize=16, hide_title=False, heatmap_kwargs={}) inherited

Create plot of actual vs predicted values.

Parameters:

Name Type Description Default
dataset str; optional

one of 'full', 'observed', or 'missing'. Default 'missing'.

'missing'
figsize tuple; optional

matplotlib figure size; Default (16,8)

(16, 8)
label_fontsize int; optional

fontsize for all axis labels and titles; Default 16

16
hide_title bool; optional

hide title containing RMSE and correlation performance if available; Default False

False
heatmap_kwargs dict

addition arguments to seaborn.heatmap.

{}

Returns:

Type Description
tuple

(figure handle, axis handle)

Source code in neighbors/models.py
def plot_predictions(
    self,
    dataset="missing",
    figsize=(16, 8),
    label_fontsize=16,
    hide_title=False,
    heatmap_kwargs={},
):
    """Create plot of actual vs predicted values.

    Args:
        dataset (str; optional): one of 'full', 'observed', or 'missing'. Default 'missing'.
        figsize (tuple; optional): matplotlib figure size; Default (16,8)
        label_fontsize (int; optional): fontsize for all axis labels and titles; Default 16
        hide_title (bool; optional): hide title containing RMSE and correlation performance if available; Default False
        heatmap_kwargs (dict, optional): addition arguments to seaborn.heatmap.

    Returns:
        tuple: (figure handle, axis handle)

    """

    if not self.is_fit:
        raise ValueError("Model has not been fit")

    vmax = max(self.data.max().max(), self.data.max().max())
    vmin = min(self.data.min().min(), self.data.min().min())

    actual, pred = self._retrieve_predictions(dataset)

    if actual is None:
        ncols = 2
        warnings.warn(
            "Cannot score predictions on missing data because true values were never observed!"
        )
    else:
        ncols = 3

    heatmap_kwargs.setdefault("square", False)
    heatmap_kwargs.setdefault("xticklabels", False)
    heatmap_kwargs.setdefault("yticklabels", False)
    heatmap_kwargs.setdefault("vmax", vmax)
    heatmap_kwargs.setdefault("vmin", vmin)

    f, ax = plt.subplots(nrows=1, ncols=ncols, figsize=figsize)

    # The original data matrix (potentially masked)
    sns.heatmap(self.masked_data, ax=ax[0], **heatmap_kwargs)
    ax[0].set_title("Actual User/Item Ratings", fontsize=label_fontsize)
    ax[0].set_xlabel("Items", fontsize=label_fontsize)
    ax[0].set_ylabel("Users", fontsize=label_fontsize)

    # The predicted data matrix
    sns.heatmap(self.predictions, ax=ax[1], **heatmap_kwargs)
    ax[1].set_title("Predicted User/Item Ratings", fontsize=label_fontsize)
    ax[1].set_xlabel("Items", fontsize=label_fontsize)
    ax[1].set_ylabel("Users", fontsize=label_fontsize)
    f.tight_layout()

    # Scatter plot if we can calculate it
    if actual is not None:
        nans = np.logical_or(np.isnan(actual), np.isnan(pred))
        ax[2].scatter(
            actual[~nans],
            pred[~nans],
        )
        ax[2].set_xlabel("Actual", fontsize=label_fontsize)
        ax[2].set_ylabel("Predicted", fontsize=label_fontsize)
        ax[2].set_title("Ratings", fontsize=label_fontsize)
        sns.despine()

        r = self.score(dataset=dataset, by_user=True, metric="correlation")
        rmse = self.score(dataset=dataset, by_user=True, metric="rmse")
        if not hide_title:
            plt.suptitle(
                f"Mean RMSE: {np.round(rmse.mean(),3)} +/- {np.round(rmse.std(), 3)}\nMean Correlation: {np.round(r.mean(), 3)} +/- {np.round(r.std(), 3)}",
                y=1.07,
                fontsize=label_fontsize + 2,
            )
        plt.subplots_adjust(wspace=0.2)

    return f, ax

score(self, metric='rmse', dataset='missing', by_user=True, actual=None) inherited

Get the performance of a fitted model by comparing observed and predicted data. This method is primarily useful if you want to calculate a single metric. Otherwise you should prefer the .summary() method instead, which scores all metrics.

Parameters:

Name Type Description Default
metric str; optional

what metric to compute, one of 'rmse', 'mse', 'mae' or 'correlation'; Default 'rmse'.

'rmse'
dataset str; optional

how to compute scoring, either using 'observed', 'missing' or 'full'. Default 'missing'.

'missing'
by_user bool; optional

whether to return a single score over all data points or a pandas Series of scores per user. Default True.

True
actual pd.DataFrame, None; optional

a dataframe to score against; Default is None which uses the data provided when the model was initialized

None

Returns:

Type Description
float/pd.Series

score

Source code in neighbors/models.py
def score(
    self,
    metric="rmse",
    dataset="missing",
    by_user=True,
    actual=None,
):
    """Get the performance of a fitted model by comparing observed and predicted data. This method is primarily useful if you want to calculate a single metric. Otherwise you should prefer the `.summary()` method instead, which scores all metrics.

    Args:
        metric (str; optional): what metric to compute, one of 'rmse', 'mse', 'mae' or 'correlation'; Default 'rmse'.
        dataset (str; optional): how to compute scoring, either using 'observed', 'missing' or 'full'. Default 'missing'.
        by_user (bool; optional): whether to return a single score over all data points or a pandas Series of scores per user. Default True.
        actual (pd.DataFrame, None; optional): a dataframe to score against; Default is None which uses the data provided when the model was initialized

    Returns:
        float/pd.Series: score

    """

    if not self.is_fit:
        raise ValueError("You must fit() model first before using this method.")

    if metric not in ["rmse", "mse", "mae", "correlation"]:
        raise ValueError(
            "metric must be one of 'rmse', 'mse', 'mae', or 'correlation'"
        )
    # Get dataframes of observed and predicted values
    # This will be a dense or sparse matrix the same shape as the input data
    model_actual, pred = self._retrieve_predictions(dataset)

    if actual is None:
        actual = model_actual
    else:
        if actual.shape != self.data.shape:
            raise ValueError(
                "actual values dataframe supplied but shape does not match original data"
            )

    if actual is None:
        warnings.warn(
            "Cannot score predictions on missing data because true values were never observed!"
        )
        return None

    with warnings.catch_warnings():
        # Catch 'Mean of empty slice' warnings from np.nanmean
        warnings.simplefilter("ignore", category=RuntimeWarning)
        if by_user:
            scores = []
            for userid in range(actual.shape[0]):
                user_actual = actual.iloc[userid, :].values
                user_pred = pred.iloc[userid, :].values
                if metric == "rmse":
                    score = np.sqrt(np.nanmean((user_pred - user_actual) ** 2))
                elif metric == "mse":
                    score = np.nanmean((user_pred - user_actual) ** 2)
                elif metric == "mae":
                    score = np.nanmean(np.abs(user_pred - user_actual))
                elif metric == "correlation":
                    nans = np.logical_or(np.isnan(user_actual), np.isnan(user_pred))
                    if len(user_actual[~nans]) < 2 or len(user_pred[~nans]) < 2:
                        score = np.nan
                    else:
                        score = pearsonr(user_actual[~nans], user_pred[~nans])[0]
                scores.append(score)
            return pd.Series(scores, index=actual.index, name=f"{metric}_{dataset}")
        else:
            actual, pred = actual.to_numpy().flatten(), pred.to_numpy().flatten()

            if metric == "rmse":
                return np.sqrt(np.nanmean((pred - actual) ** 2))
            elif metric == "mse":
                return np.nanmean((pred - actual) ** 2)
            elif metric == "mae":
                return np.nanmean(np.abs(pred - actual))
            elif metric == "correlation":
                nans = np.logical_or(np.isnan(actual), np.isnan(pred))
                if len(actual[~nans]) < 2 or len(pred[~nans]) < 2:
                    return np.nan
                else:
                    return pearsonr(actual[~nans], pred[~nans])[0]

summary(self, verbose=False, actual=None, dataset=None) inherited

Calculate the performance of a model and return a dataframe of results. Computes performance across all, observed, and missing datasets. Scores using rmse, mse, mae, and correlation. Computes scores across all subjects (i.e. ignoring the fact that ratings are clustered by subject) and the mean performance for each metric after calculating per-subject performance.

Parameters:

Name Type Description Default
verbose bool

Print warning messages during scoring. Defaults to False.

False
actual pd.DataFrame, None; optional

a dataframe to score against; Default is None which uses the data provided when the model was initialized

None
dataset str/None

dataset to score. Must be one of 'full', 'observed','missing' or None to score both 'observed' and 'missing'; Default None

None

Returns:

Type Description
pd.DataFrame

long-form dataframe of model performance

Source code in neighbors/models.py
def summary(self, verbose=False, actual=None, dataset=None):
    """
    Calculate the performance of a model and return a dataframe of results. Computes performance across all, observed, and missing datasets. Scores using rmse, mse, mae, and correlation. Computes scores across all subjects (i.e. ignoring the fact that ratings are clustered by subject) and the mean performance for each metric after calculating per-subject performance.

    Args:
        verbose (bool, optional): Print warning messages during scoring. Defaults to False.
        actual (pd.DataFrame, None; optional): a dataframe to score against; Default is None which uses the data provided when the model was initialized
        dataset (str/None): dataset to score. Must be one of 'full', 'observed','missing' or None to score both 'observed' and 'missing'; Default None

    Returns:
        pd.DataFrame: long-form dataframe of model performance
    """

    if not self.is_fit:
        raise ValueError("Model has not been fit!")

    if dataset is None:
        if actual is None:
            if self.is_dense:
                dataset = ["missing", "observed"]
            else:
                dataset = ["observed"]
        else:
            dataset = ["missing", "observed"]

    elif isinstance(dataset, str):
        if actual is None and not self.is_dense and dataset in ["full", "missing"]:
            raise ValueError(
                "Cannot score predictions on missing values because no ground truth was observed"
            )
        dataset = [dataset]

    # Compute results for all metrics, all datasets, separately for group and by subject
    group_results = {
        "algorithm": self.__class__.__name__,
    }
    subject_results = []

    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        for metric in ["rmse", "mse", "mae", "correlation"]:
            this_group_result = {}
            this_subject_result = []
            for dat in dataset:
                this_group_result[dat] = self.score(
                    metric=metric, dataset=dat, actual=actual, by_user=False
                )
                this_subject_result.append(
                    self.score(
                        metric=metric,
                        dataset=dat,
                        by_user=True,
                        actual=actual,
                    )
                )
            # Dict of group results for this metric
            group_results[metric] = this_group_result
            # Dataframe of subject results for this metric
            this_subject_result = pd.concat(this_subject_result, axis=1)
            subject_results.append(this_subject_result)
            group_results[f"{metric}_user"] = dict(
                zip(
                    dataset,
                    this_subject_result.mean().values,
                )
            )
    # Save final results to longform df
    self.user_results = pd.concat(subject_results, axis=1)
    group_results = pd.DataFrame(group_results)
    group_results = (
        group_results.reset_index()
        .melt(
            id_vars=["index", "algorithm"],
            var_name="metric",
            value_name="score",
        )
        .rename(columns={"index": "dataset"})
        .sort_values(by=["dataset", "metric"])
        .reset_index(drop=True)
        .assign(
            group=lambda df: df.metric.apply(
                lambda x: "user" if "user" in x else "all"
            ),
            metric=lambda df: df.metric.replace(
                {
                    "correlation_user": "correlation",
                    "mse_user": "mse",
                    "rmse_user": "rmse",
                    "mae_user": "mae",
                }
            ),
        )
        .sort_values(by=["dataset", "group", "metric"])
        .reset_index(drop=True)[
            ["algorithm", "dataset", "group", "metric", "score"]
        ]
    )
    self.overall_results = group_results
    if verbose:
        if w:
            print(w[-1].message)
        print(
            "User performance results (not returned) are accessible using .user_results"
        )
        print(
            "Overall performance results (returned) are accesible using .overall_results"
        )

    return group_results

to_long_df(self) inherited

Create a long format pandas dataframe with observed, predicted, and mask.

Source code in neighbors/models.py
def to_long_df(self):

    """Create a long format pandas dataframe with observed, predicted, and mask."""

    observed = pd.DataFrame(columns=["User", "Item", "Rating", "Condition"])
    for row in self.data.iterrows():
        tmp = pd.DataFrame(columns=observed.columns)
        tmp["Rating"] = row[1]
        tmp["Item"] = self.data.columns
        tmp["User"] = row[0]
        tmp["Condition"] = "Observed"
        if self.is_masked:
            if self.is_mask_dilated:
                tmp["Mask"] = self.dilated_mask.loc[row[0]]
            else:
                tmp["Mask"] = self.mask.loc[row[0]]
        observed = observed.append(tmp)

    if self.is_fit:
        predicted = pd.DataFrame(columns=["User", "Item", "Rating", "Condition"])
        for row in self.predictions.iterrows():
            tmp = pd.DataFrame(columns=predicted.columns)
            tmp["Rating"] = row[1]
            tmp["Item"] = self.predictions.columns
            tmp["User"] = row[0]
            tmp["Condition"] = "Predicted"
            if self.is_masked:
                tmp["Mask"] = self.mask.loc[row[0]]
            predicted = predicted.append(tmp)
        observed = observed.append(predicted)
    return observed

transform(self, return_only_predictions=False) inherited

Return a user x item matrix of predictions after a model has been fit

Parameters:

Name Type Description Default
return_only_predictions bool

Returns both training and testing predictions rather than simply filling in missing values with predictions. Defaults to False.

False

Returns:

Type Description
pd.DataFrame

user x item ratings

Source code in neighbors/models.py
def transform(self, return_only_predictions=False):
    """
    Return a user x item matrix of predictions after a model has been fit

    Args:
        return_only_predictions (bool, optional): Returns both training and testing predictions rather than simply filling in missing values with predictions. Defaults to False.

    Returns:
        pd.DataFrame: user x item ratings
    """

    if not self.is_fit:
        raise ValueError("Model has not been fit!")
    if return_only_predictions:
        return self.predictions
    else:
        # Propagate observed values to return object
        out = self.data[self.mask]
        # Fill in missing values with predictions
        out[~self.mask] = self.predictions[~self.mask]
        return out