# Source code for softsensor.datasets

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 27 18:16:57 2023

The classes in this module define the dataset types used by the package:
sliding windows over individual time series taken from pandas DataFrames.
@author: Tobias Westmeier CR/AMP4
"""
import torch
import math
import numpy as np
from torch.utils.data import Dataset


class SlidingWindow(Dataset):
    """
    Sliding window dataset built from a pandas DataFrame.

    The DataFrame columns are converted to tensors of shape
    [channels, time] and the actual windowing is delegated to a backend
    stored in ``self.subclass``: ``ff_SlidingWindow`` if ``rnn_window`` is
    None, ``rec_SlidingWindow`` otherwise. Every sample is a tuple
    ``(x, y)``. With ``rnn_window`` the input ``x`` is itself a tuple whose
    second tensor holds the past output values (or the precomputed values
    given by ``pre_comp``).

    Parameters
    ----------
    df : pd.DataFrame
        pandas DataFrame with named columns and time dependent data.
    windowsize : int
        sliding window length.
    output_columns : list of str
        list of columns used for output.
    input_columns : list of str
        list of columns used for input.
    Add_zeros : bool, optional
        Only used if rnn_window is None. Prepends ``windowsize - forecast``
        zeros to the time series so that no time steps are lost to the
        first window. The default is False.
    rnn_window : int, optional
        use an additional sliding window as input in which the past output
        values are saved. The default is None.
    forecast : int, optional
        number of time steps contained in each target y (the last
        ``forecast`` steps of the window). The default is 1.
    full_ds : bool, optional
        If True, every time step is the start of a sample. If False,
        consecutive samples start ``forecast`` steps apart and the number
        of samples is divided by ``forecast`` (rounded up).
        The default is True.
    pre_comp : list of str, optional
        precomputed solution for models with student forcing used for
        second input instead of output columns. Only used together with
        rnn_window. The default is None.

    Returns
    -------
    None

    Notes
    -----
    If rnn_window is given, zeros are always placed at the beginning of the
    time series, so for ``forecast=1`` SlidingWindow.__len__() returns the
    length of df. If no rnn_window is given, the dataset is shortened by
    ``windowsize - 1`` samples (for ``forecast=1``); to keep the length,
    Add_zeros=True must be used.

    Examples
    --------
    Example of a pure feed forward SlidingWindow dataset

    >>> import softsensor.datasets as ds
    >>> import pandas as pd
    >>> import numpy as np
    >>> d = {'in_col': np.linspace(0, 100, 101),
    ...      'out_col': np.linspace(100, 0, 101)}
    >>> df = pd.DataFrame(d)
    >>> sw = ds.SlidingWindow(df, 10, ['out_col'], ['in_col'])
    >>> print(sw.__len__())
    92
    >>> print(sw.__getitem__(0))
    (tensor([[0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]]), tensor([[91.]]))

    Example of a SlidingWindow dataset with recurrent connection

    >>> import softsensor.datasets as ds
    >>> import pandas as pd
    >>> import numpy as np
    >>> d = {'in_col': np.linspace(0, 100, 101),
    ...      'in_col2': np.linspace(0, 100, 101),
    ...      'out_col': np.linspace(100, 0, 101)}
    >>> df = pd.DataFrame(d)
    >>> sw = ds.SlidingWindow(df, 3, ['out_col'], ['in_col'], rnn_window=3)
    >>> print(sw.__len__())
    101
    >>> print(sw.__getitem__(2))
    ((tensor([[0., 1., 2.]]), tensor([[  0., 100.,  99.]])), tensor([[98.]]))

    Example of a SlidingWindow dataset with recurrent connection and
    precomputed prediction

    >>> import softsensor.datasets as ds
    >>> import pandas as pd
    >>> import numpy as np
    >>> d = {'in_col': np.linspace(0, 100, 101),
    ...      'in_col2': np.linspace(0, 100, 101),
    ...      'out_col': np.linspace(100, 0, 101),
    ...      'out_col_precomp': np.linspace(100, 0, 101) + 100}
    >>> df = pd.DataFrame(d)
    >>> sw = ds.SlidingWindow(df, 3, ['out_col'], ['in_col'], rnn_window=3,
    ...                       pre_comp=['out_col_precomp'])
    >>> print(sw.__len__())
    101
    >>> print(sw.__getitem__(2))
    ((tensor([[0., 1., 2.]]), tensor([[  0., 200., 199.]])), tensor([[98.]]))
    """

    def __init__(self, df, windowsize, output_columns, input_columns,
                 Add_zeros=False, rnn_window=None, forecast=1, full_ds=True,
                 pre_comp=None):
        self.windowsize = windowsize
        self.rnn_window = rnn_window
        self.num_inp = len(input_columns)
        self.num_out = len(output_columns)
        self.forecast = forecast
        self.pre_comp = pre_comp
        self.pre = pre_comp is not None

        # Read in the relevant columns as tensors of shape [channels, time]
        self.data_y = torch.transpose(
            torch.tensor(df[output_columns].values), 0, 1)
        self.data_x = torch.transpose(
            torch.tensor(df[input_columns].values), 0, 1)
        if self.pre:
            self.data_y_pre = torch.transpose(
                torch.tensor(df[pre_comp].values), 0, 1)
        else:
            self.data_y_pre = None

        # Pick the backend that does the actual windowing
        if rnn_window is None:
            self.subclass = ff_SlidingWindow(self.windowsize, self.data_x,
                                             self.data_y, Add_zeros,
                                             forecast, full_ds)
        else:
            self.subclass = rec_SlidingWindow(self.windowsize, self.data_x,
                                              self.data_y, rnn_window,
                                              forecast, full_ds,
                                              self.data_y_pre, self.pre)

    def __getitem__(self, index):
        """
        Takes index and gives back time window for input and subsequent
        target output

        Parameters
        ----------
        index : int
            sample number; the lookup is delegated to the backend in
            ``self.subclass``.

        Returns
        -------
        x : torch.Tensor or tuple of torch.Tensor
            if rnn_window is None:
            Tensor of shape [len(input_columns), windowsize]
            if rnn_window is not None:
            tuple of Tensors with shape
            ([len(input_columns), windowsize],
            [len(output_columns), rnn_window])
        y : torch.Tensor
            Tensor of shape [len(output_columns), forecast]
        """
        return self.subclass[index]

    def __len__(self):
        """
        Returns
        -------
        number_of_samples : int
            Samples in Dataset
        """
        return len(self.subclass)
class rec_SlidingWindow(Dataset):
    """
    Recurrent windowing backend used by SlidingWindow when rnn_window is
    given.

    Two input tensors are generated for each sample: a window over the
    input data and a window of length rnn_window over the past output data
    (or over the precomputed data if ``pre`` is True). Zeros are always
    placed in front of the series, and zeros are appended at the end if the
    series length is not divisible by ``forecast``.

    Parameters
    ----------
    windowsize : int
        sliding window length.
    data_x : torch.tensor
        input data, shape [input_channels, time]
    data_y : torch.tensor
        output data, shape [output_channels, time]; also serves as second
        input unless ``pre`` is True
    rnn_window : int
        length of the additional sliding window in which the past output
        values are saved.
    forecast : int
        number of time steps contained in each target y.
    full_ds : bool
        If True every time step starts a sample, if False consecutive
        samples start ``forecast`` steps apart.
    data_y_pre : torch.tensor, optional
        precomputed data used as second input instead of data_y. Only
        stored if ``pre`` is True. The default is None.
    pre : bool, optional
        whether data_y_pre is used as second input. The default is False.

    Returns
    -------
    None

    Notes
    -----
    __getitem__ slices without a bounds check: an index outside
    ``range(len(self))`` does not raise but returns truncated or empty
    tensors.
    """

    def __init__(self, windowsize, data_x, data_y, rnn_window, forecast,
                 full_ds, data_y_pre=None, pre=False):
        self.windowsize = windowsize
        self.rnn_window = rnn_window
        self.forecast = forecast
        self.data_x = data_x
        self.data_y = data_y
        self.full_ds = full_ds
        self.orig_length = self.data_x.shape[1]
        self.pre = pre
        # Always define the attribute; it is None when no precomputed
        # data is used.
        self.data_y_pre = data_y_pre if pre else None

        # size: number of zeros placed in front of the series
        # offset: shift of the input window so that the recurrent window
        #         never reaches in front of the padded series
        if rnn_window > (windowsize - forecast):
            self.size = rnn_window
            self.offset = rnn_window - windowsize + forecast
        else:
            self.size = windowsize - forecast
            self.offset = 0

        # Zeros appended at the end so that the series length becomes
        # divisible by the forecasting horizon
        self.mod = self.orig_length % forecast
        self.add_zero = 0 if self.mod == 0 else forecast - self.mod

        def pad(data, channels):
            """Put self.size zeros in front and self.add_zero behind."""
            parts = [torch.zeros((channels, self.size)), data]
            if self.add_zero:
                parts.append(torch.zeros((channels, self.add_zero)))
            return torch.cat(parts, dim=1)

        out_channels = self.data_y.shape[0]
        self.data_x = pad(self.data_x, self.data_x.shape[0])
        self.data_y = pad(self.data_y, out_channels)
        if self.pre:
            # padded with the output channel count, so the precomputed
            # data has to have as many channels as data_y
            self.data_y_pre = pad(self.data_y_pre, out_channels)

    def __getitem__(self, index):
        """
        Takes index and gives back time window for input and subsequent
        target output

        Parameters
        ----------
        index : int
            sample number. If full_ds is False it is multiplied by
            forecast to get the starting point of the time window.

        Returns
        -------
        x : tuple of torch.Tensor
            tuple of Tensors with shape
            ([len(input_columns), windowsize],
            [len(output_columns), rnn_window])
        y : torch.Tensor
            Tensor of shape [output_channels, forecast]
        """
        if not self.full_ds:
            index = index * self.forecast
        start = index + self.offset
        stop = start + self.windowsize
        # the target covers the last `forecast` steps of the input window
        target = stop - self.forecast

        x1 = self.data_x[:, start:stop]
        past = self.data_y_pre if self.pre else self.data_y
        # recurrent window: the rnn_window steps right before the target
        x2 = past[:, target - self.rnn_window:target]
        y = self.data_y[:, target:stop]
        return (x1.float(), x2.float()), y.float()

    def __len__(self):
        """
        Returns
        -------
        number_of_samples : int
            Samples in Dataset
        """
        # Same as orig_length - forecast + 1 if the length is divisible by
        # forecast and orig_length - mod + 1 otherwise.
        samples = self.orig_length + self.add_zero - self.forecast + 1
        if not self.full_ds:
            samples = math.ceil(samples / self.forecast)
        return samples
class ff_SlidingWindow(Dataset):
    """
    Feed forward windowing backend used by SlidingWindow when no
    rnn_window is given.

    One input window and one target are generated for each sample. Zeros
    are appended at the end of the series if needed to fit the forecasting
    horizon.

    Parameters
    ----------
    windowsize : int
        sliding window length.
    data_x : torch.tensor
        input data, shape [input_channels, time]
    data_y : torch.tensor
        output data, shape [output_channels, time]
    Add_zeros : bool
        Adds zeros at the beginning of the time series with length of
        windowsize - forecast.
    forecast : int
        number of time steps contained in each target y.
    full_ds : bool
        If True every time step starts a sample, if False consecutive
        samples start ``forecast`` steps apart.

    Returns
    -------
    None

    Notes
    -----
    __getitem__ slices without a bounds check: an index outside
    ``range(len(self))`` does not raise but returns truncated or empty
    tensors.
    """

    def __init__(self, windowsize, data_x, data_y, Add_zeros, forecast,
                 full_ds):
        self.windowsize = windowsize
        self.forecast = forecast
        self.data_x = data_x
        self.data_y = data_y
        self.orig_length = self.data_x.shape[1]
        self.Add_zeros = Add_zeros
        self.full_ds = full_ds

        if Add_zeros:
            # Add zeros to the beginning of input and output data
            lead = windowsize - forecast
            zeros_inp_begin = torch.zeros((self.data_x.shape[0], lead))
            zeros_out_begin = torch.zeros((self.data_y.shape[0], lead))
            self.data_x = torch.cat((zeros_inp_begin, self.data_x), dim=1)
            self.data_y = torch.cat((zeros_out_begin, self.data_y), dim=1)

        # Add zeros at the end to match dimensions for forecasting horizon
        self.mod = (self.data_x.shape[1] - windowsize + forecast) % forecast
        if self.mod != 0:
            self.add_zero = forecast - self.mod
            zeros_inp_end = torch.zeros((self.data_x.shape[0], self.add_zero))
            zeros_out_end = torch.zeros((self.data_y.shape[0], self.add_zero))
            self.data_x = torch.cat((self.data_x, zeros_inp_end), dim=1)
            self.data_y = torch.cat((self.data_y, zeros_out_end), dim=1)
        else:
            self.add_zero = 0

    def __getitem__(self, index):
        """
        Takes index and gives back time window for input and subsequent
        target output

        Parameters
        ----------
        index : int
            sample number. If full_ds is False it is multiplied by
            forecast to get the starting point of the time window.

        Returns
        -------
        x : torch.Tensor
            shape: [len(input_columns), windowsize]
        y : torch.Tensor
            Tensor of shape [output_channels, forecast]
        """
        if not self.full_ds:
            index = index * self.forecast
        stop = index + self.windowsize
        x = self.data_x[:, index:stop]
        # the target covers the last `forecast` steps of the input window
        y = self.data_y[:, stop - self.forecast:stop]
        return x.float(), y.float()

    def __len__(self):
        """
        Returns
        -------
        number_of_samples : int
            Samples in Dataset
        """
        # Number of window start positions in the padded series. This is
        # the closed form of the four cases (with/without leading zeros,
        # with/without zeros appended at the end).
        samples = self.data_x.shape[1] - self.windowsize + 1
        if not self.full_ds:
            samples = math.ceil(samples / self.forecast)
        return samples
class batch_rec_SW(Dataset):
    """
    Batching of multiple sliding window classes for parallelisation
    purposes

    The padded series of all given datasets are stacked, sorted by
    descending number of samples, into NaN-filled tensors. One item of this
    dataset is the batch of windows at the same position in every series
    that is long enough.

    Parameters
    ----------
    list_of_sw : list of SlidingWindow
        list of individual SlidingWindow classes for batching. Their
        ``subclass`` has to provide ``size``, ``offset``, ``rnn_window``,
        i.e. they have to be built with rnn_window. Window parameters are
        taken from the first element.

    Returns
    -------
    None.

    Notes
    -----
    The second input is always taken from the output data; precomputed
    data of the individual datasets is not used.

    NOTE(review): __getitem__ always steps by ``forecast`` and the stacked
    width is ``len * forecast + size``, which appears to match the
    individual datasets only for ``forecast == 1`` or ``full_ds=False``;
    confirm with the callers.
    """

    def __init__(self, list_of_sw):
        # directly use the subclass instead of the wrapper
        backends = [sw.subclass for sw in list_of_sw]

        # Get some important parameters from the first dataset
        first = backends[0]
        self.size = first.size
        self.forecast = first.forecast
        self.num_inp = list_of_sw[0].num_inp
        self.num_out = list_of_sw[0].num_out
        self.offset = first.offset
        self.windowsize = first.windowsize
        self.rnn_window = first.rnn_window

        # Sort by descending length; ties are broken by descending original
        # position. Datasets, lengths and the permutation are all derived
        # from this single ordering so that they always agree (np.argsort
        # with its default sort gives no guaranteed order for equal
        # lengths).
        lengths = [len(sw) for sw in list_of_sw]
        order = sorted(range(len(lengths)),
                       key=lambda i: (lengths[i], i), reverse=True)
        self.sws = [backends[i] for i in order]
        self.original_ind = np.array(order, dtype=np.intp)
        self.lengths = [lengths[i] for i in order]

        # valid_sws[k]: number of datasets that have a sample with index k
        self.valid_sws = []
        remaining = len(self.lengths)
        previous = 0
        for length in reversed(self.lengths):
            self.valid_sws.extend([remaining] * (length - previous))
            previous = length
            remaining -= 1

        # NaN marks positions behind the end of the shorter series
        n_series = self.valid_sws[0]
        width = self.lengths[0] * self.forecast + self.size
        self.data_x = torch.full((n_series, self.num_inp, width),
                                 float('nan'))
        self.data_y = torch.full((n_series, self.num_out, width),
                                 float('nan'))
        for row, (sw, length) in enumerate(zip(self.sws, self.lengths)):
            stop = length * self.forecast + self.size
            self.data_x[row, :, :stop] = sw.data_x
            self.data_y[row, :, :stop] = sw.data_y

    def __getitem__(self, index):
        """
        Takes index and gives back time window for input and subsequent
        target output. Output is batched over the individual time series.
        batch_size is the number of SlidingWindow datasets with
        .__len__() > index

        Parameters
        ----------
        index : int
            sample number; multiplied by forecast to get the starting
            point of the time window.

        Returns
        -------
        x : (torch.Tensor, torch.Tensor)
            Tensors with shape
            ([batch_size, len(input_columns), windowsize],
            [batch_size, len(output_columns), rnn_window])
        y : torch.Tensor
            Tensor of shape [batch_size, output_channels, forecast]
        """
        # rows are sorted by descending length, so the first `batch` rows
        # are exactly the series that still have data at this index
        batch = self.valid_sws[index]
        start = index * self.forecast + self.offset
        stop = start + self.windowsize
        target = stop - self.forecast

        x1 = self.data_x[:batch, :, start:stop]
        x2 = self.data_y[:batch, :, target - self.rnn_window:target]
        y = self.data_y[:batch, :, target:stop]
        return (x1.float(), x2.float()), y.float()

    def __len__(self):
        """
        Returns
        -------
        number_of_samples : int
            Samples in Dataset (length of the longest time series)
        """
        return self.lengths[0]

    def __lengths__(self):
        """
        Returns
        -------
        list of int
            list with length of each time series considered, sorted in
            descending order.
        """
        return self.lengths

    def __widths__(self):
        """
        Returns
        -------
        list of int
            dim:[self.__len__()] number of SlidingWindow classes for which
            the index is valid.
        """
        return self.valid_sws

    def permutation(self):
        """
        Returns
        -------
        np.ndarray of int
            dim:[len(list_of_sw)] original index of each dataset in the
            sorted order, i.e. row k of the batch belongs to
            list_of_sw[permutation()[k]]
        """
        return self.original_ind