Skip to content

ml_tasks

Concatenation

Bases: Task

Abstraction of owl:class ml:Concatenation.

This class represents a task for concatenating data.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
class Concatenation(Task):
    """
    Abstraction of owl:class ml:Concatenation.

    This class represents a task for concatenating data.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Concatenates data. The data to use are determined by self.inputs.
        Expects multiple input data values with name "DataInConcatenation".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the concatenated data with the key "DataOutConcatenatedData".
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        # Collect every value registered under "DataInConcatenation" and join
        # them column-wise (axis=1) into a single frame.
        to_concatenate = [entry["value"] for entry in input_dict["DataInConcatenation"]]
        concatenated = pd.concat(to_concatenate, axis=1)

        return self.create_output_dict({"DataOutConcatenatedData": concatenated})

run_method(other_task_output_dict, input_data)

Concatenates data. The data to use are determined by self.inputs. Expects multiple input data values with name "DataInConcatenation".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing the concatenated data with the key "DataOutConcatenatedData".

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Concatenates data. The data to use are determined by self.inputs.
    Expects multiple input data values with name "DataInConcatenation".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the concatenated data with the key "DataOutConcatenatedData".
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    inputs = input_dict["DataInConcatenation"]
    # Extract the raw values; note the local name `input` shadows the builtin.
    input_values = [input["value"] for input in inputs]

    # Column-wise concatenation of all inputs into one frame.
    concatenation_result = pd.concat(input_values, axis=1)

    return self.create_output_dict({"DataOutConcatenatedData": concatenation_result})

DataSplitting

Bases: Task

Abstraction of owl:class ml:DataSplitting.

This class represents a task for splitting data.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class DataSplitting(Task):
    """
    Abstraction of owl:class ml:DataSplitting.

    This class represents a task for splitting data.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Splits the data using the splitter determined by self.method.module_chain.
        The data to use are determined by self.inputs. Parameters to use for the splitter are in self.method.params_dict.
        Expects one input data value with name "DataInDataSplittingX" and one with name "DataInDataSplittingY".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the split data with the keys "DataOutSplittedTrainDataX", "DataOutSplittedTrainDataY", "DataOutSplittedTestDataX", and "DataOutSplittedTestDataY".

        Raises:
            NotImplementedError: If the data splitter is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        input_x = input_dict["DataInDataSplittingX"][0]["value"]
        input_y = input_dict["DataInDataSplittingY"][0]["value"]

        # "TrainTestSplit" resolves to sklearn's train_test_split *function*,
        # so the module name must be converted to snake_case; all other
        # splitters resolve to a class name as-is.
        if "TrainTestSplit" in self.method.module_chain:
            method_module = self.method.resolve_module(module_name_to_snakecase=True)
        else:
            method_module = self.method.resolve_module()

        # train_x, train_y, test_x, test_y = self.abstract_method(input_x, input_y)
        if "sklearn" in method_module.__module__:
            if method_module.__name__ == "train_test_split":
                # Single train/test split: each output slot holds one dataset.
                # Note sklearn returns (train_x, test_x, train_y, test_y) order.
                train_x, test_x, train_y, test_y = method_module(input_x, input_y, **self.method.params_dict)
                print("train_test_split splitting finished")
                return self.create_output_dict(
                    {
                        "DataOutSplittedTrainDataX": train_x,
                        "DataOutSplittedTrainDataY": train_y,
                        "DataOutSplittedTestDataX": test_x,
                        "DataOutSplittedTestDataY": test_y,
                    }
                )
            else:
                # Cross-validation-style splitter class (e.g. KFold): split()
                # yields (train_index, valid_index) pairs, so each output slot
                # becomes a list with one entry per split.
                assert isinstance(method_module, type), "The method_module should be a class"
                splitter = method_module(**self.method.params_dict)

                train_x_per_split = []
                valid_x_per_split = []
                train_y_per_split = []
                valid_y_per_split = []
                for train_index, valid_index in splitter.split(input_x, input_y):
                    train_x_per_split.append(input_x.iloc[train_index])
                    valid_x_per_split.append(input_x.iloc[valid_index])
                    train_y_per_split.append(input_y.iloc[train_index])
                    valid_y_per_split.append(input_y.iloc[valid_index])

                print(f"{splitter.__class__.__name__} splitting finished resulting in {len(train_x_per_split)} splits")
                return self.create_output_dict(
                    {
                        "DataOutSplittedTrainDataX": train_x_per_split,
                        "DataOutSplittedTrainDataY": train_y_per_split,
                        "DataOutSplittedTestDataX": valid_x_per_split,
                        "DataOutSplittedTestDataY": valid_y_per_split,
                    }
                )
        else:
            raise NotImplementedError("Only sklearn data splitters are supported for now")

