# -*- coding: utf-8 -*-
from dataclasses import dataclass, field
from typing import List, Tuple
import warnings
from typing import Optional
import numpy as np
import pandas as pd
from scipy.signal import tf2zpk
@dataclass
class _Window:
"""Represents the sliding window"""
start: int
end: int
length: int = field(init=False)
window: range = field(init=False)
def __post_init__(self):
self.window = range(self.start, self.end)
self.length = len(self.window)
def slide(self, step: int) -> range:
"""
Slide the window to the right for n steps.
Parameters
----------
step: int
Integer for determining the sliding step.
Returns
-------
output: range
Returns the shifted window.
"""
return range(self.start + step, self.end + step)
def reverse(self) -> range:
"""
Reverse the window.
Parameters
----------
Returns
-------
output: range
Returns the reversed window.
"""
return self.window[::-1]
def slide_rev(self, step: int) -> range:
"""
Slide and reverse the window for n steps.
Parameters
----------
step: int
Integer for determining the sliding step.
Returns
-------
output: range
Returns the shifted and reversed window.
"""
return self.slide(step)[::-1]
def slide_periodic(self, step: int, times: int) -> List[int]:
"""
Slide the window to the right for n steps and m times.
Parameters
----------
step: int
Integer for determining the sliding step.
times: int
Integer for determining the repetitions.
Returns
-------
output: list[int]
Returns the shifted and periodic window.
"""
indices = []
for i in range(times):
indices.extend(list(self.slide(i * step)))
return indices
def slide_periodic_rev(self, step: int, times: int) -> List[int]:
"""
Slide the window to the right starting in the back for n steps and m times.
Parameters
----------
step: int
Integer for determining the sliding step.
times: int
Integer for determining the repetitions.
Returns
-------
output: list[int]
Returns the shifted and periodic window.
"""
indices = []
for i in range(times)[::-1]:
indices.extend(list(self.slide(i * step)))
return indices
def slide_rev_periodic(self, step: int, times: int) -> List[int]:
"""
Slide the window to the right for n steps and m times and reverse the window range.
Parameters
----------
step: int
Integer for determining the sliding step.
times: int
Integer for determining the repetitions.
Returns
-------
output: list[int]
Returns the shifted, reversed and periodic window.
"""
indices = []
for i in range(times):
indices.extend(list(self.slide_rev(i * step)))
return indices
def slide_rev_periodic_rev(self, step: int, times: int) -> List[int]:
"""
Slide the window to the right starting in the back for n steps and m times and reverse the window range.
Parameters
----------
step: int
Integer for determining the sliding step.
times: int
Integer for determining the repetitions.
Returns
-------
output: list[int]
Returns the shifted, reversed and periodic window.
"""
indices = []
for i in range(times)[::-1]:
indices.extend(list(self.slide_rev(i * step)))
return indices
@dataclass
class _StaticNumbers:
"""Represents all static numbers and length in an ARX model."""
na: int
nb: int
input_sensors: int
output_sensors: int
data_frames: int
data_lengths: List[int]
inputs: int = field(init=False)
outputs: int = field(init=False)
output_inputs: int = field(init=False)
outputs_inputs: int = field(init=False)
def __post_init__(self):
self.inputs = self.nb * self.input_sensors
self.outputs = self.na * self.output_sensors
self.output_inputs = self.na + self.inputs
self.outputs_inputs = self.outputs + self.inputs
[docs]
class ARX:
"""
Represents an AutoRegressive with eXogenous input model.
Parameters
----------
order: Tuple[int, int]
Parameter for the order of the outputs and inputs in den equation (na, nb)#
Returns
-------
None.
Example
-------
>>> import pandas as pd
>>> import numpy as np
>>> import softsensor.arx as arx
>>> d = {'in_col': np.linspace(0, 100, 101),
'out_col': np.linspace(100, 0, 101)}
>>> df = pd.DataFrame(d)
>>> arx = arx.ARX(order=(2, 2))
>>> arx.fit(data_train=[df], input_sensors=['in_col'], output_sensors=['out_col'])
>>> print(len(arx.parameters))
4
"""
na: int
nb: int
input_sensors: List[str] = []
output_sensors: List[str] = []
windows: List[_Window] = None
num: _StaticNumbers = None
parameters: np.ndarray = np.zeros(0)
regression_matrix: np.ndarray = np.zeros(0)
output_matrix: np.ndarray = np.zeros(0)
relevant_indices: List[int] = []
def __init__(self, order: Tuple[int, int]):
self.na, self.nb = order
self.Type = 'ARX'
[docs]
def fit(self, data_train: List[pd.DataFrame], input_sensors: List[str], output_sensors: List[str],
windows: List[Tuple[int, int]] = None, verbose: bool = False) -> None:
"""
Fit the ARX model
Parameters
----------
data_train: list[pd.DataFrame]
Training data
input_sensors: list[str]
Name of the input sensors.
output_sensors: list[str]
Name of the output sensors.
windows: Optional[list[tuple[int, int]]]
Set the windows for fitting the ARX parameters.
verbose: bool
Verbose flag for additional information.
Returns
-------
None
"""
len_data_frames = len(data_train)
data_lengths = [data_train[i].shape[0] for i in range(len_data_frames)]
self._initialize(input_sensors, output_sensors, len_data_frames, data_lengths, windows)
self.regression_matrix = self._build_regression_matrix(data_train)
self.output_matrix = self.build_data_matrix(data_train, data_type='outputs')
self.relevant_indices = []
for i, window in enumerate(self.windows):
data_length = self.num.data_lengths[i-1] if i > 0 else 0
self.relevant_indices.extend(list(window.slide_rev(data_length)))
relevant_outputs = self.output_matrix[self.relevant_indices, :]
indices_inputs = list(range(self.num.outputs, self.num.outputs_inputs))
for i in range(self.num.output_sensors):
indices = list(range(self.na*i, self.na * (i+1)))
indices.extend(indices_inputs)
self.parameters[:, i] = np.dot(np.linalg.pinv(self.regression_matrix[:, indices]),
relevant_outputs[:, i])
if verbose:
model_name = 'ARX'
print(f'\n{model_name}-Model')
for index, sensor_name in enumerate(self.output_sensors):
ARX.print_parameters(model_name, sensor_name, self.parameters[:, index])
stable, poles = self._is_stable()
if not stable:
warnings.warn(f'The {type(self).__name__} model is unstable with poles at {poles}!', UserWarning)
[docs]
def prediction(self, data_test: pd.DataFrame) -> pd.DataFrame:
"""
Predict the outputs to the input data
Parameters
----------
data_test: pd.DataFrame
Input to the system
Returns
-------
output: pd.DataFrame
Predicted output of the system
"""
input_data = data_test[self.input_sensors].values
output_df = pd.DataFrame(index=data_test.index)
for i, sensor in enumerate(self.output_sensors):
output = self._predict_single_output(input_data, index=i)
output_df[sensor] = output
return output_df
def _predict_single_output(self, input_data: np.ndarray, index: int, noise: np.ndarray = None) -> np.ndarray:
"""
Predict the outputs to the input data
Parameters
----------
input_data: np.ndarray
Input to the system
index: int
Index of the output
noise: np.ndarray
Noise sequence
Returns
-------
output: np.ndarray
Predicted output of the system
"""
output = np.zeros(input_data.shape[0])
latest_regression_vec = np.zeros_like(self.parameters[:, index].T)
for i, input_values in enumerate(input_data):
latest_regression_vec[self.na:self.num.output_inputs:self.nb] = input_values
output[i] = np.dot(latest_regression_vec, self.parameters[:, index])
latest_regression_vec = np.roll(latest_regression_vec, 1)
latest_regression_vec[0] = output[i]
if noise is not None:
latest_regression_vec[self.num.output_inputs] = noise[i]
return output
def _build_regression_matrix(self, data_train: List[pd.DataFrame]) -> np.ndarray:
"""
Build the regression matrix.
Parameters
----------
data_train: list[pd.DataFrame]
Training data
Returns
-------
output: np.ndarray
Returns the regression matrix.
"""
lengths = [0] + [window.length for window in self.windows]
regression_matrix = np.zeros((np.sum(lengths), self.num.outputs_inputs))
for index, data in enumerate(data_train):
inputs = data[self.input_sensors].values
outputs = data[self.output_sensors].values
row_index = lengths[index]
regression_matrix = self._update_regression_matrix(regression_matrix, row_index,
self.windows[index], inputs, outputs)
return regression_matrix
def _update_regression_matrix(self, regression_matrix: np.ndarray, row_index: int, window: _Window,
input_data: Optional[np.ndarray] = None,
output_data: Optional[np.ndarray] = None) -> np.ndarray:
"""
Update the regression matrix.
Parameters
----------
regression_matrix: np.ndarray
Regression matrix to modify
row_index: int
Start position in the regression matrix
window: Window
Relevant window
input_data: Optional[np.ndarray]
Input to the system
output_data: Optional[np.ndarray]
Output from the system
Returns
-------
output: np.ndarray
Updated regression matrix
"""
rows = range(row_index, row_index + window.length)
temporary_matrix = regression_matrix[rows, :]
if output_data is not None:
for i in range(self.na):
columns = range(i, self.num.outputs, self.na)
temporary_matrix[:, columns] = output_data[window.slide_rev(-i - 1), :]
if input_data is not None:
for i in range(self.nb):
columns = range(self.num.outputs + i, self.num.outputs_inputs, self.nb)
temporary_matrix[:, columns] = input_data[window.slide_rev(-i), :]
regression_matrix[rows, :] = temporary_matrix
return regression_matrix
[docs]
def build_data_matrix(self, data_train: List[pd.DataFrame], data_type: str) -> np.ndarray:
"""
Build the output matrix or input matrix with all data points from list of DataFrames.
Parameters
----------
data_train: list[pd.DataFrame]
Training data
data_type: str
Selector to create the outputs or inputs
Returns
-------
output: tuple[np.ndarray, np.ndarray]
Output matrix
"""
create_inputs = data_type == 'inputs'
num_columns = self.num.input_sensors if create_inputs else self.num.output_sensors
sensor_names = self.input_sensors if create_inputs else self.output_sensors
data_matrix = np.zeros((np.sum(self.num.data_lengths), num_columns))
start = 0
for index, data in enumerate(data_train):
data_matrix[start:start + self.num.data_lengths[index], :] = data[sensor_names].values
start += self.num.data_lengths[index]
return data_matrix
def _initialize(self, input_sensors: List[str], output_sensors: List[str], len_data_frames: int,
data_lengths: List[int], windows: List[Tuple[int, int]] = None) -> None:
"""
Initialize the class member variables.
Parameters
----------
input_sensors: list[str]
Name of the input sensors.
output_sensors: list[str]
Name of the output sensors.
len_data_frames: int
Length of the different datasets.
data_lengths: int
Lengths of the data.
windows: Optional[list[tuple[int, int]]]
Set the windows for fitting the ARX parameters.
Returns
-------
None
"""
self.input_sensors = input_sensors
self.output_sensors = output_sensors
self.num = _StaticNumbers(self.na, self.nb, len(self.input_sensors),
len(self.output_sensors), len_data_frames, data_lengths)
shape = (self.num.output_inputs, self.num.output_sensors)
self.parameters = np.ones(shape=shape)
if windows is None or len(windows) != len_data_frames:
windows = [(0, length) for length in data_lengths]
self.windows = []
for window, data_len in zip(windows, data_lengths):
if (window[1] - window[0]) > data_len:
window = (0, data_len)
length = min(data_len - self.na, data_len - self.nb, window[1] - window[0])
self.windows.append(_Window(window[1] - length, window[1]))
def _is_stable(self) -> Tuple[bool, np.ndarray]:
"""
Determine the stability by calculating the poles
Parameters
----------
Returns
-------
output: tuple[bool, np.ndarray]
Returns the flag for the stability and all poles
"""
all_poles = np.zeros(0)
for i in range(self.num.output_sensors):
parameters = self.parameters[:, i]
denominator_polynomial = parameters[:self.na]
denominator_polynomial = np.insert(denominator_polynomial, 0, 1)
_, poles, _ = tf2zpk([1], denominator_polynomial)
if poles.dtype == complex:
poles = np.abs(poles)
all_poles = np.append(all_poles, poles)
all_poles = np.unique(all_poles)
unstable_poles = all_poles[(all_poles <= -1) | (all_poles >= 1)]
return len(unstable_poles) == 0, unstable_poles
[docs]
@staticmethod
def print_parameters(model_name: str, sensor_name: str, parameters: np.ndarray):
"""
Prints the parameters formatted according to the sensor name.
Parameters
----------
model_name: str
The model name as string.
sensor_name: str
The sensor name as string.
parameters: np.ndarray
The parameters according to the sensor name.
Returns
-------
"""
print(f'Output sensor: {sensor_name}')
filtered_parameters = str(parameters).replace("\n", "")
print(f'\t {model_name}-Parameters: {filtered_parameters}')