Source code for softsensor.eval_tools

# -*- coding: utf-8 -*-
"""
Created on Tue May  3 17:41:37 2022

@author: WET2RNG
"""
import os
import pickle
import math

import numpy as np
import pandas as pd
import scipy.signal as sig
import sklearn.metrics as metr
import torch


from scipy.special import kl_div
from scipy.spatial.distance import jensenshannon
from scipy.stats import wasserstein_distance

from softsensor.autoreg_models import ARNN, _pred_ARNN_batch

from softsensor.datasets import batch_rec_SW
from softsensor.metrics import (
    crps,
    ece,
    heteroscedasticity,
    mpiw,
    nll,
    pearson,
    picp,
    r2,
    rmse,
    rmv,
    log_area_error,
)
from softsensor.recurrent_models import AR_RNN, RNN_DNN
from softsensor.visualization import plot_sens_analysis, plot_uncertainty_sens


def load_model(Type, path):
    """
    Load function for models

    Parameters
    ----------
    Type : str
        Type of network ['ARNN', 'AR_RNN', 'RNN'].
    path : str
        path of the model.

    Returns
    -------
    model : nn.Model with nn.Modules
    result_df : pandas.DataFrame
        Results of the hyperparameter optimization.
    """
    result_df = pd.read_csv(os.path.join(path, "result_df.csv"))
    state_dict = os.path.join(path, "Best_model.pt")
    with open(os.path.join(path, "model_parameters.pkl"), "rb") as file:
        model_params = pickle.load(file)
    model = _get_model(Type, model_params, state_dict)
    return model, result_df
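
# Usage sketch for load_model (illustrative, not executed on import). The
# directory name below is hypothetical; it only needs to contain the three
# files read above ("result_df.csv", "Best_model.pt", "model_parameters.pkl"),
# as written by the package's training/optimization utilities.
#
# >>> from softsensor.eval_tools import load_model
# >>> model, result_df = load_model('ARNN', './runs/arnn_best')
# >>> model.eval()  # nn.Module: switch to evaluation mode before prediction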

def _get_model(Type, model_params, state_dict):
    """
    Internal function to load a model by Type

    Parameters
    ----------
    Type : str
        Type of network ['ARNN', 'AR_RNN', 'RNN'].
    model_params : dict
        Model hyperparameters.
    state_dict : str
        path of the state_dict of the model (weights).

    Returns
    -------
    model : nn.Model with nn.Modules
    """
    if Type == "ARNN":
        model = ARNN(**model_params)
    elif Type == "AR_RNN":
        model = AR_RNN(**model_params)
    elif Type == "RNN":
        model = RNN_DNN(**model_params)
    else:
        raise KeyError("No valid model Type given")
    model.load_state_dict(torch.load(state_dict))
    return model
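
# Sketch of the inputs _get_model expects (hyperparameters borrowed from the
# ARNN examples in this module; the state-dict path is hypothetical and must
# match the architecture defined by model_params):
#
# >>> params = {'input_channels': 2, 'pred_size': 1,
# ...           'window_size': 10, 'rnn_window': 10}
# >>> model = _get_model('ARNN', params, './runs/arnn_best/Best_model.pt')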

def comp_pred(models, data_handle, track, names=None, batch_size=256,
              reduce=False, sens_analysis=None):
    """
    Computes the predictions of the given models for a specific track of the
    data handle

    Parameters
    ----------
    models : list of Models
        Models used for prediction.
    data_handle : data handle of the Meas Handling class
        data handle that should include the track.
    track : str
        name of the track to observe.
    names : list of str
        str should include 'NARX' if it is a NARX model, 'AR' if it is an
        autoregressive model and 'RNN' if it is a recurrent network.
    batch_size : int, optional
        Batch size for models where batching is possible. The default is 256.
    reduce : bool, optional
        If True, reduce the epistemic and aleatoric uncertainty to a total
        one. Only relevant for ensembles and Evidence Estimation ARNN.
        The default is False.
    sens_analysis : dict, optional
        Dictionary that defines if and how a sensitivity analysis is computed
        for the prediction. If key 'method' is valid, the sensitivity analysis
        is computed either 'gradient'- or 'perturbation'-based.
        If key 'comp' is given & True, gradients of the prediction w.r.t. the
        inputs are computed.
        If key 'plot' is given & True, postprocessing results of the gradients
        are visualized.
        If key 'sens_length' is given, the prediction is only computed for the
        first 'sens_length' samples of the time series.
        The default is None, i.e. no sensitivity analysis is computed.

    Returns
    -------
    pred_df : pd.DataFrame
        New columns are added for each model prediction.
    if comp_sens:
        sens_dict : dict
            Additional list of dictionaries with the sensitivity analysis
            results for each model.

    Examples
    --------
    >>> from softsensor.meas_handling import Meas_handling
    >>> from softsensor.autoreg_models import ARNN
    >>> t = np.linspace(0, 1.0, 101)
    >>> xlow = np.sin(2 * np.pi * 100 * t)    # 100 Hz signal
    >>> xhigh = np.sin(2 * np.pi * 3000 * t)  # 3000 Hz signal
    >>> d = {'sine_inp': xlow + xhigh,
    ...      'cos_inp': np.cos(2 * np.pi * 50 * t),
    ...      'out': np.linspace(0, 1.0, 101)}
    >>> list_of_df = [pd.DataFrame(d), pd.DataFrame(d)]
    >>> test_df = {'sine_inp': 10*xlow + xhigh,
    ...            'cos_inp': np.cos(2 * np.pi * 50 * t),
    ...            'out': np.linspace(0, 1.0, 101)}
    >>> test_df = [pd.DataFrame(test_df)]
    >>> data_handle = Meas_handling(list_of_df, train_names=['sine1', 'sine2'],
    ...                             input_sensors=['sine_inp', 'cos_inp'],
    ...                             output_sensors=['out'], fs=100,
    ...                             test_dfs=test_df, test_names=['test'])
    >>> model = ARNN(input_channels=2, pred_size=1, window_size=10, rnn_window=10)
    >>> track = 'test'

    Without sensitivity analysis

    >>> pred_df = comp_pred([model], data_handle, track, names=['ARNN'])

    With sensitivity analysis

    >>> pred_df, sens_dict = comp_pred([model], data_handle, track, names=['ARNN'],
    ...                                sens_analysis={'method': 'gradient',
    ...                                               'params': {'comp': True,
    ...                                                          'plot': True}})
    """
    if names is None:
        names = [m.Type for m in models]
    pred_df = data_handle.give_dataframe(track)
    sens_list = []
    for m, n in zip(models, names):
        if sens_analysis:
            pred_df, sens = _model_pred(
                m, data_handle, pred_df, n, track, batch_size,
                reduce=reduce, sens_analysis=sens_analysis,
            )
            sens_list.append(sens)
        else:
            pred_df = _model_pred(
                m, data_handle, pred_df, n, track, batch_size, reduce=reduce
            )
    if sens_analysis:
        # return directly the dict itself if only one model is computed
        sens_list = sens_list[0] if len(sens_list) == 1 else sens_list
        return pred_df, sens_list
    else:
        return pred_df
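
# Shape of the sens_analysis argument accepted by comp_pred (a sketch using
# only the keys documented above; values are illustrative):
#
# >>> sens_analysis = {'method': 'gradient',
# ...                  'params': {'comp': True, 'plot': False, 'sens_length': 1000}}
# >>> pred_df, sens_dict = comp_pred([model], data_handle, 'test',
# ...                                names=['ARNN'], sens_analysis=sens_analysis)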

def _model_pred(m, data_handle, pred_df, n, track, batch_size=256,
                reduce=False, sens_analysis=None):
    """
    Internal method for the prediction of time series

    Parameters
    ----------
    m : model
        model to predict.
    data_handle : Meas_handling class
        Meas handling class that has internal functions to get Dataloader
        and Dataset.
    pred_df : pd.DataFrame
        DataFrame where the prediction is appended as additional column.
    n : str
        Name of the model. New columns in the dataframe will be named
        f'{output sensor}_{n}'.
    track : str
        name of the track to predict.
    batch_size : int, optional
        Batch size for models where batching is possible. The default is 256.
    reduce : bool, optional
        If True, reduce the epistemic and aleatoric uncertainty to a total
        one. Only relevant for ensembles and Evidence Estimation ARNN.
        The default is False.
    sens_analysis : dict, optional
        Dictionary that defines if and how a sensitivity analysis is computed
        for the prediction. If key 'method' is valid, the sensitivity analysis
        is computed either 'gradient'- or 'perturbation'-based.
        If key 'comp' is given & True, gradients of the prediction w.r.t. the
        inputs are computed.
        If key 'plot' is given & True, postprocessing results of the gradients
        are visualized.
        If key 'sens_length' is given, the prediction is only computed for the
        first 'sens_length' samples of the time series.
        The default is None, i.e. no sensitivity analysis is computed.

    Raises
    ------
    AttributeError
        if the model type is not implemented.

    Returns
    -------
    pred_df : pd.DataFrame
        pred_df with additional columns for the prediction.
    if comp_sens:
        sens_dict : dict
            Dictionary with the sensitivity analysis results for each model.
    """
    checks = _check_sens_analysis(sens_analysis)
    if sens_analysis:
        sens_params = checks
        method, comp_sens, plot_sens = list(sens_params.values())[:3]
        use_case = sens_params.get("use_case", None)
        orig_length = sens_params.get("orig_length", None)
        random_samples = sens_params.get("random_samples", False)
        amplification = sens_params.get("amplification", 1)
        save_path = sens_params.get("save_path", None)
        channel_names = data_handle.input_sensors + data_handle.output_sensors
        sens_dict = None  # default None type for models without sensitivities
        # default None type for _predict_ARNN method and ensembles
        sens_uq, eps = None, None
    else:
        sens_params, method, comp_sens = checks

    # define some generic functions for prediction and sensitivity analysis
    # to avoid code duplication
    def get_prediction(m, dataloader, reduce=False, sens_params=None):
        """Generic function to get the prediction of a model"""
        if sens_params:
            if m.Pred_Type == "Mean_Var" and m.Ensemble:
                return m.prediction(dataloader, reduce=reduce, sens_params=sens_params)
            else:
                return m.prediction(dataloader, sens_params=sens_params)
        else:
            if m.Pred_Type == "Mean_Var" and m.Ensemble:
                return m.prediction(dataloader, reduce=reduce)
            else:
                return m.prediction(dataloader)

    def write_predictions_to_df(pred, pred_df, labels, reduce=False):
        """Update the DataFrame with uncertainty labels coming from the
        prediction results."""
        if m.Pred_Type == "Mean_Var" and m.Ensemble and not reduce:
            # Uncertainty labels for Mean_Var and Ensemble without reduction
            uncertainty_labels = [
                f"{out_name}_{n}_ep_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = np.array(pred[1].transpose(1, 0))
            uncertainty_labels = [
                f"{out_name}_{n}_al_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = np.array(pred[2].transpose(1, 0))
            pred = pred[0]
        elif m.Pred_Type == "Mean_Var" and m.Ensemble and reduce:
            # Uncertainty labels for Mean_Var and Ensemble with reduction
            uncertainty_labels = [
                f"{out_name}_{n}_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = np.array(pred[1].transpose(1, 0))
            pred = pred[0]
        elif m.Pred_Type == "Mean_Var" or m.Ensemble:
            # Uncertainty labels for either Mean_Var or Ensemble
            uncertainty_labels = [
                f"{out_name}_{n}_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = np.array(pred[1].transpose(1, 0))
            pred = pred[0]
        elif m.Pred_Type == "Quantile":
            labels = ["median"]
            for i in range(math.floor(m.n_quantiles / 2)):
                labels = labels + [f"{n}_lb{i}", f"{n}_ub{i}"]
            labels = [
                f"{out_name}_{l}"
                for out_name in data_handle.output_sensors
                for l in labels
            ]
            pred = torch.reshape(pred, (m.pred_size * m.n_quantiles, -1))
        return pred, labels

    labels = [f"{out_name}_{n}" for out_name in data_handle.output_sensors]
    Type = m.Type
    if Type in ["TF", "ARX"]:
        pred_mimo = m.prediction(pred_df)
        pred_df.loc[:, labels] = np.asarray(pred_mimo)[: len(pred_df)]
    elif Type in ["AR", "AR_RNN"]:
        loader = data_handle.give_list(
            window_size=m.window_size, keyword=[track], batch_size=1,
            rnn_window=m.rnn_window, forecast=m.forecast, full_ds=False,
        )
        pred_result = get_prediction(
            m, loader[0], reduce=reduce, sens_params=sens_params
        )
        if method and comp_sens:
            if random_samples:
                pred, sens_dict, sens_uq, eps = pred_result
            else:
                pred, sens_dict = pred_result
            if plot_sens:
                _plot_sensitivities(
                    m, sens_dict, plot_sens, method, channel_names, use_case,
                    orig_length, sens_uq, eps, amplification,
                    save_path=save_path,
                )
        else:
            pred = pred_result
        pred, labels = write_predictions_to_df(pred, pred_df, labels, reduce=reduce)
        pred = pred.transpose(1, 0)
        pred_df.loc[:, labels] = (
            pred.detach().cpu().numpy() if torch.is_tensor(pred) else np.array(pred)
        )
    elif Type == "RNN":
        loader = data_handle.give_list(
            window_size=m.window_size, keyword=[track], Add_zeros=True,
            batch_size=batch_size, forecast=m.forecast, full_ds=False,
        )
        pred_result = get_prediction(
            m, loader[0], reduce=reduce, sens_params=sens_params
        )
        if method and comp_sens:
            pred, sens_dict = pred_result
            if plot_sens:
                _plot_sensitivities(
                    m, sens_dict, plot_sens, method, channel_names, use_case,
                    orig_length, sens_uq, eps, save_path=save_path,
                )
        else:
            if m.Pred_Type == "Mean_Var" and m.Ensemble:
                pred = m.prediction(loader[0], reduce=reduce)
            else:
                pred = m.prediction(loader[0])
        if m.Pred_Type == "Mean_Var" and m.Ensemble and not reduce:
            uncertainty_labels = [
                f"{out_name}_{n}_ep_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = pred[1].transpose(1, 0)
            uncertainty_labels = [
                f"{out_name}_{n}_al_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = pred[2].transpose(1, 0)
            pred = pred[0]
        elif m.Pred_Type == "Mean_Var" and m.Ensemble and reduce:
            uncertainty_labels = [
                f"{out_name}_{n}_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = pred[1].transpose(1, 0)
            pred = pred[0]
        elif m.Pred_Type == "Mean_Var" or m.Ensemble:
            uncertainty_labels = [
                f"{out_name}_{n}_var" for out_name in data_handle.output_sensors
            ]
            pred_df.loc[:, uncertainty_labels] = np.array(pred[1].transpose(1, 0))
            pred = pred[0]
        pred = pred.transpose(1, 0)
        pred_df.loc[:, labels] = (
            pred.detach().cpu().numpy() if torch.is_tensor(pred) else np.array(pred)
        )
    else:
        raise AttributeError(
            f"Modeltype for model {m.__class__.__name__} not implemented"
        )

    if sens_analysis:
        return pred_df, sens_dict
    else:
        return pred_df
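
# Column-naming convention written by _model_pred (sketch for one output
# sensor 'out' and model name 'SepMVE'):
#
# >>> [c for c in pred_df.columns if c.startswith('out_SepMVE')]
# ['out_SepMVE', 'out_SepMVE_var']   # Mean_Var or ensemble with reduce=True
# # a Mean_Var ensemble with reduce=False instead adds 'out_SepMVE_ep_var'
# # (epistemic) and 'out_SepMVE_al_var' (aleatoric) columns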

def comp_batch(models, data_handle, tracks, names, device="cpu",
               batch_size=256, n_samples=5, reduce=False, sens_analysis=None):
    """
    Compute the prediction for a list of models and tracks

    Parameters
    ----------
    models : list of models
        list of models for computation. Models need to be supported by
        _model_pred.
    data_handle : Meas_handling class
        Meas handling class that has internal functions to get Dataloader
        and Dataset.
    tracks : list of str
        Names of the tracks to predict.
    names : list of str
        Names that are added to the column name.
    device : str, optional
        device for computation. The default is 'cpu'.
    batch_size : int, optional
        Batch size for models where batching is possible. The default is 256.
    reduce : bool, optional
        If True, reduce the epistemic and aleatoric uncertainty to a total
        one. Only relevant for ensembles and Evidence Estimation ARNN.
        The default is False.
    sens_analysis : dict, optional
        Dictionary that defines if and how a sensitivity analysis is computed
        for the prediction. If key 'method' is valid, the sensitivity analysis
        is computed either 'gradient'- or 'perturbation'-based.
        If key 'comp' is given & True, gradients of the prediction w.r.t. the
        inputs are computed.
        If key 'plot' is given & True, postprocessing results of the gradients
        are visualized.
        If key 'sens_length' is given, the prediction is only computed for the
        first 'sens_length' samples of the time series.
        The default is None, i.e. no sensitivity analysis is computed.

    Returns
    -------
    dataframes : list of pd.DataFrame
        pred_dfs with additional columns for the predictions.

    Examples
    --------
    Define Data

    >>> import softsensor.meas_handling as ms
    >>> import numpy as np
    >>> import pandas as pd
    >>> t = np.linspace(0, 1.0, 101)
    >>> d = {'inp1': np.random.randn(101),
    ...      'inp2': np.random.randn(101),
    ...      'out': np.random.randn(101)}
    >>> handler = ms.Meas_handling([pd.DataFrame(d, index=t)], ['train'],
    ...                            ['inp1', 'inp2'], ['out'], fs=100)

    Compute prediction

    >>> from softsensor.eval_tools import comp_batch
    >>> import softsensor.autoreg_models
    >>> params = {'input_channels': 2, 'pred_size': 1, 'window_size': 10,
    ...           'rnn_window': 10}
    >>> m = softsensor.autoreg_models.ARNN(**params, hidden_size=[16, 8])
    >>> dataframes = comp_batch([m], handler, handler.train_names, ['ARNN'],
    ...                         device='cpu')
    >>> list(dataframes[0].columns)
    ['inp1', 'inp2', 'out', 'out_ARNN']

    Compute prediction with uncertainty

    >>> import softsensor.homoscedastic_model as hm
    >>> vars = hm.fit_homoscedastic_var(dataframes, ['out'], ['out_ARNN'])
    >>> homosc_m = hm.HomoscedasticModel(m, vars)
    >>> sepmve = softsensor.autoreg_models.SeparateMVEARNN(**params, mean_model=m,
    ...                                                    var_hidden_size=[16, 8])
    >>> dataframes = comp_batch([m, homosc_m, sepmve], handler, handler.train_names,
    ...                         ['ARNN', 'Homosc_ARNN', 'SepMVE'], device='cpu')
    >>> list(dataframes[0].columns)
    ['inp1', 'inp2', 'out', 'out_ARNN', 'out_Homosc_ARNN', 'out_Homosc_ARNN_var',
     'out_SepMVE', 'out_SepMVE_var']
    """
    checks = _check_sens_analysis(sens_analysis)
    if sens_analysis:
        sens_params = checks
        method, comp_sens, plot_sens = list(sens_params.values())[:3]
        use_case = sens_params.get("use_case", None)
        orig_length = sens_params.get("orig_length", None)
        amplification = sens_params.get("amplification", 1)
        save_path = sens_params.get("save_path", None)
        channel_names = data_handle.input_sensors + data_handle.output_sensors
        sens_list = []
    else:
        method, comp_sens = checks[1:]

    dataframes = data_handle.give_dataframes(tracks)
    for m, n in zip(models, names):
        if sens_analysis:
            sens_dict = None  # default None type for models without sensitivities
            # default None type for sens analysis without amplification
            sens_uq, eps = None, None
        if m.Type == "AR" and (m.Ensemble is False) and (m.Pred_Type != "Quantile"):
            if method and comp_sens:
                pred, sens_dict, sens_uq, eps = _comp_ARNN_batch(
                    m, data_handle, tracks, device, sens_params=sens_params
                )
                if plot_sens:
                    _plot_sensitivities(
                        m, sens_dict, plot_sens, method, channel_names, use_case,
                        orig_length, sens_uq, eps, amplification, batched=True,
                        save_path=save_path,
                    )
            else:
                pred = _comp_ARNN_batch(m, data_handle, tracks, device)
            if m.Pred_Type == "Point":
                labels = [f"{out_name}_{n}" for out_name in data_handle.output_sensors]
                dataframes = _ARNN_dataframe_pred(dataframes, pred, labels)
            if m.Pred_Type == "Mean_Var":
                labels = [f"{out_name}_{n}" for out_name in data_handle.output_sensors]
                dataframes = _ARNN_dataframe_pred(dataframes, pred[0], labels)
                labels = [
                    f"{out_name}_{n}_var" for out_name in data_handle.output_sensors
                ]
                dataframes = _ARNN_dataframe_pred(dataframes, pred[1], labels)
            if sens_analysis:
                sens_list.append(sens_dict)
        else:
            if sens_analysis:
                dict_list = []
                sens_analysis["params"]["plot"] = False
                for track, df in zip(tracks, dataframes):
                    df, sens_dict = _model_pred(
                        m, data_handle, df, n, track, batch_size,
                        reduce=reduce, sens_analysis=sens_analysis,
                    )
                    dict_list.append(sens_dict)
                if sens_dict:
                    # same list structure as with batched approach
                    sens_dict = {k: [] for k in dict_list[0].keys()}
                    for d in dict_list:
                        for k, v in d.items():
                            sens_dict[k].append(v)
                    if plot_sens:
                        _plot_sensitivities(
                            m, sens_dict, plot_sens, method, channel_names,
                            use_case, orig_length, sens_uq, eps, batched=True,
                            save_path=save_path,
                        )
                sens_analysis["params"]["plot"] = True
                sens_list.append(sens_dict)
            else:
                for track, df in zip(tracks, dataframes):
                    df = _model_pred(
                        m, data_handle, df, n, track, batch_size, reduce=reduce
                    )
    if sens_analysis:
        sens_list = sens_list[0] if len(sens_list) == 1 else sens_list
        return dataframes, sens_list
    else:
        return dataframes
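
# Sketch of comp_batch with a sensitivity analysis enabled (continues the
# docstring example above; dictionary keys follow the sens_analysis
# description):
#
# >>> sens_analysis = {'method': 'gradient', 'params': {'comp': True, 'plot': False}}
# >>> dataframes, sens = comp_batch([m], handler, handler.train_names, ['ARNN'],
# ...                               sens_analysis=sens_analysis)
# >>> list(sens.keys())  # one entry per prediction head, e.g. for a point ARNN
# ['Point']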

def _ARNN_dataframe_pred(dataframes, pred, labels):
    """
    Internal method to add predictions to dataframes

    Parameters
    ----------
    dataframes : list of df
        dataframes where the prediction is added as columns.
    pred : list of arrays
        prediction to be added to the dataframes.
    labels : list of str
        column names.

    Raises
    ------
    ValueError
        Raised if the lengths of dataframes and pred do not match.

    Returns
    -------
    dataframes : list of df
        dataframes with added predictions
    """
    if len(dataframes) != len(pred):
        raise ValueError("length of lists does not match")
    for i, p in enumerate(pred):
        dataframes[i][labels] = p.transpose(1, 0).cpu()
    return dataframes


def _comp_ARNN_batch(model, data_handle, tracks, device="cpu", sens_params=None):
    """
    Internal method to compute a batch of AR models for faster computation

    Parameters
    ----------
    model : Autoregressive model
        Model for the prediction.
    data_handle : Meas_handling class
        Measurement handling that contains tracks.
    tracks : list of str
        list of tracks to compute the prediction for.
    device : str, optional
        device for computation. The default is 'cpu'.
    sens_params : dict, optional
        Flattened sensitivity analysis settings (see _check_sens_analysis).
        The default is None.

    Returns
    -------
    if comp_sens is False:
        pred : list of torch.Tensor (tuple of lists for Mean_Var models)
            One prediction tensor per track, in the same order as tracks.
    else:
        (pred, sensitivity_dict, sens_uq, eps)
            Predictions plus a dictionary with the sensitivity matrices per
            prediction head.
    """
    sws = data_handle.give_Datasets(
        model.window_size, keyword=tracks, rnn_window=model.rnn_window,
        full_ds=False, forecast=model.forecast,
    )
    batch_sw = batch_rec_SW(sws)
    forecast = batch_sw.forecast

    if sens_params:
        method = sens_params.get("method", "")
        comp_sens = sens_params.get("comp", False)
        sens_length = sens_params.get("sens_length", None)
    else:
        comp_sens = False

    if comp_sens:
        if sens_length:
            # commonly shared sens_length cannot be larger than the smallest
            # length in the batch
            add_zeros = [sw.add_zero for sw in batch_sw.sws]
            min_length = min(
                [le * forecast - zeros
                 for le, zeros in zip(batch_sw.__lengths__(), add_zeros)]
            )
            if sens_length > min_length:
                # update sensitivity length in params dict
                sens_params["sens_length"] = min_length
                print(
                    f"INFO: Given sensitivity length was changed to {min_length} "
                    "as the smallest length in the batch."
                )
        results = _pred_ARNN_batch(model, batch_sw, device, sens_params=sens_params)
        prediction_sh, sensitivities, sens_uq, eps = results
    else:
        prediction_sh = _pred_ARNN_batch(model, batch_sw, device)

    # Make list of predictions
    if model.Pred_Type == "Point":
        pred_list = list(prediction_sh)
        if comp_sens:
            sens_mean_list = list(sensitivities)
    elif model.Pred_Type == "Mean_Var":
        pred_list = list(prediction_sh[0])
        var_pred_list = list(prediction_sh[1])
        if comp_sens:
            sens_mean_list = list(sensitivities[0])
            sens_var_list = list(sensitivities[1])

    if len(pred_list) > 1:
        orig_indizes = batch_sw.permutation()
        add_zeros = [sw.add_zero for sw in batch_sw.sws]
        # cut the predictions and gradient matrices to their original length
        for i, (p, le, zeros) in enumerate(
            zip(pred_list, batch_sw.__lengths__(), add_zeros)
        ):
            pred_list[i] = p[:, : le * forecast - zeros]
            if comp_sens and sens_length is None:
                sens_mean_list[i] = sens_mean_list[i][: le * forecast - zeros, :, :]
        if model.Pred_Type == "Mean_Var":
            for i, (p, le, zeros) in enumerate(
                zip(var_pred_list, batch_sw.__lengths__(), add_zeros)
            ):
                var_pred_list[i] = p[:, : le * forecast - zeros]
                if comp_sens and sens_length is None:
                    sens_var_list[i] = sens_var_list[i][: le * forecast - zeros, :, :]
        # sort the predictions and gradient matrices back to their original order
        pred = [x for _, x in sorted(zip(orig_indizes, pred_list))]
        if model.Pred_Type == "Mean_Var":
            var_pred = [x for _, x in sorted(zip(orig_indizes, var_pred_list))]
        if comp_sens:
            sens_mean = [x for _, x in sorted(zip(orig_indizes, sens_mean_list))]
            if model.Pred_Type == "Mean_Var":
                sens_var = [x for _, x in sorted(zip(orig_indizes, sens_var_list))]
    else:
        le = batch_sw.__lengths__()[0]
        pred = [pred_list[0][:, : le * forecast - batch_sw.sws[0].add_zero]]
        if model.Pred_Type == "Mean_Var":
            var_pred = [
                var_pred_list[0][:, : le * forecast - batch_sw.sws[0].add_zero]
            ]
        if comp_sens:
            sens_mean = (
                [sens_mean_list[0][: le * forecast - batch_sw.sws[0].add_zero, :, :]]
                if sens_length is None
                else [sens_mean_list[0]]
            )
            if model.Pred_Type == "Mean_Var":
                sens_var = (
                    [sens_var_list[0][: le * forecast - batch_sw.sws[0].add_zero, :, :]]
                    if sens_length is None
                    else [sens_var_list[0]]
                )

    for i, p in enumerate(pred):
        if len(p.shape) == 1:
            pred[i] = p.unsqueeze(dim=0)
    if model.Pred_Type == "Mean_Var":
        for i, p in enumerate(var_pred):
            if len(p.shape) == 1:
                var_pred[i] = p.unsqueeze(dim=0)
        pred = (pred, var_pred)

    if comp_sens:
        if model.Pred_Type == "Mean_Var":
            sensitivity_dict = {"Mean": sens_mean, "Aleatoric_UQ": sens_var}
        else:
            sensitivity_dict = {"Point": sens_mean}
        return pred, sensitivity_dict, sens_uq, eps
    else:
        return pred
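
# Shape convention of the batched output (sketch, continuing the comp_batch
# docstring example): one tensor of shape (pred_size, track_length) per track,
# or a (mean, variance) tuple of such lists for Mean_Var models.
#
# >>> pred = _comp_ARNN_batch(m, handler, handler.train_names)
# >>> [p.shape for p in pred]  # e.g. a single track of 101 samples, pred_size 1
# [torch.Size([1, 101])]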

def comp_error(test_df, out_sens, fs=None, names=["pred"], metrics=["MSE"],
               freq_metrics=None, freq_range=None, bins=20):
    """
    Computes the error from a df with specific names in the columns

    Parameters
    ----------
    test_df : pandas DataFrame
        DataFrame that must include the original output and the prediction.
        column name of the prediction must look like: 'out_sens_name'.
    out_sens : list of str
        column names to observe
    fs : float
        sampling rate of the df for psd error computation.
    names : list of str, optional
        list of names that are appended to the original column.
        The default is ['pred'].
    metrics : list of str, optional
        Metrics to evaluate in time domain ['MSE', 'MAE', 'MAPE'].
        The default is ['MSE'].
    freq_metrics : list of str, optional
        Metrics to evaluate on the PSDs in the frequency domain,
        e.g. ['MSLE']. The default is None.
    freq_range : tuple of float, optional
        range in which the psd error is computed. The default is None.
    bins : int, optional
        number of histogram bins for the statistical metrics
        ('KLD', 'JSD', 'Wasserstein'). The default is 20.

    Returns
    -------
    result_df : pandas DataFrame
        DataFrame with errors as index and names as columns.

    Examples
    --------
    Based on the examples in comp_batch.

    Prediction of point metrics

    >>> from softsensor.eval_tools import comp_error
    >>> comp_error(dataframes[0], out_sens=['out'],
    ...            names=['ARNN', 'Homosc_ARNN', 'SepMVE'],
    ...            metrics=['MSE', 'MAE'], freq_range=None)
                 ARNN  Homosc_ARNN    SepMVE
    out_MSE  1.297152     1.297152  1.297152
    out_MAE  0.924926     0.924926  0.924926

    Prediction of distributional metrics

    >>> comp_error(dataframes[0], out_sens=['out'],
    ...            names=['Homosc_ARNN', 'SepMVE'],
    ...            metrics=['NLL', 'ECE'], freq_range=None)
             Homosc_ARNN    SepMVE
    out_NLL     0.630110  0.678553
    out_ECE     0.023137  0.083637

    Prediction of statistical metrics

    >>> comp_error(dataframes[0], out_sens=['out'],
    ...            names=['ARNN', 'Homosc_ARNN', 'SepMVE'],
    ...            metrics=['JSD', 'Wasserstein'], freq_range=None)
                         ARNN  Homosc_ARNN    SepMVE
    out_JSD          0.680494     0.680494  0.680494
    out_Wasserstein  0.073267     0.073267  0.073267

    Prediction of metrics in the frequency domain (PSD)

    >>> comp_error(dataframes[0], out_sens=['out'],
    ...            names=['ARNN', 'Homosc_ARNN', 'SepMVE'],
    ...            metrics=None, freq_metrics=['MSLE'], fs=100, freq_range=(5, 25))
                      ARNN  Homosc_ARNN    SepMVE
    out_PSD_MSLE  0.000864     0.000864  0.000864
    """
    if metrics is not None:
        metr_err = []
        for m in metrics:
            err = comp_metrics(test_df, out_sens, names, metric=m, bins=bins)
            metr_err.append(
                pd.DataFrame(err, columns=names, index=[f"{s}_{m}" for s in out_sens])
            )
        metr_err = pd.concat(metr_err)

    if freq_metrics is not None:
        psd_err = []
        for m in freq_metrics:
            psd = comp_psd(test_df, out_sens, fs, names, freq_range)
            err = comp_metrics(psd, out_sens, names, metric=m)
            psd_err.append(
                pd.DataFrame(
                    err, columns=names, index=[f"{s}_PSD_{m}" for s in out_sens]
                )
            )
        psd_err = pd.concat(psd_err)

    if (metrics is not None) and (freq_metrics is not None):
        result_df = pd.concat((metr_err, psd_err))
    elif (metrics is None) and (freq_metrics is not None):
        result_df = psd_err
    elif (metrics is not None) and (freq_metrics is None):
        result_df = metr_err
    else:
        return None
    return result_df

def comp_psd(test_df, out_sens, fs, names=["pred"], freq_range=None):
    """
    Compute the power spectral densities (PSDs) of the original and predicted
    signals

    Parameters
    ----------
    test_df : pandas DataFrame
        DataFrame that must include the original output and the prediction.
        column name of the prediction must look like: 'out_sens_name'.
    out_sens : list of str
        column names to observe
    fs : float
        sampling rate of the df for psd error computation.
    names : list of str, optional
        list of names that are appended to the original column.
        The default is ['pred'].
    freq_range : tuple of float, optional
        range in which the psd error is computed. The default is None.

    Returns
    -------
    psd_df : pandas DataFrame
        PSDs indexed by frequency, with one column per original signal and
        one per prediction (f'{out_sens}_{name}').
    """
    psd_list = []
    temp_n = []
    for i, s in enumerate(out_sens):
        f, psd_original = sig.welch(test_df[s], fs=fs)
        if freq_range is not None:
            psd_original = psd_original[(freq_range[0] < f) & (f < freq_range[1])]
            f = f[(freq_range[0] < f) & (f < freq_range[1])]
        temp_n.append(f"{s}")
        psd_list.append(psd_original.reshape((len(psd_original), 1)))
        for ii, n in enumerate(names):
            f, psd_mod = sig.welch(test_df[f"{s}_{n}"].fillna(0), fs=fs)
            if freq_range is not None:
                psd_mod = psd_mod[(freq_range[0] < f) & (f < freq_range[1])]
                f = f[(freq_range[0] < f) & (f < freq_range[1])]
            temp_n.append(f"{s}_{n}")
            psd_list.append(psd_mod.reshape((len(psd_mod), 1)))
    psd_array = np.concatenate(psd_list, axis=1)
    psd_df = pd.DataFrame(psd_array, index=f, columns=temp_n)
    return psd_df
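
# Usage sketch for comp_psd (continuing the comp_batch docstring example): the
# returned DataFrame is indexed by frequency and holds one column per original
# signal and one per prediction.
#
# >>> psd_df = comp_psd(dataframes[0], out_sens=['out'], fs=100,
# ...                   names=['ARNN'], freq_range=(5, 25))
# >>> list(psd_df.columns)
# ['out', 'out_ARNN']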

def comp_metrics(test_df, out_sens, names=["pred"], metric="MSE", bins=20):
    """
    Compute the errors in time domain

    Parameters
    ----------
    test_df : pandas DataFrame
        DataFrame that must include the original output and the prediction.
        column name of the prediction must look like: 'out_sens_name'.
        if metric is an uncertainty metric, the column name of the predicted
        variance must look like: '{out_sens}_{name}_var'.
    out_sens : list of str
        column names to observe
    names : list of str, optional
        list of names that are appended to the original column.
        The default is ['pred'].
    metric : str, optional
        Metric to evaluate: point metrics ['MSE', 'MAE', 'MAPE', 'MSLE'],
        uncertainty metrics ['RMSE', 'R2', 'Corr', 'NLL', 'RMV', 'CRPS',
        'Het', 'PICP', 'MPIW', 'ECE'], statistical metrics ['KLD', 'JSD',
        'Wasserstein'] or 'log_area'. The default is 'MSE'.
    bins : int, optional
        number of histogram bins for the statistical metrics.
        The default is 20.

    Returns
    -------
    error : matrix of shape [len(out_sens), len(names)]
    """
    point_metrics = {
        "MSE": metr.mean_squared_error,
        "MAE": metr.mean_absolute_error,
        "MAPE": metr.mean_absolute_percentage_error,
        "MSLE": metr.mean_squared_log_error,
    }
    uncertainty_metrics = {
        "RMSE": rmse,
        "R2": r2,
        "Corr": pearson,
        "NLL": nll,
        "RMV": rmv,
        "CRPS": crps,
        "Het": heteroscedasticity,
        "PICP": picp,
        "MPIW": mpiw,
        "ECE": ece,
    }

    error = np.zeros((len(out_sens), len(names)))
    if metric in ["KLD", "JSD", "Wasserstein"]:
        nam = [f"{s}" for s in out_sens] + [
            f"{s}_{n}" for n in names for s in out_sens
        ]
        bins = np.histogram(np.hstack((np.array(test_df[nam]))), bins=bins)[1]

    for i, s in enumerate(out_sens):
        for ii, n in enumerate(names):
            if metric in point_metrics:
                error[i, ii] = point_metrics[metric](test_df[s], test_df[f"{s}_{n}"])
            elif metric in uncertainty_metrics:
                target, mean, variance = [
                    torch.tensor(test_df[x].values)
                    for x in [s, f"{s}_{n}", f"{s}_{n}_var"]
                ]
                error[i, ii] = float(
                    uncertainty_metrics[metric](mean, target, variance)
                )
            elif metric == "KLD":
                p_x = np.histogram(test_df[f"{s}"], bins=bins)[0] / len(test_df[f"{s}"])
                p_m = np.histogram(test_df[f"{s}_{n}"], bins=bins)[0] / len(
                    test_df[f"{s}_{n}"]
                )
                if not np.all(p_m):
                    print(
                        "WARNING: Histogram contains zero elements. "
                        "KL Divergence will result in infinite values. "
                        "Consider using the Wasserstein Distance or "
                        "Jensen Shannon Divergence instead"
                    )
                    error[i, ii] = np.inf
                else:
                    error[i, ii] = np.sum(kl_div(p_x, p_m))
            elif metric == "JSD":
                p_x = np.histogram(test_df[f"{s}"], bins=bins)[0] / len(test_df[f"{s}"])
                p_m = np.histogram(test_df[f"{s}_{n}"], bins=bins)[0] / len(
                    test_df[f"{s}_{n}"]
                )
                error[i, ii] = jensenshannon(p_x, p_m)
            elif metric == "Wasserstein":
                p_x = np.histogram(test_df[f"{s}"], bins=bins)[0] / len(test_df[f"{s}"])
                p_m = np.histogram(test_df[f"{s}_{n}"], bins=bins)[0] / len(
                    test_df[f"{s}_{n}"]
                )
                error[i, ii] = wasserstein_distance(p_x, p_m)
            elif metric == "log_area":
                error[i, ii] = log_area_error(
                    test_df[f"{s}_{n}"], test_df[s], test_df.index
                )
    return error
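
# Usage sketch for comp_metrics (continuing the comp_batch docstring example):
# the result is a plain numpy array with one row per output sensor and one
# column per model name.
#
# >>> err = comp_metrics(dataframes[0], out_sens=['out'],
# ...                    names=['ARNN', 'SepMVE'], metric='MSE')
# >>> err.shape
# (1, 2)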

def _comp_mean_metrics_single_track(models, data_handle, track, fs, model_names,
                                    metrics, freq_range=None):
    """Compute the metric scores of models on a single track

    Parameters
    ----------
    models : list[nn.Module]
        Torch models to evaluate
    data_handle : Datahandle
        Datahandle that contains track
    track : str
        Name of the track to evaluate
    fs : float
        Sampling rate of the df for psd error computation.
    model_names : list[str]
        Names of the models
    metrics : list[str]
        Names of the metrics to evaluate
    freq_range : tuple of float, optional
        range in which the psd error is computed. The default is None.

    Returns
    -------
    scores_df : pd.DataFrame
        Metric scores with metrics as index and model names as columns.
    """
    pred_df = comp_pred(models, data_handle, track, model_names)
    scores_df = comp_error(
        pred_df, data_handle.output_sensors, fs, model_names, metrics, freq_range
    )
    return scores_df

def comp_mean_metrics(models, data_handle, fs, model_names, metrics, freq_range=None):
    """Compute the mean metric scores of models over all test tracks

    Parameters
    ----------
    models : list[nn.Module]
        Torch models to evaluate
    data_handle : Datahandle
        Datahandle that contains the test tracks
    fs : float
        Sampling rate of the df for psd error computation.
    model_names : list[str]
        Names of the models
    metrics : list[str]
        Names of the metrics to evaluate
    freq_range : tuple of float, optional
        range in which the psd error is computed. The default is None.

    Returns
    -------
    mean_scores : pd.DataFrame
        Metric scores averaged over all tracks in data_handle.test_names.
    """
    track_scores = [
        _comp_mean_metrics_single_track(
            models, data_handle, track, fs, model_names, metrics, freq_range
        )
        for track in data_handle.test_names
    ]
    mean_scores = pd.concat(track_scores).groupby(level=0).mean()
    return mean_scores[::-1]
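
# Usage sketch for comp_mean_metrics (assumes a data handle constructed with
# test_dfs/test_names, as in the comp_pred docstring example): comp_error is
# evaluated per test track and the scores are averaged.
#
# >>> scores = comp_mean_metrics([model], data_handle, fs=100,
# ...                            model_names=['ARNN'], metrics=['MSE', 'MAE'])
# >>> scores.columns.tolist()
# ['ARNN']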

def _check_sens_analysis(sens_analysis):
    """
    Check the validity of the sensitivity analysis dictionary.

    Parameters
    ----------
    sens_analysis : dict
        Dictionary that defines if and how a sensitivity analysis is computed
        for the prediction.

    Returns
    -------
    if sens_analysis is None:
        Flattened dict (None), method (empty string), comp_sens (False).
    else:
        Flattened dict (from input argument)
    """
    if sens_analysis:
        method = sens_analysis.get("method", "")
        params = sens_analysis.get("params", {})
        comp_sens = params.get("comp", False)
        plot_sens = params.get("plot", False)
        sens_params = {"method": method, "comp": comp_sens, "plot": plot_sens}
        sens_params.update(params)
        return sens_params
    else:
        sens_params = None
        method = ""
        comp_sens = False
        return sens_params, method, comp_sens


def _plot_sensitivities(model, sens_dict, plot_sens, method, ch_names,
                        use_case=None, orig_length=None, sens_uq=None, eps=None,
                        amplification=1, batched=False, save_path=None):
    """
    Plot the results of the sensitivity analysis for every item in the
    sensitivity dictionary.
    """
    plot_sens = plot_sens.lower() if isinstance(plot_sens, str) else plot_sens
    if plot_sens not in [True, False, "all"]:
        raise ValueError(
            f'Invalid value given for key "plot"! '
            f'Expected True, False or "all", got "{plot_sens}".'
        )
    sens_dict = sens_dict if plot_sens == "all" else dict(list(sens_dict.items())[:1])
    for pred_result, sens_item in sens_dict.items():
        if batched:  # only for comp_batch method
            sens = torch.stack(sens_item).mean(dim=0)
            title = (
                f"\n### {method.upper()}-based Sensitivity analysis results for "
                f"{model.__class__.__name__} model, avg. over {len(sens_item)} "
                f"test tracks, {pred_result.upper()} prediction ###"
            )
        else:
            sens = sens_item
            title = (
                f"\n### {method.upper()}-based Sensitivity analysis results for "
                f"{model.__class__.__name__} model, "
                f"{pred_result.upper()} prediction ###"
            )
        plot_sens_analysis(
            model, sens, ch_names, title=title, use_case=use_case,
            orig_length=orig_length, save_path=save_path,
        )
        if (sens_uq is not None) and (eps is not None):
            plot_uncertainty_sens(
                model, sens_uq, eps, ch_names, use_case, amplification, save_path
            )
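
# How _check_sens_analysis flattens the user-facing dictionary (sketch):
#
# >>> _check_sens_analysis({'method': 'gradient', 'params': {'comp': True}})
# {'method': 'gradient', 'comp': True, 'plot': False}
# >>> _check_sens_analysis(None)
# (None, '', False)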