run_method(other_task_output_dict, input_data)

Splits the data using the splitter determined by self.method.module_chain. The data to use are determined by self.inputs. Parameters to use for the splitter are in self.method.params_dict. Expects one input data value with name "DataInDataSplittingX" and one with name "DataInDataSplittingY".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing the split data with the keys "DataOutSplittedTrainDataX", "DataOutSplittedTrainDataY", "DataOutSplittedTestDataX", and "DataOutSplittedTestDataY".

Raises:

Type Description
NotImplementedError

If the data splitter is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Splits the data using the splitter determined by self.method.module_chain.
    The data to use are determined by self.inputs. Parameters to use for the splitter are in self.method.params_dict.
    Expects one input data value with name "DataInDataSplittingX" and one with name "DataInDataSplittingY".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the split data with the keys "DataOutSplittedTrainDataX", "DataOutSplittedTrainDataY", "DataOutSplittedTestDataX", and "DataOutSplittedTestDataY".

    Raises:
        NotImplementedError: If the data splitter is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    input_x = input_dict["DataInDataSplittingX"][0]["value"]
    input_y = input_dict["DataInDataSplittingY"][0]["value"]

    # "TrainTestSplit" resolves to sklearn's train_test_split *function*, so
    # the module name must be converted to snake_case before resolution.
    if "TrainTestSplit" in self.method.module_chain:
        method_module = self.method.resolve_module(module_name_to_snakecase=True)
    else:
        method_module = self.method.resolve_module()

    # train_x, train_y, test_x, test_y = self.abstract_method(input_x, input_y)
    if "sklearn" in method_module.__module__:
        if method_module.__name__ == "train_test_split":
            # Single split: sklearn returns (train_x, test_x, train_y, test_y).
            train_x, test_x, train_y, test_y = method_module(input_x, input_y, **self.method.params_dict)
            print("train_test_split splitting finished")
            return self.create_output_dict(
                {
                    "DataOutSplittedTrainDataX": train_x,
                    "DataOutSplittedTrainDataY": train_y,
                    "DataOutSplittedTestDataX": test_x,
                    "DataOutSplittedTestDataY": test_y,
                }
            )
        else:
            # Cross-validation-style splitter class (e.g. KFold): split() yields
            # index pairs, so each output becomes a list with one entry per split.
            assert isinstance(method_module, type), "The method_module should be a class"
            splitter = method_module(**self.method.params_dict)

            train_x_per_split = []
            valid_x_per_split = []
            train_y_per_split = []
            valid_y_per_split = []
            for train_index, valid_index in splitter.split(input_x, input_y):
                train_x_per_split.append(input_x.iloc[train_index])
                valid_x_per_split.append(input_x.iloc[valid_index])
                train_y_per_split.append(input_y.iloc[train_index])
                valid_y_per_split.append(input_y.iloc[valid_index])

            print(f"{splitter.__class__.__name__} splitting finished resulting in {len(train_x_per_split)} splits")
            return self.create_output_dict(
                {
                    "DataOutSplittedTrainDataX": train_x_per_split,
                    "DataOutSplittedTrainDataY": train_y_per_split,
                    "DataOutSplittedTestDataX": valid_x_per_split,
                    "DataOutSplittedTestDataY": valid_y_per_split,
                }
            )
    else:
        raise NotImplementedError("Only sklearn data splitters are supported for now")

PerformanceCalculation

Bases: Task

Abstraction of owl:class ml:PerformanceCalculation.

