Bases: Task
Abstraction of owl:class stats:StatisticCalculation.
This class represents a task for calculating a statistic.
Source code in exe_kg_lib/classes/tasks/statistic_tasks.py
class StatisticCalculation(Task):
    """
    Abstraction of owl:class stats:StatisticCalculation.

    This class represents a task for calculating a statistic.
    """

    @abstractmethod
    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculates a statistic. The data to use are determined by self.inputs.
        Expects one input data value with name "DataInStatisticCalculation".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the calculated statistic with the key "DataOutStatisticCalculation".

        Raises:
            NotImplementedError: If the statistic is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        input_data = input_dict["DataInStatisticCalculation"]
        input = input_data[0]["value"]  # assume one input

        method_module = self.method.resolve_module(module_name_to_snakecase=True)
        if "numpy" in method_module.__module__:
            statistic_result = method_module(input, **self.method.params_dict)
            return self.create_output_dict({"DataOutStatisticCalculation": statistic_result})
        else:
            raise NotImplementedError("Only numpy library is supported for now")
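The method body reads its single input as `input_dict["DataInStatisticCalculation"][0]["value"]`. The sketch below illustrates the input shape that this lookup implies. It is not ExeKGLib code: the helper `first_input_value`, the `"name"` field, and the sample values are hypothetical, and the real `get_inputs` may return richer entries.

```python
from typing import Any, Dict, List

# Key that run_method expects to find in the dictionary returned by get_inputs.
INPUT_KEY = "DataInStatisticCalculation"


def first_input_value(input_dict: Dict[str, List[Dict[str, Any]]]) -> Any:
    """Return the "value" of the first entry under INPUT_KEY (hypothetical helper)."""
    entries = input_dict[INPUT_KEY]  # raises KeyError if the input is missing
    if not entries:
        raise ValueError(f"No input data found under {INPUT_KEY!r}")
    # Mirrors the "assume one input" comment: any further entries are ignored.
    return entries[0]["value"]


# Assumed shape: a list of entries, each carrying its data under "value".
example_inputs = {
    INPUT_KEY: [{"name": "temperature", "value": [20.5, 21.0, 19.5]}],
}
values = first_input_value(example_inputs)
```

Because only the first entry is read, any additional entries supplied under the same key would be silently ignored in this sketch.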
run_method(other_task_output_dict, input_data)
abstractmethod
Calculates a statistic. The data to use are determined by self.inputs. Expects one input data value with name "DataInStatisticCalculation".
Parameters:
Name | Type | Description | Default |
other_task_output_dict | dict | A dictionary containing the output of other tasks. | required |
input_data | DataFrame | The input data of the ExeKG's pipeline. | required |
Returns:
Type | Description |
Dict[str, Any] | A dictionary containing the calculated statistic under the key "DataOutStatisticCalculation". |
Raises:
Type | Description |
NotImplementedError | If the resolved method does not come from NumPy, which is the only library supported for now. |
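The NumPy-only dispatch described above can be sketched outside the library as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `calculate_statistic` is a hypothetical stand-in for `run_method`, the resolved method is passed in directly instead of being obtained from `self.method.resolve_module(...)`, and the result is returned as a plain dict rather than through `create_output_dict`.

```python
import statistics
from typing import Any, Callable, Dict

import numpy as np

OUTPUT_KEY = "DataOutStatisticCalculation"


def calculate_statistic(func: Callable[..., Any], data: Any, params: Dict[str, Any]) -> Dict[str, Any]:
    """Apply a NumPy statistic to data and wrap the result (hypothetical helper)."""
    # Same test as in run_method: the callable must be defined in a NumPy module.
    module_name = getattr(func, "__module__", None) or ""
    if "numpy" not in module_name:
        raise NotImplementedError("Only numpy library is supported for now")
    # params plays the role of self.method.params_dict (keyword arguments).
    return {OUTPUT_KEY: func(data, **params)}


# Mean over all values, no extra parameters.
mean_output = calculate_statistic(np.mean, [1.0, 2.0, 3.0, 4.0], {})

# Column-wise mean, passing a keyword parameter through to NumPy.
column_means = calculate_statistic(np.mean, [[1.0, 2.0], [3.0, 4.0]], {"axis": 0})

# A non-NumPy callable is rejected.
try:
    calculate_statistic(statistics.mean, [1.0, 2.0], {})
    rejected = False
except NotImplementedError:
    rejected = True
```

Note that the check inspects where the callable is defined, not what it computes, so any function whose `__module__` contains "numpy" would be accepted in this sketch.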