Source code for softsensor.metrics

# -*- coding: utf-8 -*-
# %%
from collections import defaultdict
import numpy as np
import pandas as pd
import scipy.stats as st
import torch
import torch.nn as nn
from sklearn.metrics import r2_score
from scipy.stats import rv_histogram, rv_continuous
from scipy.interpolate import CubicSpline
from pylife.stress.timesignal import psd_df

"""
Methods to compute metrics with the same argument structure as nn.GaussianNLLLoss()

For a description of the provided uncertainty metrics refer to
"Uncertainty Quantification for Traffic Forecasting: A Unified Approach"
[Qian et al. 2022 https://arxiv.org/abs/2208.05875]
"""


[docs]
def rmse(mu, targets, var):
    """Root Mean Square Error (RMSE)

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    rmse: torch.Tensor
    """
    return nn.MSELoss()(mu, targets).sqrt()

[docs]
def mae(mu, targets, var):
    """Mean Absolute Error (MAE)

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    mae: torch.Tensor
    """
    return nn.L1Loss()(mu, targets)

[docs]
def r2(mu, targets, var):
    """R2 score

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    r2: float
    """
    y_true = targets.squeeze(0).numpy()
    y_pred = mu.squeeze(0).numpy()
    return r2_score(y_true, y_pred)

[docs]
def pearson(mu, targets, var):
    """Pearson correlation coefficient

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    pearson: float
    """
    y_true = targets.numpy()
    y_pred = mu.numpy()
    return np.corrcoef(y_true, y_pred)[0, 1]

[docs]
def nll(mu, targets, var):
    """Gaussian Negative Log Likelihood Loss (NLL)

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    nll: torch.Tensor
    """
    return nn.GaussianNLLLoss()(mu, targets, var)

[docs]
def nll_statistic(mu, targets, var):
    """Gaussian Negative Log Likelihood statistic (rather than the score of the
    optimization objective)

    We mainly report the NLL loss, since minimizing it is equivalent to minimizing
    the NLL, but add this metric for comparability.

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    nll: torch.Tensor
    """
    gaussians = torch.distributions.Normal(mu, var.sqrt())
    return -gaussians.log_prob(targets).mean()

[docs]
def rmv(mu, targets, var):
    """Root Mean Variance (RMV) measures the sharpness of the uncertainty distributions

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    sharpness: torch.Tensor
    """
    return var.mean().sqrt()

[docs]
def heteroscedasticity(mu, targets, var):
    """Heteroscedasticity of the uncertainty estimate as std of std

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    heteroscedasticity: torch.Tensor
    """
    return torch.round(var.sqrt().std(dim=-1).mean(), decimals=6)

[docs]
def crps(mu, targets, var) -> float:
    """The negatively oriented continuous ranked probability score for Gaussians.

    Computes CRPS for held-out data (y_true) given predictive uncertainty with mean
    (y_pred) and standard deviation (y_std). Each test point is given equal weight in
    the overall score over the test set. Negatively oriented means a smaller value is
    more desirable.

    Adapted from https://github.com/uncertainty-toolbox/uncertainty-toolbox

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances

    Returns
    -------
    crps: float
    """
    mu = mu.numpy()
    targets = targets.numpy()
    std = var.sqrt().numpy()
    y_standardized = (targets - mu) / std
    term_1 = 1 / np.sqrt(np.pi)
    term_2 = 2 * st.norm.pdf(y_standardized, loc=0, scale=1)
    term_3 = y_standardized * (2 * st.norm.cdf(y_standardized, loc=0, scale=1) - 1)
    crps_list = -1 * std * (term_1 - term_2 - term_3)
    crps = np.mean(crps_list)
    return crps

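
# --- Illustrative check (not part of the original module) --------------------
# A minimal sketch, under assumed toy data: the closed-form Gaussian CRPS above
# should roughly match the sample-based estimate E|Y - y| - 0.5 * E|Y - Y'|.
# The helper name and all values are illustrative assumptions.
def _example_crps_sanity_check(n_samples=20000):
    torch.manual_seed(0)
    mu = torch.zeros(1, 200)
    var = torch.full_like(mu, 0.25)
    targets = mu + var.sqrt() * torch.randn_like(mu)
    analytic = crps(mu, targets, var)
    # Monte Carlo estimate of the same (negatively oriented) score
    y1 = mu + var.sqrt() * torch.randn(n_samples, mu.shape[1])
    y2 = mu + var.sqrt() * torch.randn(n_samples, mu.shape[1])
    monte_carlo = float((y1 - targets).abs().mean() - 0.5 * (y1 - y2).abs().mean())
    return analytic, monte_carlo  # the two numbers should be close
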
[docs]
def picp(mu, targets, var, z=1.96):
    """Prediction Interval Coverage Probability (PICP) measures the coverage of a specific PI

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances
    z: float, optional
        Z-score for the specific quantile, the default is 1.96 (95% interval)

    Returns
    -------
    pi_coverage: torch.Tensor
    """
    std = torch.sqrt(var)
    lower_bound = mu - z * std
    upper_bound = mu + z * std
    pi_coverage = (
        torch.logical_and(lower_bound <= targets, targets <= upper_bound)
        .int()
        .mean(dtype=float)
    )
    return pi_coverage

[docs]
def mpiw(mu, targets, var, z=1.96):
    """Mean Prediction Interval Width (MPIW) measures the width of a specific PI

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances
    z: float, optional
        Z-score for the specific quantile, the default is 1.96 (95% interval)

    Returns
    -------
    pi_width: torch.Tensor
    """
    std = torch.sqrt(var)
    lower_bound = mu - z * std
    upper_bound = mu + z * std
    pi_width = (upper_bound - lower_bound).mean()
    return pi_width

[docs]
def ece(mu, targets, var, quantiles=np.arange(0.05, 1.0, 0.05)):
    """Expected Calibration Error (ECE) measures the mean absolute calibration error
    of multiple PICPs

    See "Accurate Uncertainties for Deep Learning Using Calibrated Regression"
    [Kuleshov et al. 2018 https://arxiv.org/abs/1807.00263]

    See https://stackoverflow.com/questions/20864847/probability-to-z-score-and-vice-versa

    Parameters
    ----------
    mu: torch.Tensor
        Predicted means
    targets: torch.Tensor
        Target values
    var: torch.Tensor
        Predicted variances
    quantiles: list[x], x in (0, 1)
        Quantiles to evaluate

    Returns
    -------
    ece: torch.Tensor
    """
    z_scores = [st.norm.ppf(1 - (1 - p) / 2) for p in quantiles]
    picp_scores = torch.tensor([picp(mu, targets, var, z) for z in z_scores])
    return nn.L1Loss()(picp_scores, torch.tensor(quantiles))

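
# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal sketch, under assumed toy data: with a correctly specified variance the
# ECE stays close to zero, while an overconfident (too small) variance inflates it.
# The helper name and all values are illustrative assumptions.
def _example_ece_calibration():
    torch.manual_seed(0)
    mu = torch.zeros(5000)
    targets = 0.5 * torch.randn(5000)                          # true std is 0.5
    well_calibrated = ece(mu, targets, torch.full_like(mu, 0.5 ** 2))
    overconfident = ece(mu, targets, torch.full_like(mu, 0.01))
    return well_calibrated, overconfident  # expect well_calibrated << overconfident
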
[docs]
def log_area_error(psd_original, psd_targets, f):
    """Logarithmic area error between two power spectral densities.

    Integrates the absolute difference of the log PSDs over log frequency;
    a leading 0 Hz bin is dropped before taking the logarithm.
    """
    if f[0] == 0:
        f = f[1:]
        psd_original = psd_original[1:]
        psd_targets = psd_targets[1:]
    log_original = np.log(psd_original)
    log_targets = np.log(psd_targets)
    diff = np.abs(log_original - log_targets)
    log_f = np.log(f)
    area = np.trapezoid(diff, log_f)
    return torch.mean(torch.tensor(area))

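
# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal sketch comparing two synthetic spectra on a shared frequency grid; the
# helper name, the grid, and the analytic PSD shapes are illustrative assumptions.
def _example_log_area_error():
    f = np.linspace(0.0, 50.0, 256)              # the 0 Hz bin is dropped internally
    psd_a = 1.0 / (1.0 + (f - 10.0) ** 2)        # reference spectrum
    psd_b = 0.8 / (1.0 + (f - 12.0) ** 2)        # shifted and scaled spectrum
    return log_area_error(psd_a, psd_b, f)
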
'''
def compute_metrics(model, test_loader, metric_names=None):
    """Compute the mean metric scores of a model on the test set

    Note: This function reduces scores across test tracks and variables.
    Use eval_tools.comp_metrics if you want to compute scores for individual output sensors.

    Parameters
    ----------
    model: Uncertainty model
        Model to evaluate
    test_loader: list[Dataloader]
        Test dataset
    metric_names: list[string], optional
        Must be a subset of uncertainty metrics:
        ["RMSE", "MAE", "R2", "Corr", "NLL", "RMV", "CRPS", "Het", "PICP", "MPIW", "ECE"]

    Returns
    -------
    mean_scores: dict[str, float]
    """
    uncertainty_metrics = {
        "RMSE": rmse,
        "MAE": mae,
        "R2": r2,
        "Corr": pearson,
        "NLL": nll,
        "RMV": rmv,
        "CRPS": crps,
        "Het": heteroscedasticity,
        "PICP": picp,
        "MPIW": mpiw,
        "ECE": ece,
    }
    if metric_names:
        uncertainty_metrics = {
            k: v for k, v in uncertainty_metrics.items() if k in metric_names
        }
    scores = defaultdict(list)
    for track in test_loader:
        mean, var = model.prediction(track)
        targets = torch.tensor(
            [[data[1][0][i][0] for data in track] for i in range(mean.shape[0])]
        )
        for name, metric in uncertainty_metrics.items():
            scores[name].append(metric(mean, targets, var))
    mean_scores = {k: np.mean(v) for k, v in scores.items()}
    return mean_scores


def compute_eval_metrics(model, test_loader):
    """Wrapper of compute_metrics that focuses on the most important metrics

    Parameters
    ----------
    model: Uncertainty model
        Model to evaluate
    test_loader: list[Dataloader]
        Test dataset

    Returns
    -------
    mean_scores: dict[str, float]
    """
    metric_names = ["RMSE", "NLL", "ECE", "Het"]
    return compute_metrics(model, test_loader, metric_names)


def compare_models(models, model_names, test_loader):
    """Compare multiple models on the same test set

    Parameters
    ----------
    models: list[Uncertainty model]
        Models to evaluate
    model_names: list[string]
        Identifier of the models
    test_loader: list[Dataloader]
        Test dataset

    Returns
    -------
    df: pd.DataFrame
        Scores as rows and model names as columns
    """
    ds = [compute_metrics(model, test_loader) for model in models]
    df = pd.DataFrame(ds, index=model_names).T
    return df
'''

[docs]
def quantile_ece(
    predicted_quantiles, targets, expected_quantiles=np.arange(0.05, 1.0, 0.05)
):
    """Expected Calibration Error (ECE) measures the mean absolute calibration error
    of multiple PICPs

    See "Accurate Uncertainties for Deep Learning Using Calibrated Regression"
    [Kuleshov et al. 2018 https://arxiv.org/abs/1807.00263]

    Parameters
    ----------
    predicted_quantiles: list[torch.Tensor]
        Expected to be of the form [median, lb0, ub0, lb1, ub1, ...]
    targets: torch.Tensor
        Ground truth for median
    expected_quantiles: list[float]
        Quantiles to evaluate
        Expected to be of the form [lb0, lb1, ..., median, ..., ub1, ub0]

    Returns
    -------
    ece: torch.Tensor
    """
    num_quantiles = len(expected_quantiles)
    assert num_quantiles % 2 == 1
    assert len(predicted_quantiles) // 2 == num_quantiles
    picp_scores = []
    for lb, ub in zip(predicted_quantiles[1::2], predicted_quantiles[2::2]):
        picp_scores.append(
            float(
                torch.logical_and(lb <= targets, targets <= ub).int().mean(dtype=float)
            )
        )
    picp_scores = picp_scores[::-1]
    return nn.L1Loss()(torch.tensor(picp_scores), torch.tensor(expected_quantiles))

[docs]
def compute_quantile_metrics(
    model,
    test_loader,
    output_names=["x"],
    expected_quantiles=np.arange(0.05, 1.0, 0.05),
):
    """Compute metrics that are compatible with quantile models

    Parameters
    ----------
    model: QuantileNARX
        Quantile model to evaluate
    test_loader: list[Dataloader]
        Test dataset
    output_names: list[str], optional
        Output sensors to consider. The default is ["x"]
    expected_quantiles: list[float]
        Quantiles to evaluate
        Expected to be of the form [lb0, lb1, ..., median, ..., ub1, ub0]

    Returns
    -------
    mean_scores: dict[str, float]
    """
    scores = defaultdict(list)
    for track in test_loader:
        pred = model.prediction(track)
        var = None
        for i, output in enumerate(output_names):
            predicted_quantiles = [x[i] for x in pred]
            median, lb, ub = predicted_quantiles[:3]
            targets = torch.tensor([data[1][0][i][0] for data in track])
            for metric_name, metric in zip(
                ["RMSE", "MAE", "R2", "Corr"], [rmse, mae, r2, pearson]
            ):
                scores[f"{output}_{metric_name}"].append(metric(median, targets, var))
            scores[f"{output}_PICP"].append(
                torch.logical_and(lb <= targets, targets <= ub).int().mean(dtype=float)
            )
            scores[f"{output}_MPIW"].append((ub - lb).mean())
            scores[f"{output}_ECE"].append(
                quantile_ece(predicted_quantiles, targets, expected_quantiles)
            )
    mean_scores = {k: np.mean(v) for k, v in scores.items()}
    return mean_scores

[docs]
class wf_distribution(rv_continuous):
    """Continuous distribution over frequency whose CDF is a cubic-spline
    interpolation of the normalized, cumulated PSD."""

    def __init__(self, frequency, norm_psd_cumsum, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cdf_interpolator = CubicSpline(frequency, norm_psd_cumsum)

    def _cdf(self, x):
        return self.cdf_interpolator(x)

def _calc_pdf(x, distribution):
    return distribution.pdf(x)


def _calc_cdf(x, distribution):
    return distribution.cdf(x)


def _calc_ppf(x, distribution):
    return distribution.ppf(x)

[docs]
def distance_calc(inverse_cdf, p):
    """Pairwise p-Wasserstein distances between the columns of a DataFrame of
    inverse CDFs, integrated over the quantile levels in the index."""
    w_distance = pd.DataFrame(index=inverse_cdf.columns, columns=inverse_cdf.columns)
    for col1 in inverse_cdf.columns:
        for col2 in inverse_cdf.columns:
            # if col1 != col2:
            wstf_act = np.trapezoid(
                ((inverse_cdf[col1] - inverse_cdf[col2]).abs().dropna().values.ravel())
                ** p,
                x=inverse_cdf.index,
            ) ** (1 / p)
            w_distance.loc[col1, col2] = wstf_act
    return w_distance

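
# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal sketch, assuming two Gaussian distributions: their 1-Wasserstein distance
# computed from the inverse CDFs should be close to the difference of the means
# (here 1.0). The helper name and the column labels are illustrative assumptions.
def _example_distance_calc():
    q = np.linspace(1e-3, 1 - 1e-3, 1000)        # quantile levels
    inverse_cdf = pd.DataFrame(
        {
            "a": st.norm.ppf(q, loc=0.0, scale=1.0),
            "b": st.norm.ppf(q, loc=1.0, scale=1.0),
        },
        index=q,
    )
    return distance_calc(inverse_cdf, p=1)       # off-diagonal entries close to 1.0
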
[docs]
class WassersteinDistance:
    """
    WassersteinDistance class to calculate the Wasserstein distance and the
    Wasserstein Fourier distance.

    Attributes:
        data (pd.DataFrame): The input data as a pandas DataFrame.
        weights (pd.DataFrame): The weights associated with the data as a pandas DataFrame.

    Methods:
        sort_data: Sort the data and weights in ascending order.
        weighted_hist: Calculate the weighted histogram of the data.
        pdf: Calculate the probability density function of the data.
        cdf: Calculate the cumulative density function of the data.
        inverse_cdf: Calculate the inverse cumulative density function of the data.
        wasserstein_distance_p: Calculate the p-th Wasserstein distance.
    """

    def __init__(self, data, weights=None):
        self.data = data
        if weights is None:
            self.weights = pd.DataFrame(
                np.ones_like(data.values), columns=data.columns, index=data.index
            )
        else:
            self.weights = weights
        self.columns = data.columns
        self.sorted_indices, self.sorted_data, self.sorted_weights = self.sort_data()
        nfft = min(2048, len(self.data))
        nperseg = min(2048, len(self.data))
        self.weighted_hist(nfft=nfft, nperseg=nperseg)

[docs]
    def sort_data(self):
        """
        Sorts the data and weights based on the values in the data.

        This method sorts the data and weights along the first axis (rows) based on
        the values in the data. It returns the sorted indices, sorted data, and
        sorted weights.

        Returns:
            tuple: A tuple containing:
                - sorted_indices (numpy.ndarray): The indices that would sort the data.
                - sorted_data (pandas.DataFrame): The data sorted according to the sorted indices.
                - sorted_weights (pandas.DataFrame): The weights sorted according to the sorted indices.
        """
        sorted_indices = np.argsort(self.data.values, axis=0)
        sorted_data = pd.DataFrame(
            np.take_along_axis(self.data.values, sorted_indices, axis=0),
            columns=self.columns,
        )
        sorted_weights = pd.DataFrame(
            np.take_along_axis(self.weights.values, sorted_indices, axis=0),
            columns=self.columns,
        )
        return sorted_indices, sorted_data, sorted_weights

[docs]
    def weighted_hist(self, bins=100, nfft=2048, nperseg=2048):
        """
        Compute weighted histograms for each column in the data.

        This method calculates the weighted histogram for each column in the dataset
        using the specified number of bins. The weights for each column are used to
        compute the histogram. The resulting histograms are stored in the `self.hist`
        attribute, and the corresponding probability distributions are stored in the
        `self.distribution` attribute.

        Parameters:
            bins (int): The number of bins to use for the histogram. Default is 100.
            nfft (int): Number of FFT points passed to psd_df. Default is 2048.
            nperseg (int): Segment length passed to psd_df. Default is 2048.

        Attributes:
            self.hist (dict): A dictionary where keys are column names and values are
                tuples containing the histogram values and bin edges.
            self.distribution (dict): A dictionary where keys are column names and
                values are `rv_histogram` objects representing the probability
                distributions of the histograms.
        """
        self.hist = {
            col: np.histogram(
                self.data[col].dropna().to_numpy(),
                bins=bins,
                weights=self.weights[col][self.data[col].dropna().index].to_numpy(),
                density=True,
            )
            for col in self.columns
        }
        self.distribution = {col: rv_histogram(self.hist[col]) for col in self.columns}

        psd_dic = {}
        for col in self.data.columns:
            psd = psd_df(
                self.data[col].dropna().to_frame(), nfft=nfft, nperseg=nperseg
            ).squeeze()
            psd.index = psd.index.round(
                4 - int(np.floor(np.log10(abs(psd.index[1])))) - 1
            )
            psd_dic[col] = psd
        psd = pd.DataFrame(psd_dic)
        self.psd = psd[psd.index > 0]
        moment_0 = np.trapezoid(self.psd.values, self.psd.index.values, axis=0)
        self.NPSD = (
            np.mean(self.psd.index.diff().dropna()) * self.psd / moment_0
        )  # scaled to frequency delta and area
        ind = self.NPSD.index.values
        self.wsfourier_distribution = {
            col: wf_distribution(
                ind, self.NPSD[col].cumsum().values, a=ind.min(), b=ind.max()
            )
            for col in self.columns
        }

[docs]
    def pdf(self, n_points=1000):
        """Probability density function of the data, evaluated on a uniform grid of n_points."""
        x = np.linspace(self.data.min().min(), self.data.max().max(), n_points)
        return pd.DataFrame(
            {col: _calc_pdf(x, self.distribution[col]) for col in self.columns},
            index=x,
        )

[docs]
    def cdf(self, n_points=1000):
        """Cumulative density function of the data, evaluated on a uniform grid of n_points."""
        x = np.linspace(self.data.min().min(), self.data.max().max(), n_points)
        return pd.DataFrame(
            {col: _calc_cdf(x, self.distribution[col]) for col in self.columns},
            index=x,
        )

[docs]
    def inverse_cdf(self, n_points=1000):
        """Inverse CDF (quantile function) of the data, evaluated on n_points quantile levels in [0, 1]."""
        x = np.linspace(0, 1, n_points)
        return pd.DataFrame(
            {col: _calc_ppf(x, self.distribution[col]) for col in self.columns},
            index=x,
        )

[docs]
    def wsf_pdf(self, n_points=1000):
        x = np.linspace(self.NPSD.index.min(), self.NPSD.index.max(), n_points)
        return pd.DataFrame(
            {
                col: _calc_pdf(x, self.wsfourier_distribution[col])
                for col in self.columns
            },
            # index=pd.Index(np.exp(x), name="freq"),
            index=pd.Index(x, name="freq"),
        )

[docs]
    def wsf_cdf(self, n_points=1000):
        x = np.linspace(self.NPSD.index.min(), self.NPSD.index.max(), n_points)
        # x = np.log(
        #     np.logspace(
        #         np.log(self.NPSD.index.min()),
        #         np.log(self.NPSD.index.max()),
        #         base=np.e,
        #         num=n_points,
        #     )
        # )
        return pd.DataFrame(
            {
                col: _calc_cdf(x, self.wsfourier_distribution[col])
                for col in self.columns
            },
            # index=pd.Index(np.exp(x), name="freq"),
            index=pd.Index(x, name="freq"),
        )

[docs]
    def wsf_inverse_cdf(self, n_points=1000):
        x = np.linspace(0, 1, n_points)
        return pd.DataFrame(
            {
                col: _calc_ppf(x, self.wsfourier_distribution[col])
                for col in self.columns
            },
            index=x,
        )  # .apply(np.exp)

[docs]
    def wasserstein_distance_p(self, p=1):
        """
        Compute the Wasserstein distance (p-th order) based on the inverse cumulative
        distribution function (CDF).

        This method calculates the Wasserstein distance, a measure of the distance
        between two probability distributions, using the inverse CDF of the
        distribution. The distance is computed for a specified order `p`.

        Parameters:
            p (int, optional): The order of the Wasserstein distance. Defaults to 1.

        Returns:
            float: The computed Wasserstein distance of order `p`.
        """
        inverse_cdf = self.inverse_cdf(n_points=1000).dropna()
        w_distance = distance_calc(inverse_cdf, p)
        return w_distance

[docs]
    def wasserstein_fourier_distance(self, p=2):
        """
        Compute the Wasserstein Fourier distance for the given data.

        This method calculates the Wasserstein Fourier distance by first obtaining the
        inverse cumulative distribution function (CDF) of the normalized power spectral
        density and then computing the distance using the specified metric.

        Parameters:
            p (int, optional): The order of the norm used in the distance calculation.
                Defaults to 2.

        Returns:
            float: The computed Wasserstein Fourier distance.
        """
        inverse_cdf = self.wsf_inverse_cdf(n_points=1000).dropna()
        wsf_distance = distance_calc(inverse_cdf, p)
        return wsf_distance

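
# --- Illustrative end-to-end sketch (not part of the original module) --------
# A minimal sketch, under assumed synthetic signals with a time index in seconds:
# compare two channels both in the amplitude domain (Wasserstein distance) and in
# the frequency domain (Wasserstein Fourier distance). The helper name, channel
# names, and signal parameters are illustrative assumptions.
def _example_wasserstein_distance():
    t = np.arange(0, 10, 0.01)                              # 100 Hz sampling, assumed
    rng = np.random.default_rng(0)
    data = pd.DataFrame(
        {
            "ref": np.sin(2 * np.pi * 2.0 * t),
            "cmp": np.sin(2 * np.pi * 2.5 * t) + 0.1 * rng.standard_normal(len(t)),
        },
        index=t,
    )
    wd = WassersteinDistance(data)
    return wd.wasserstein_distance_p(p=1), wd.wasserstein_fourier_distance(p=2)
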