Source code for softsensor.linear_methods

# -*- coding: utf-8 -*-
"""
Created on Fri Mar  4 13:04:39 2022

@author: Daniel Kreuter
"""
import numpy as np
import pandas as pd
from scipy.signal import csd
import scipy.linalg as la
from scipy.signal import ShortTimeFFT, get_window


[docs] class tf: """ linear methods based on FFT and short term FFT Calculate the linear MiMo Transferfunction similar to Matlab tfestimate: https://uk.mathworks.com/help/signal/ref/tfestimate.html#bufqg8e Parameters ---------- NFFT : int Length of the fft. The default ist 512. fs : int sample frequency. The default ist 1024. no_overlap : int Points overlapping. If None, than it is defined by no_overlap = np.fix(0.67 * NFFT) The default is None. spectrum_type : str: 'spectrum' or 'psd'. The default is 'spectrum'. Returns ------- None. Example ------- Computing the linear transfer function of a linear system with white noise excitation >>> import softsensor.data_gen as dg >>> import softsensor.linear_methods as lm >>> import matplotlib.pyplot as plt >>> import numpy as np >>> time = np.linspace(0, 100, 1000) >>> params = {'D': 0.05, 'c_nlin': 0} >>> F = dg.white_noise(time) >>> df = dg.get_academic_data(time, Model='Duffing', F=F, params=params) >>> model = lm.tf(NFFT=512, fs=fs, no_overlap=256) >>> model.fit([df], ['F(t)'], ['x']) >>> output = model.prediction(df, ['x']) >>> plt.plot(df.index, df['x'], label='original') >>> plt.plot(df.index, output, label='prediciton') .. image:: C:/Users/wet2rng/Desktop/Coding/SoftSensor/doc/img/lienar_tf.png """ def __init__(self, window_size=512, fs=1024, hop=256, spectrum_type="spectrum"): super().__init__() self.Type = "TF" self.window_size = window_size self.fs = fs self.hop = hop self.spectrum_type = spectrum_type # Short-Time Fourier Transformation Object self.window = get_window("hann", self.window_size) self.STFT = ShortTimeFFT( self.window, hop=self.hop, fs=self.fs, scale_to="magnitude" if spectrum_type == "spectrum" else "psd", )
[docs] def fit(self, df_list, inp_sens, out_sens): """ Fit linear TF to list of dfs by taking the mean transfer function Parameters ---------- df_list : list of pd.DataFrame list of training Files. inp_sens : list of str input sensors. out_sens : list of str output sensors. Returns ------- None. """ self.inp_sens = inp_sens self.out_sens = out_sens tf_list = [] for df in df_list: tf, freq = self.tf_mimo(df[self.inp_sens], df[self.out_sens]) tf_list.append(tf) self.tf = np.mean(np.stack(tf_list, axis=1), axis=1) self.frequency = freq
[docs] def prediction(self, test_df, columns=None, boundary=True): """ Predict test_df Parameters ---------- test_df : pd.DataFrame dtaFrame for the precition, needs columns that have the same name as in fit. columns : list of str, optional names for the prediction. The default is None. Returns ------- ts_pred : pd.DataFrame returns dataframe with linear_TF prediction as column. """ out_sfft = self.sfft(test_df[self.inp_sens]) out_sfft = self.tf @ np.moveaxis(out_sfft, 0, 1) ts_pred = self.isfft( np.moveaxis(out_sfft, 1, 0), columns=columns, boundary=boundary ) if columns is None: ts_pred = ts_pred[: len(test_df), :] else: ts_pred = ts_pred.iloc[: len(test_df), :] return ts_pred
[docs] def sfft(self, ts): """ computes the short term fast Fourier transformation (stft) for each input column of the DataFrame Parameters ---------- ts : pd.DataFrame pandas data frame. Returns ------- STFT : array The Fourier transformed signal. """ self.signal_len = ts.shape[0] Zxx = self.STFT.stft(ts.T.to_numpy()) Zxx = 2 * Zxx # one sided spectrum !!! self.frequency = self.STFT.f return Zxx
def _get_rayleigh_ampl(self, Zxx): rg = np.random.default_rng() ampl = np.abs(Zxx) phase = np.angle(Zxx) ampl_rayleigh = rg.rayleigh(scale=ampl) return ampl_rayleigh * np.exp(1j * phase)
[docs] def isfft(self, Zxx, columns=None, ampl_dist=None, boundary=True): """ Computes the inverse Fourier transformation Parameters ---------- Zxx : Array, Fourier transformed signal. columns : list of col names, optional Column names of the output df. The default is None. ampl_dist : None, str, optional if STFT is a PSD, you can define the amplitude distribution with a Raleigh or mixed distribution. Options are *None*, *'rayleigh'* or *'mixed'*. The default is None. Returns ------- TS_IFFT : TYPE DESCRIPTION. """ if ampl_dist == "rayleigh": Zxx = self._get_rayleigh_ampl(Zxx) if ampl_dist == "mixed": Zxx = 0.5 * (Zxx + self._get_rayleigh_ampl(Zxx)) out = self.STFT.istft( 0.5 * Zxx ) # Rescaling with 0.5: We have stored the stft with factor 2 out = out.T if columns is not None: out = pd.DataFrame( out, columns=columns, index=pd.Index( np.linspace( 0, out.shape[0] / self.fs, out.shape[0], endpoint=False ), name="time", ), ) out = out.iloc[: self.signal_len] else: out = out[: self.signal_len] return out
[docs] def tf_mimo(self, inp_df, out_df): """ ToDo: replace with scipy.signal.csd Calculate the linear MiMo Transferfunction similar to Matlab tfestimate: https://uk.mathworks.com/help/signal/ref/tfestimate.html#bufqg8e Parameters ---------- inp_df: pandas DataFrame input x(t) (index is time stamp), out_df: pandas DataFrame output y(t) (index is time stamp), Returns ------- tf: numpy array transfer function of shape (freq steps,no of out signals, no of input signal), complex f: numpy array corresponding frequency array """ csd_tot = np.empty( (int(self.STFT.mfft / 2 + 1), len(out_df.columns), len(inp_df.columns)), dtype=np.complex128, ) csd_tot_in = np.empty( (int(self.STFT.mfft / 2 + 1), len(inp_df.columns), len(inp_df.columns)), dtype=np.complex128, ) ii = 0 for colin in inp_df: oo = 0 ii_in = 0 for colout in out_df: freq, csd_akt = csd( inp_df[colin].to_numpy(), out_df[colout].to_numpy(), nfft=self.STFT.mfft, fs=self.fs, nperseg=self.STFT.mfft, detrend=False, noverlap=int(self.window_size - self.hop), ) # with hanning window csd_tot[:, oo, ii] = csd_akt oo += 1 for colin_psd in inp_df: _, csd_in = csd( inp_df[colin].to_numpy(), inp_df[colin_psd].to_numpy(), nfft=self.STFT.mfft, fs=self.fs, nperseg=self.STFT.mfft, detrend=False, noverlap=int(self.window_size - self.hop), ) # with hanning window csd_tot_in[:, ii_in, ii] = csd_in ii_in += 1 ii += 1 csd_tot_in_inv = np.empty_like(csd_tot_in) for ii in range(int(self.STFT.mfft / 2 + 1)): csd_tot_in_inv[ii, :, :] = la.pinv(csd_tot_in[ii, :, :]) return csd_tot @ csd_tot_in_inv, freq