Source code for softsensor.meas_handling

# -*- coding: utf-8 -*-
"""
Created on Mon May 31 13:08:01 2021

@author: WET2RNG
"""

import pandas as pd
import numpy as np
from torch.utils.data import ConcatDataset, random_split, DataLoader
from torch.utils.data import Subset
from sklearn.preprocessing import StandardScaler
from pylife.stress.timesignal import butter_bandpass
from scipy import interpolate

from softsensor.datasets import SlidingWindow


class Meas_handling:
    """
    Measurement handling class that can be used for the whole data
    preprocessing.

    Parameters
    ----------
    train_dfs : list of pd.DataFrame
        List of pd.DataFrame for every measurement used as training data
        for subsequent models.
    train_names : list of str
        List of track names corresponding to the train_dfs. Needs to have
        the same length as train_dfs.
    input_sensors : list of str
        Input sensors for the subsequent models. The order of the str
        defines the order in which the data is present inside the loader.
    output_sensors : list of str
        Output sensors for the subsequent models. The order of the str
        defines the order in which the data is present inside the loader.
    fs : int
        Sample frequency.
    test_dfs : list of pd.DataFrame, optional
        List of pd.DataFrame for every measurement used as testing data
        for subsequent models. The default is None.
    test_names : list of str, optional
        List of track names corresponding to the test_dfs. Needs to have
        the same length as test_dfs. The default is None.
    pre_comp_cols : list of str, optional
        Defines whether and which precomputed solution columns are in the
        dataset. Precomputed solutions might be helpful in certain training
        tasks. The default is None.

    Returns
    -------
    None.

    Examples
    --------
    Define a Measurement handling class

    >>> import softsensor.meas_handling as ms
    >>> import pandas as pd
    >>> import numpy as np
    >>> t = np.linspace(0, 1.0, 10001)
    >>> xlow = np.sin(2 * np.pi * 100 * t)
    >>> xhigh = 0.2 * np.sin(2 * np.pi * 3000 * t)
    >>> d = {'sine_inp': xlow + .1 * xhigh,
    ...      'cos_inp': np.cos(2 * np.pi * 50 * t),
    ...      'out': np.linspace(0, 1.0, 10001)}
    >>> test_df = {'sine_inp': 10 * xlow + .1 * xhigh,
    ...            'cos_inp': np.cos(2 * np.pi * 50 * t),
    ...            'out': np.linspace(0, 1.0, 10001)}
    >>> handler = ms.Meas_handling([pd.DataFrame(d, index=t),
    ...                             pd.DataFrame(d, index=t)],
    ...                            ['sine1', 'sine2'],
    ...                            ['sine_inp', 'cos_inp'], ['out'], 10000,
    ...                            [pd.DataFrame(test_df, index=t)], ['test'])
    """

    def __init__(self, train_dfs, train_names, input_sensors, output_sensors,
                 fs, test_dfs=None, test_names=None, pre_comp_cols=None):
        self.train_names = train_names
        self.test_names = test_names
        self.input_sensors = input_sensors
        self.output_sensors = output_sensors
        self.pre_comp_cols = pre_comp_cols
        self.fs = fs

        self.train_df = train_dfs
        self.test_df = test_dfs
        self.scaler = None
        self.Scaled = False

        # check that input is valid
        self._check_input(train_dfs, train_names,
                          input_sensors + output_sensors)
        if (test_dfs is not None) or (test_names is not None):
            self._check_input(test_dfs, test_names,
                              input_sensors + output_sensors)
            names = train_names + test_names
            self.test_data = True
        else:
            names = train_names
            self.test_data = False
        self._check_naming(names)
    def Scale(self, scaler=StandardScaler(), predef_scaler=False):
        """
        Scale the data. The scaler is fitted only on the training data.
        Afterwards, train and test data are transformed.

        Parameters
        ----------
        scaler : sklearn.preprocessing scaler, optional
            The default is StandardScaler().
        predef_scaler : bool, optional
            If True, the scaler needs to be fitted already and will be used
            to scale the data. This might come in handy if the data used for
            scaling is no longer available or special scaling procedures are
            required. The default is False.

        Returns
        -------
        None.

        Examples
        --------
        Based on the example from the class initialisation

        >>> df = handler.give_dataframe('sine1')
        >>> print(np.var(df['sine_inp'].values))
        >>> handler.Scale()
        >>> print(np.var(df['sine_inp'].values))
        1.0
        """
        if predef_scaler:
            self.scaler = scaler
        else:
            dfs = [d[self.input_sensors + self.output_sensors]
                   for d in self.train_df]
            self.scaler = scaler.fit(pd.concat(dfs))

        for i, df in enumerate(self.train_df):
            df = df[self.input_sensors + self.output_sensors]
            self.train_df[i][self.input_sensors + self.output_sensors] = \
                self.scaler.transform(df)

        if self.test_data:
            for i, df in enumerate(self.test_df):
                df = df[self.input_sensors + self.output_sensors]
                self.test_df[i][self.input_sensors + self.output_sensors] = \
                    self.scaler.transform(df)
        self.Scaled = True
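    # A minimal sketch of the predef_scaler path (assumption: 'handler' is
    # the instance from the class docstring example and 'external_df' is a
    # hypothetical dataframe with the same columns). A scaler fitted on
    # external data is applied without refitting on the training data:
    #
    #     from sklearn.preprocessing import StandardScaler
    #     ext_scaler = StandardScaler().fit(
    #         external_df[['sine_inp', 'cos_inp', 'out']])
    #     handler.Scale(scaler=ext_scaler, predef_scaler=True)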
    def Resample(self, fs, kind="linear"):
        """
        Resample self.train_df and self.test_df to fs by interpolating the
        data along the time index.

        Parameters
        ----------
        fs : float or int
            Sample frequency to resample the data to.
        kind : str, optional
            Specifies the kind of interpolation as a string or as an integer
            specifying the order of the spline interpolator to use. The
            string has to be one of 'linear', 'nearest', 'nearest-up',
            'zero', 'slinear', 'quadratic', 'cubic', 'previous', or 'next'.
            'zero', 'slinear', 'quadratic' and 'cubic' refer to a spline
            interpolation of zeroth, first, second or third order;
            'previous' and 'next' simply return the previous or next value
            of the point; 'nearest-up' and 'nearest' differ when
            interpolating half-integers (e.g. 0.5, 1.5) in that 'nearest-up'
            rounds up and 'nearest' rounds down. See the
            scipy.interpolate.interp1d documentation. The default is
            'linear'.

        Returns
        -------
        None.
        """
        self.fs = fs
        for i, df in enumerate(self.train_df):
            new_t = np.arange(df.index[0], df.index[-1], 1 / fs)
            new_data = {col: interpolate.interp1d(df.index, df[col],
                                                  kind=kind)(new_t)
                        for col in df}
            self.train_df[i] = pd.DataFrame(new_data, index=new_t)

        if self.test_data:
            for i, df in enumerate(self.test_df):
                new_t = np.arange(df.index[0], df.index[-1], 1 / fs)
                new_data = {col: interpolate.interp1d(df.index, df[col],
                                                      kind=kind)(new_t)
                            for col in df}
                self.test_df[i] = pd.DataFrame(new_data, index=new_t)
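    # Usage sketch for Resample (assumption: 'handler' is sampled at 10 kHz
    # as in the class docstring example). Downsampling to 5 kHz roughly
    # halves the number of rows and doubles the index spacing:
    #
    #     handler.Resample(fs=5000, kind='linear')
    #     df = handler.give_dataframe('sine1')
    #     print(df.index[1] - df.index[0])   # ~0.0002 s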
    def Filter(self, freq_lim):
        """
        Bandpass filter the sensor data.

        Parameters
        ----------
        freq_lim : tuple (low_cut, high_cut)
            Defines the low and high cut for bandpass filtering.

        Returns
        -------
        None.

        Examples
        --------
        Based on the example from the class initialisation; the result
        should be roughly zero

        >>> handler.Filter(freq_lim=(10, 700))
        >>> filtered_sine = handler.train_df[0]['sine_inp'].values
        >>> dev = xlow - filtered_sine
        >>> print(np.mean(dev))
        0.0
        """
        for i, df in enumerate(self.train_df):
            self.train_df[i] = butter_bandpass(df, freq_lim[0], freq_lim[1])
        if self.test_data:
            for i, df in enumerate(self.test_df):
                self.test_df[i] = butter_bandpass(df, freq_lim[0],
                                                  freq_lim[1])
    def fade_in(self, window_length, window_type="hanning", columns=None):
        """
        Apply the defined window to the train and test data to fade in.

        Parameters
        ----------
        window_length : int
            Defines the length of the half window.
        window_type : str, optional
            Defines the window function type. Supported window types are:
            hanning, hamming, blackman, bartlett. The default is 'hanning'.
        columns : list of int, optional
            Column indices to apply the window to. The default is None,
            which uses all input and output sensor columns.

        Returns
        -------
        None.

        Examples
        --------
        Based on the example from the class initialisation; the result
        should be roughly zero

        >>> print(handler.test_df[0]['cos_inp'][0])
        1.0
        >>> handler.fade_in(10)
        >>> print(handler.test_df[0]['cos_inp'][0])
        0.0
        """
        if columns is None:
            columns = [self.train_df[0].columns.get_loc(c)
                       for c in self.input_sensors + self.output_sensors]

        window = self._get_window(window_type, window_length * 2)
        # use only the half window
        window_in = window[:window_length]
        window_out = window[-window_length:]

        for i, df in enumerate(self.train_df):
            self.train_df[i].iloc[:window_length, columns] = \
                df.iloc[:window_length, columns].mul(window_in, axis=0)
            # self.train_df[i].iloc[-window_length:, columns] = \
            #     df.iloc[-window_length:, columns].mul(window_out, axis=0)

        if self.test_df is None:
            return None

        for i, df in enumerate(self.test_df):
            self.test_df[i].iloc[:window_length, columns] = np.array(
                df.iloc[:window_length, columns].mul(window_in, axis=0))
            # self.test_df[i].iloc[-window_length:, columns] = np.array(
            #     df.iloc[-window_length:, columns].mul(window_out, axis=0))

    @staticmethod
    def _get_window(window_type: str, window_length: int) -> np.ndarray:
        """
        Return a discrete window function.

        Parameters
        ----------
        window_type : str
            Defines the window function type. Supported window types are:
            hanning, hamming, blackman, bartlett.
        window_length : int
            Defines the length of the window.

        Returns
        -------
        np.ndarray
            The discrete window function.
        """
        window_type = window_type.lower()
        if window_type == "hanning":
            return np.hanning(window_length)
        if window_type == "hamming":
            return np.hamming(window_length)
        if window_type == "blackman":
            return np.blackman(window_length)
        if window_type == "bartlett":
            return np.bartlett(window_length)
        print(f'Unsupported window type: {window_type}! '
              'Using the default window type "hanning"')
        return np.hanning(window_length)
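    # Illustration of the half window used by fade_in: for window_length=4
    # the method requests a length-8 window and keeps only its rising half,
    # so the faded samples ramp from 0 towards 1:
    #
    #     Meas_handling._get_window('hanning', 8)[:4]
    #     # array([0.        , 0.1882551 , 0.61126047, 0.95048443])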
    def give_torch_loader(self, window_size, keyword="training",
                          train_ratio=0.8, batch_size=32, rnn_window=None,
                          shuffle=False, Add_zeros=True, forecast=1,
                          full_ds=False, pre_comp=False,
                          n_samples=[5000, 1000]):
        """
        Give back a torch DataLoader for training, evaluation or testing
        purposes.

        Parameters
        ----------
        window_size : int
            Window size for input and output series.
        keyword : str or list of str, optional
            Possibilities are 'training', 'testing', 'short' or a list of
            names. 'training' gives a training and a validation dataloader
            using all training data. 'testing' gives a single dataloader
            using all test data. 'short' gives a training loader with 5000
            samples and a validation loader with 1000 samples. For a list
            of names, each name must be present in either train_names or
            test_names. The default is 'training'.
        train_ratio : float, optional
            Only needed for keyword 'training'. Defines the ratio of
            training data compared to validation data. The default is .8.
        batch_size : int, optional
            Batch size for the dataloader. The default is 32.
        rnn_window : int, optional
            Window size of the recurrent window. The default is None.
        shuffle : bool, optional
            Whether the loader is shuffled. The default is False.
        Add_zeros : bool, optional
            Only needed if rnn_window is None. Adds zeros to the beginning
            of the time series. The default is True.
        forecast : int, optional
            Forecasting horizon. The default is 1.
        full_ds : bool, optional
            Whether to use the full dataset. The default is False.
        pre_comp : bool, optional
            Whether to use the precomputed solution. The default is False.
        n_samples : list of int, optional
            Number of training and validation samples for keyword 'short'.
            The default is [5000, 1000].

        Returns
        -------
        torch.utils.data.DataLoader
            One dataloader if train_ratio = 1, otherwise two dataloaders
            for training and validation.
        """
        # get data from keyword
        if keyword == "short":
            set_list = self.give_Datasets(window_size, "training",
                                          rnn_window, Add_zeros, forecast,
                                          full_ds, pre_comp)
        else:
            set_list = self.give_Datasets(window_size, keyword, rnn_window,
                                          Add_zeros, forecast, full_ds,
                                          pre_comp)
        dataset = ConcatDataset(set_list)

        # Define data loaders
        if keyword == "short":
            return self._split_dataset(dataset, train_ratio, batch_size,
                                       shuffle, short=True,
                                       n_samples=n_samples)
        else:
            return self._split_dataset(dataset, train_ratio, batch_size,
                                       shuffle, short=False)
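    # Usage sketch (assumption: 'handler' from the class docstring example,
    # and each dataset item yields an (input, output) pair; the exact tensor
    # layout depends on softsensor.datasets.SlidingWindow). With
    # keyword='training' and train_ratio < 1, a tuple of two loaders is
    # returned:
    #
    #     train_loader, val_loader = handler.give_torch_loader(
    #         window_size=50, keyword='training', train_ratio=0.8,
    #         batch_size=32)
    #     inp, out = next(iter(train_loader))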
    def _split_dataset(self, dataset, ratio, batch_size, shuffle,
                       short=False, n_samples=[5000, 1000]):
        """
        Internal method to split a given dataset into training and
        validation sets.

        Parameters
        ----------
        dataset : torch.utils.data.Dataset
            Dataset to split.
        ratio : float, (0, 1]
            Ratio to split the dataset into training and validation.
            ratio = 1: dataset isn't split at all.
            ratio < 1: dataset is split with ratio as the percentage of
            data in the first loader.
        batch_size : int
            Batch size for the dataloader.
        shuffle : bool
            Whether the loader is shuffled.
        short : bool, optional
            Gives back a short dataloader with n_samples[0] samples for
            training and n_samples[1] for validation. The default is False.
        n_samples : list of int, optional
            Number of training and validation samples for short=True.
            The default is [5000, 1000].

        Returns
        -------
        torch.utils.data.DataLoader
            One dataloader if ratio = 1, otherwise two dataloaders for
            training and validation.
        """
        if ratio != 1:
            length_of_set = dataset.__len__()
            if short:
                train_samples = n_samples[0]
                val_samples = n_samples[1]
                _samples = length_of_set - train_samples - val_samples
                train_set, val_set, _ = random_split(
                    dataset, (train_samples, val_samples, _samples))
            else:
                train_samples = int(length_of_set * ratio)
                val_samples = length_of_set - train_samples
                train_set, val_set = random_split(
                    dataset, (train_samples, val_samples))

            train_loader = DataLoader(train_set, batch_size, shuffle=shuffle)
            val_loader = DataLoader(val_set, batch_size, shuffle=shuffle)
            return (train_loader, val_loader)
        else:
            if short:
                dataset = Subset(dataset, np.arange(n_samples[0]))
            train_loader = DataLoader(dataset, batch_size, shuffle=shuffle)
            return train_loader
    def give_list(self, window_size, keyword="testing", batch_size=32,
                  Add_zeros=False, rnn_window=None, forecast=1,
                  full_ds=False):
        """
        Give a list of DataLoaders.

        Parameters
        ----------
        window_size : int
            Window size for input and output series.
        keyword : str or list of str, optional
            Possibilities are 'training', 'testing' or a list of names.
            'training' gives dataloaders using all training data. 'testing'
            gives dataloaders using all test data. For a list of names, an
            unshuffled loader is given back for each track name defined in
            init. The default is 'testing'.
        batch_size : int, optional
            Batch size for the dataloader. The default is 32.
        Add_zeros : bool, optional
            Appends zeros at the beginning for autoregressive models.
            The default is False.
        rnn_window : int, optional
            Window size of the recurrent window. The default is None.
        forecast : int, optional
            Forecasting horizon. The default is 1.
        full_ds : bool, optional
            Whether to use the full dataset. The default is False.

        Returns
        -------
        list_loader : list of torch.utils.data.DataLoader
            List of dataloaders with individual measurements.
        """
        set_list = self.give_Datasets(window_size, keyword, rnn_window,
                                      Add_zeros, forecast, full_ds)
        list_loader = [DataLoader(s, batch_size, shuffle=False)
                       for s in set_list]
        return list_loader
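    # Usage sketch (assumption: 'handler' from the class docstring example).
    # Unlike give_torch_loader, the measurements are not concatenated; one
    # unshuffled loader per track is returned:
    #
    #     loaders = handler.give_list(window_size=50, keyword='testing')
    #     print(len(loaders))   # 1, since only the 'test' track was given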
    def give_Datasets(self, window_size, keyword="training", rnn_window=None,
                      Add_zeros=True, forecast=1, full_ds=False,
                      pre_comp=False):
        """
        Give a list of Datasets.

        Parameters
        ----------
        window_size : int
            Window size for input and output series.
        keyword : str or list of str, optional
            Possibilities are 'training', 'testing' or a list of names.
            'training' gives datasets using all training data. 'testing'
            gives datasets using all test data. For a list of names, a
            dataset is given back for each track name defined in init.
            The default is 'training'.
        rnn_window : int, optional
            Window size of the recurrent window. The default is None.
        Add_zeros : bool, optional
            Appends zeros at the beginning for autoregressive models.
            The default is True.
        forecast : int, optional
            Forecasting horizon. The default is 1.
        full_ds : bool, optional
            Whether to use the full dataset. The default is False.
        pre_comp : bool, optional
            Whether to use the precomputed solution. The default is False.

        Returns
        -------
        set_list : list of SlidingWindow datasets
            List of datasets with individual measurements.
        """
        if keyword == "testing":
            if self.test_data:
                temp_df = [self.give_dataframe(k) for k in self.test_names]
            else:
                raise ValueError("keyword testing given, but no test_df is "
                                 "defined")
        elif keyword == "training":
            temp_df = [self.give_dataframe(k) for k in self.train_names]
        else:
            temp_df = [self.give_dataframe(k) for k in keyword]

        set_list = []
        pre = self.pre_comp_cols if pre_comp else None
        for df in temp_df:
            set_list.append(SlidingWindow(df, window_size,
                                          self.output_sensors,
                                          self.input_sensors, Add_zeros,
                                          rnn_window, forecast, full_ds,
                                          pre))
        return set_list
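    # Usage sketch (assumption: 'handler' from the class docstring example).
    # Selecting tracks by name returns one SlidingWindow dataset per name;
    # train and test names can be mixed:
    #
    #     sets = handler.give_Datasets(window_size=50,
    #                                  keyword=['sine1', 'test'])
    #     print(len(sets))   # 2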
    def give_dataframes(self, keywords):
        """
        Return a list of dataframes.

        Parameters
        ----------
        keywords : list of str, or 'training' or 'testing'
            Defines which dataframes will be returned.
            list of str: returns a list of the same length with the
            corresponding dataframes.
            'training' or 'testing': returns the list of dfs in training /
            testing.

        Returns
        -------
        dfs : list of pd.DataFrame
            List of dataframes.
        """
        if keywords == "testing":
            dfs = [self.give_dataframe(k) for k in self.test_names]
        elif keywords == "training":
            dfs = [self.give_dataframe(k) for k in self.train_names]
        else:
            dfs = [self.give_dataframe(k) for k in keywords]
        return dfs
    def give_dataframe(self, Name):
        """
        Give back the dataframe corresponding to the specific name.

        Parameters
        ----------
        Name : str
            String that matches a train or test name defined in init.

        Returns
        -------
        df : pd.DataFrame
            DataFrame that corresponds to the given Name.
        """
        idx, Train = self._get_idx(Name)
        if Train:
            df = self.train_df[idx]
        else:
            df = self.test_df[idx]
        return df
    def _get_idx(self, name):
        """
        Give the index of the df name in the df list.

        Parameters
        ----------
        name : str
            Name of the df whose index is needed.

        Returns
        -------
        idx : int
            Index in the df list.
        train : bool
            True if name is in the training names, False if in the test
            names.
        """
        idx = None
        if name in self.train_names:
            idx = self.train_names.index(name)
            train = True
            return idx, train
        if self.test_data:
            if name in self.test_names:
                idx = self.test_names.index(name)
                train = False
                return idx, train
        raise ValueError(f"{name} not in train_names or test_names")

    def _check_input(self, df_list, name_list, columns):
        """
        Internal method to check whether the input is valid for
        preprocessing.

        Checks whether:
            df_list is a list of pd.DataFrames
            name_list is a list of str with the same length as df_list
            columns is a list of str
            all columns are present in all DataFrames

        Parameters
        ----------
        df_list : list
            Valid input is a list of pandas DataFrames.
        name_list : list
            Valid input is a list of str with the same length as df_list.
        columns : list
            Valid input is a list of str with column names.

        Raises
        ------
        ValueError
            Raises an error if one of the conditions is not met.

        Returns
        -------
        None.
        """
        # check list of dfs
        if type(df_list) is list:
            for df in df_list:
                if not isinstance(df, pd.DataFrame):
                    raise ValueError("Not all list entries are pd.DataFrame "
                                     "type")
                else:
                    if not set(columns).issubset(set(df.columns)):
                        raise ValueError("columns are not present in all "
                                         "dataframes")
        else:
            raise ValueError("Entry is not a list")

        # check list of names
        if type(name_list) is not list:
            raise ValueError("Entry is not a list")
        else:
            for name in name_list:
                if not isinstance(name, str):
                    raise ValueError("not all list entries are type str")

        # check if lists have same length
        if not len(df_list) == len(name_list):
            raise ValueError("length of lists does not match")

    def _check_naming(self, names):
        """
        Internal method to check for duplicated names. Duplication can lead
        to problems when specific measurements need to be accessed.

        Parameters
        ----------
        names : list of str
            List of str to check for duplication.

        Raises
        ------
        Warning
            If there are duplicated names, a warning is raised.

        Returns
        -------
        None.
        """
        if not len(names) == len(set(names)):
            raise Warning("there are duplicated names in train_names and/or "
                          "test_names")
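# End-to-end usage sketch (a non-authoritative sketch built from the class
# docstring example; 'd' and 't' are the dict and time index defined there):
#
#     handler = Meas_handling([pd.DataFrame(d, index=t)], ['sine1'],
#                             ['sine_inp', 'cos_inp'], ['out'], 10000)
#     handler.Filter(freq_lim=(10, 700))   # bandpass filter all channels
#     handler.Scale()                      # fit scaler on training data
#     train_loader, val_loader = handler.give_torch_loader(window_size=50)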