This class represents a task for calculating the performance of a machine learning model.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
class PerformanceCalculation(Task):
    """
    Abstraction of owl:class ml:PerformanceCalculation.

    This class represents a task for calculating the performance of a machine learning model.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculates a score using a metric determined by self.method.module_chain.
        The data to use are determined by self.inputs. Parameters to use for the score calculation are in self.method.params_dict.
        Expects one input data value with name "DataInRealY" and one with name "DataInPredictedY".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the calculated score with the key "DataOutScore".

        Raises:
            NotImplementedError: If the metric is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        real_y = input_dict["DataInRealY"][0]["value"]
        predicted_y = input_dict["DataInPredictedY"][0]["value"]

        # Metric names resolve to sklearn functions, whose names are snake_case.
        method_module = self.method.resolve_module(module_name_to_snakecase=True)

        if "sklearn" not in method_module.__module__:
            raise NotImplementedError("Only sklearn metrics are supported for now")

        assert callable(method_module), "The method_module should be a function"
        if isinstance(real_y, list):
            # Multiple splits: score each split, then report the mean score.
            per_split_scores = [
                method_module(real, pred, **self.method.params_dict) for real, pred in zip(real_y, predicted_y)
            ]
            metric_value = sum(per_split_scores) / len(per_split_scores)
        else:
            metric_value = method_module(real_y, predicted_y, **self.method.params_dict)

        return self.create_output_dict({"DataOutScore": metric_value})

run_method(other_task_output_dict, input_data)

Calculates a score using a metric determined by self.method.module_chain. The data to use are determined by self.inputs. Parameters to use for the score calculation are in self.method.params_dict. Expects one input data value with name "DataInRealY" and one with name "DataInPredictedY".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing the calculated score with the key "DataOutScore".

Raises:

Type Description
NotImplementedError

If the metric is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Calculates a score using a metric determined by self.method.module_chain.
    The data to use are determined by self.inputs. Parameters to use for the score calculation are in self.method.params_dict.
    Expects one input data value with name "DataInRealY" and one with name "DataInPredictedY".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the calculated score with the key "DataOutScore".

    Raises:
        NotImplementedError: If the metric is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    # real_train_y = input_dict["DataInTrainRealY"]
    input_real_y = input_dict["DataInRealY"][0]["value"]
    # predicted_train_y = input_dict["DataInTrainPredictedY"]
    input_predicted_y = input_dict["DataInPredictedY"][0]["value"]

    # Metric names resolve to sklearn functions, whose names are snake_case.
    method_module = self.method.resolve_module(module_name_to_snakecase=True)

    if "sklearn" in method_module.__module__:
        assert callable(method_module), "The method_module should be a function"
        if not isinstance(input_real_y, list):
            metric_value = method_module(input_real_y, input_predicted_y, **self.method.params_dict)
        else:
            # multiple splits: score each split, then report the mean score
            metric_values = [
                method_module(y, p, **self.method.params_dict) for y, p in zip(input_real_y, input_predicted_y)
            ]
            metric_value = sum(metric_values) / len(metric_values)
    else:
        raise NotImplementedError("Only sklearn metrics are supported for now")

    return self.create_output_dict({"DataOutScore": metric_value})

PrepareTransformer

Bases: Task

Abstraction of owl:class ml:PrepareTransformer.

This class represents a task for preparing a data transformer.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class PrepareTransformer(Task):
    """
    Abstraction of owl:class ml:PrepareTransformer.

    This class represents a task for preparing a data transformer.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Prepares the transformer determined by self.method.module_chain.
        The data to use are determined by self.inputs. Parameters to use for the transformer are in self.method.params_dict.
        Expects one input data value with name "DataInToPrepareTransformer".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the transformer with the key "DataOutTransformer".

        Raises:
            NotImplementedError: If the transformer is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        # NOTE: the local name `input` shadows the builtin.
        input = input_dict["DataInToPrepareTransformer"][0]["value"]

        method_module = self.method.resolve_module()
        if "sklearn" in method_module.__module__:
            assert isinstance(method_module, type), "The method_module should be a class"
            transformer = method_module(**self.method.params_dict)

            if not isinstance(input, list):
                transformer.fit(input)
            else:
                # multiple splits
                # NOTE(review): sklearn's fit() re-fits from scratch on each call,
                # so after this loop the transformer reflects only the last split —
                # confirm this is the intended behavior.
                for input_part in input:
                    transformer.fit(input_part)

            print(f"{transformer.__class__.__name__} transforming finished")
        else:
            raise NotImplementedError("Only sklearn data transformers are supported for now")

        return self.create_output_dict({"DataOutTransformer": transformer})

run_method(other_task_output_dict, input_data)

Prepares the transformer determined by self.method.module_chain. The data to use are determined by self.inputs. Parameters to use for the transformer are in self.method.params_dict. Expects one input data value with name "DataInToPrepareTransformer".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing the transformer with the key "DataOutTransformer".

Raises:

Type Description
NotImplementedError

If the transformer is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Prepares the transformer determined by self.method.module_chain.
    The data to use are determined by self.inputs. Parameters to use for the transformer are in self.method.params_dict.
    Expects one input data value with name "DataInToPrepareTransformer".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the transformer with the key "DataOutTransformer".

    Raises:
        NotImplementedError: If the transformer is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    # NOTE: the local name `input` shadows the builtin.
    input = input_dict["DataInToPrepareTransformer"][0]["value"]

    method_module = self.method.resolve_module()
    if "sklearn" in method_module.__module__:
        assert isinstance(method_module, type), "The method_module should be a class"
        transformer = method_module(**self.method.params_dict)

        if not isinstance(input, list):
            transformer.fit(input)
        else:
            # multiple splits
            # NOTE(review): sklearn's fit() re-fits from scratch on each call, so
            # after this loop the transformer reflects only the last split —
            # confirm this is the intended behavior.
            for input_part in input:
                transformer.fit(input_part)

        print(f"{transformer.__class__.__name__} transforming finished")
    else:
        raise NotImplementedError("Only sklearn data transformers are supported for now")

    return self.create_output_dict({"DataOutTransformer": transformer})

Test

Bases: Task

Abstraction of owl:class ml:Test.

This class represents a test task for machine learning models.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class Test(Task):
    """
    Abstraction of owl:class ml:Test.

    This class represents a test task for machine learning models.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame):
        """
        Tests the machine learning model.
        The model and data to use are determined by self.inputs.
        Expects one input data value with name "DataInTestModel" and one with name "DataInTestX".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            dict: A dictionary containing the predicted values with the key "DataOutPredictedValueTest".

        Raises:
            NotImplementedError: If the model is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        model = input_dict["DataInTestModel"][0]["value"]
        test_x = input_dict["DataInTestX"][0]["value"]

        # Only models from the sklearn library are supported.
        if "sklearn" not in model.__module__:
            raise NotImplementedError("Only sklearn models are supported for now")

        if isinstance(test_x, list):
            # Multiple splits: produce one prediction array per split.
            predicted_y = [model.predict(chunk) for chunk in test_x]
        else:
            predicted_y = model.predict(test_x)

        print(f"{model.__class__.__name__} testing finished")

        return self.create_output_dict({"DataOutPredictedValueTest": predicted_y})

run_method(other_task_output_dict, input_data)

Tests the machine learning model. The model and data to use are determined by self.inputs. Expects one input data value with name "DataInTestModel" and one with name "DataInTestX".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Name Type Description
dict

A dictionary containing the predicted values with the key "DataOutPredictedValueTest".

Raises:

Type Description
NotImplementedError

If the model is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame):
    """
    Tests the machine learning model.
    The model and data to use are determined by self.inputs.
    Expects one input data value with name "DataInTestModel" and one with name "DataInTestX".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        dict: A dictionary containing the predicted values with the key "DataOutPredictedValueTest".

    Raises:
        NotImplementedError: If the model is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    model = input_dict["DataInTestModel"][0]["value"]
    input_x = input_dict["DataInTestX"][0]["value"]

    # check if model belongs to sklearn library
    if "sklearn" in model.__module__:
        if not isinstance(input_x, list):
            predicted_y = model.predict(input_x)
        else:
            # multiple splits: one prediction array per split
            predicted_y = [model.predict(x) for x in input_x]
    else:
        raise NotImplementedError("Only sklearn models are supported for now")

    print(f"{model.__class__.__name__} testing finished")

    return self.create_output_dict({"DataOutPredictedValueTest": predicted_y})

Train

Bases: Task

Abstraction of owl:class ml:Train.

This class represents a training task for machine learning models.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Train(Task):
    """
    Abstraction of owl:class ml:Train.

    This class represents a training task for machine learning models.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame):
        """
        Trains the machine learning model determined by self.method.module_chain.
        The data to use are determined by self.inputs. Parameters to use for the model are in self.method.params_dict.
        Expects one input data value with name "DataInTrainX" and one with name "DataInTrainY".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            dict: A dictionary containing the trained model with the key "DataOutTrainModel".

        Raises:
            NotImplementedError: If the model is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        input_x = input_dict["DataInTrainX"][0]["value"]
        input_y = input_dict["DataInTrainY"][0]["value"]
        input_model_as_method = None
        # check if input dict contains a method representing an ML model to be optimized
        if "InputModelAsMethod" in input_dict:
            input_model_as_method = input_dict["InputModelAsMethod"][0]["value"]
            input_model_as_method_module = input_model_as_method.resolve_module()

        method_module = self.method.resolve_module()
        if "sklearn" in method_module.__module__:
            assert isinstance(method_module, type), "The method_module should be a class"
            if input_model_as_method:
                # HPO (e.g. GridSearchCV) or Boosting (e.g. AdaBoostClassifier):
                # the inner estimator is instantiated with its own params and
                # passed as the first positional argument of the wrapper.
                model = method_module(
                    input_model_as_method_module(**input_model_as_method.params_dict),
                    **self.method.params_dict,
                )
            else:
                # normal training
                model = method_module(**self.method.params_dict)

            if not isinstance(input_x, list):
                model.fit(input_x, input_y)
            else:
                # multiple splits
                # NOTE(review): sklearn's fit() re-fits from scratch on each call,
                # so after this loop the model reflects only the last split —
                # confirm this is the intended behavior.
                for x, y in zip(input_x, input_y):
                    model.fit(x, y)

            print(f"{model.__class__.__name__} training finished")
        else:
            raise NotImplementedError("Only sklearn models are supported for now")

        return self.create_output_dict({"DataOutTrainModel": model})

run_method(other_task_output_dict, input_data)

Trains the machine learning model determined by self.method.module_chain. The data to use are determined by self.inputs. Parameters to use for the model are in self.method.params_dict. Expects one input data value with name "DataInTrainX" and one with name "DataInTrainY".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Name Type Description
dict

A dictionary containing the trained model with the key "DataOutTrainModel".

Raises:

Type Description
NotImplementedError

If the model is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame):
    """
    Trains the machine learning model determined by self.method.module_chain.
    The data to use are determined by self.inputs. Parameters to use for the model are in self.method.params_dict.
    Expects one input data value with name "DataInTrainX" and one with name "DataInTrainY".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        dict: A dictionary containing the trained model with the key "DataOutTrainModel".

    Raises:
        NotImplementedError: If the model is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    input_x = input_dict["DataInTrainX"][0]["value"]
    input_y = input_dict["DataInTrainY"][0]["value"]
    input_model_as_method = None
    # check if input dict contains a method representing an ML model to be optimized
    if "InputModelAsMethod" in input_dict:
        input_model_as_method = input_dict["InputModelAsMethod"][0]["value"]
        input_model_as_method_module = input_model_as_method.resolve_module()

    method_module = self.method.resolve_module()
    if "sklearn" in method_module.__module__:
        assert isinstance(method_module, type), "The method_module should be a class"
        if input_model_as_method:
            # HPO (e.g. GridSearchCV) or Boosting (e.g. AdaBoostClassifier):
            # the inner estimator is instantiated with its own params and passed
            # as the first positional argument of the wrapper.
            model = method_module(
                input_model_as_method_module(**input_model_as_method.params_dict),
                **self.method.params_dict,
            )
        else:
            # normal training
            model = method_module(**self.method.params_dict)

        if not isinstance(input_x, list):
            model.fit(input_x, input_y)
        else:
            # multiple splits
            # NOTE(review): sklearn's fit() re-fits from scratch on each call, so
            # after this loop the model reflects only the last split — confirm
            # this is the intended behavior.
            for x, y in zip(input_x, input_y):
                model.fit(x, y)

        print(f"{model.__class__.__name__} training finished")
    else:
        raise NotImplementedError("Only sklearn models are supported for now")

    return self.create_output_dict({"DataOutTrainModel": model})

Transform

Bases: Task

Abstraction of owl:class ml:Transform.

This class represents a task for transforming data.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class Transform(Task):
    """
    Abstraction of owl:class ml:Transform.

    This class represents a task for transforming data.
    """

    def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
        """
        Applies a transformation to the data.
        The model and data to use are determined by self.inputs.
        Expects one input data value with name "DataInTransformer" and one with name "DataInToTransform".

        Args:
            other_task_output_dict (dict): A dictionary containing the output of other tasks.
            input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

        Returns:
            Dict[str, Any]: A dictionary containing the transformed data with the key "DataOutTransformed".

        Raises:
            NotImplementedError: If the transformer is not supported.
        """
        input_dict = self.get_inputs(other_task_output_dict, input_data)
        transformer = input_dict["DataInTransformer"][0]["value"]
        # renamed from `input` to avoid shadowing the Python builtin
        data_to_transform = input_dict["DataInToTransform"][0]["value"]

        # check if model belongs to sklearn library
        if "sklearn" in transformer.__module__:
            if not isinstance(data_to_transform, list):
                transformed_input = transformer.transform(data_to_transform)
            else:  # multiple splits
                transformed_input = [
                    transformer.transform(x) for x in data_to_transform
                ]  # NOTE: the transformer may be asked to transform unseen data, which will raise an error. e.g. if OneHotEncoder is used, one chunk of input may have a category that is not present in another chunk of input
        else:
            raise NotImplementedError("Only sklearn data transformers are supported for now")

        print(f"{transformer.__class__.__name__} transforming finished")

        return self.create_output_dict({"DataOutTransformed": transformed_input})

run_method(other_task_output_dict, input_data)

Applies a transformation to the data. The model and data to use are determined by self.inputs. Expects one input data value with name "DataInTransformer" and one with name "DataInToTransform".

Parameters:

Name Type Description Default
other_task_output_dict dict

A dictionary containing the output of other tasks.

required
input_data DataFrame

The input data of the ExeKG's pipeline.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing the transformed data with the key "DataOutTransformed".

Raises:

Type Description
NotImplementedError

If the transformer is not supported.

Source code in exe_kg_lib/classes/tasks/ml_tasks.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def run_method(self, other_task_output_dict: dict, input_data: pd.DataFrame) -> Dict[str, Any]:
    """
    Applies a transformation to the data.
    The model and data to use are determined by self.inputs.
    Expects one input data value with name "DataInTransformer" and one with name "DataInToTransform".

    Args:
        other_task_output_dict (dict): A dictionary containing the output of other tasks.
        input_data (pd.DataFrame): The input data of the ExeKG's pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the transformed data with the key "DataOutTransformed".

    Raises:
        NotImplementedError: If the transformer is not supported.
    """
    input_dict = self.get_inputs(other_task_output_dict, input_data)
    transformer = input_dict["DataInTransformer"][0]["value"]
    input = input_dict["DataInToTransform"][0]["value"]

    # check if model belongs to sklearn library
    if "sklearn" in transformer.__module__:
        if not isinstance(input, list):
            transformed_input = transformer.transform(input)
        else:  # multiple splits
            transformed_input = [
                transformer.transform(x) for x in input
            ]  # NOTE: it can be that the transformer will try to trasform unseen data, which will raise an error. e.g. if OneHotEncoder is used, one chunk of input may have a category that is not present in another chunk of input

    else:
        raise NotImplementedError("Only sklearn data transformers are supported for now")

    print(f"{transformer.__class__.__name__} transforming finished")

    return self.create_output_dict({"DataOutTransformed": transformed_input})