pipeline

Pipeline

Represents a simplified version of a pipeline for serialization purposes. Can be converted to and from JSON.

Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
class Pipeline:
    """
    Represents a simplified version of a pipeline for serialization purposes. Can be converted to and from JSON.
    """

    def __init__(
        self,
        name: str = "",
        input_data_path: str = "",
        output_plots_dir: str = "",
        data_entities: List[DataEntitySerializable] = None,
        tasks: List[TaskSerializable] = None,
    ):
        """
        Initializes a Pipeline object.

        Args:
            name (str): The name of the pipeline.
            input_data_path (str): The path to the input dataset.
            output_plots_dir (str): The directory to save output plots.
            data_entities (List[DataEntitySerializable]): A list of data entities.
            tasks (List[TaskSerializable]): A list of tasks.
        """
        self.name = name
        self.input_data_path = input_data_path
        self.output_plots_dir = output_plots_dir

        if data_entities is None:
            data_entities = []
        self.data_entities = data_entities

        if tasks is None:
            tasks = []
        self.tasks = tasks

    @classmethod
    def from_json(cls, source: Union[Path, TextIOWrapper, str]):
        """
        Deserializes a JSON source and creates an instance of the class.

        Args:
            source (Union[Path, TextIOWrapper, str]): The JSON source containing the pipeline.

        Returns:
            cls: An instance of the class with the deserialized data.
        """
        obj_dict = None
        try:
            if isinstance(source, TextIOWrapper):  # source is an already-open file
                obj_dict = json.load(source)
            elif isinstance(source, Path) or Path(str(source)).exists():  # source is a path
                with open(source) as file:
                    obj_dict = json.load(file)
        except OSError:
            pass

        if obj_dict is None and isinstance(source, str):
            obj_dict = json.loads(source)

        if obj_dict is None:
            raise ValueError("Invalid source type. Must be a valid Path, TextIOWrapper, or str.")

        data_entities = []
        for data_entity_dict in obj_dict["data_entities"]:
            data_entity = DataEntitySerializable()
            data_entity.__dict__ = data_entity_dict
            data_entities.append(data_entity)

        tasks = []
        for task_dict in obj_dict["tasks"]:
            task = TaskSerializable.from_dict(task_dict)
            tasks.append(task)

        return cls(obj_dict["name"], obj_dict["input_data_path"], obj_dict["output_plots_dir"], data_entities, tasks)

    def to_json(self) -> str:
        """
        Converts the Pipeline object to a JSON string.

        Returns:
            str: The JSON representation of the Pipeline object.
        """
        obj_dict = self.__dict__
        obj_dict["data_entities"] = [data_entity.__dict__ for data_entity in self.data_entities]
        obj_dict["tasks"] = [task.to_dict() for task in self.tasks]
        return json.dumps(obj_dict, indent=4)

    def add_data_entity(self, name: str, source_value: str, data_semantics_name: str, data_structure_name: str):
        """
        Adds a data entity to the pipeline.

        Args:
            name (str): The name of the data entity.
            source_value (str): The source value of the data entity (i.e. column of the input dataset).
            data_semantics_name (str): The name of the data semantics.
            data_structure_name (str): The name of the data structure.
        """
        data_entity_ser = DataEntitySerializable(name, source_value, data_semantics_name, data_structure_name)

        self.data_entities.append(data_entity_ser)

    def add_task(
        self,
        kg_schema_short: str,
        task_type: str,
        method_type: str,
        method_params_dict: Dict[str, Union[str, int, float, dict]],
        input_entity_dict: Dict[str, Union[List[DataEntity], Method]],
        output_names: List[str],
    ):
        """
        Adds a task to the pipeline.

        Args:
            kg_schema_short (str): The short name of the KG schema (e.g. "ml" for Machine Learning).
            task_type (str): The type of the task.
            method_type (str): The type of the method.
            method_params_dict (Dict[str, Union[str, int, float, dict]]): A dictionary of method parameters.
            input_entity_dict (Dict[str, Union[List[DataEntity], Method]]): A dictionary mapping input names to lists of data entities or a Method.
            output_names (List[str]): A list of output names.
        """
        task_ser = TaskSerializable(kg_schema_short, task_type, method_type, method_params_dict, output_names)

        for input_entity_name, input_entity_value in input_entity_dict.items():
            if isinstance(input_entity_value, Method):  # provided input is a method
                input_method = input_entity_value

                task_ser.input_entity_info_dict[input_entity_name] = MethodSerializable(
                    input_method.name, input_method.params_dict
                )
            elif isinstance(input_entity_value, list) and all(
                isinstance(elem, DataEntity) for elem in input_entity_value
            ):  # provided input is list of data entities
                input_data_entity_list = input_entity_value
                input_data_entity_names = []
                for input_data_entity in input_data_entity_list:
                    input_data_entity_names.append(input_data_entity.name)

                task_ser.input_entity_info_dict[input_entity_name] = input_data_entity_names

        self.tasks.append(task_ser)
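
A minimal usage sketch (the import path below follows the "Source code in" note above; the pipeline name, dataset path, and plot directory are placeholder values):

from exe_kg_lib.classes.exe_kg_serialization.pipeline import Pipeline  # assumed module path

# Placeholder locations; replace with your own dataset and output directories.
pipeline = Pipeline(
    name="ExamplePipeline",
    input_data_path="data/input.csv",
    output_plots_dir="output/plots",
)

# Data entities and tasks are added via add_data_entity() and add_task() (see below),
# after which to_json() produces the JSON representation.
print(pipeline.to_json())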

__init__(name='', input_data_path='', output_plots_dir='', data_entities=None, tasks=None)

Initializes a Pipeline object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the pipeline. | `''` |
| `input_data_path` | `str` | The path to the input dataset. | `''` |
| `output_plots_dir` | `str` | The directory to save output plots. | `''` |
| `data_entities` | `List[DataEntitySerializable]` | A list of data entities. | `None` |
| `tasks` | `List[TaskSerializable]` | A list of tasks. | `None` |
Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
def __init__(
    self,
    name: str = "",
    input_data_path: str = "",
    output_plots_dir: str = "",
    data_entities: List[DataEntitySerializable] = None,
    tasks: List[TaskSerializable] = None,
):
    """
    Initializes a Pipeline object.

    Args:
        name (str): The name of the pipeline.
        input_data_path (str): The path to the input dataset.
        output_plots_dir (str): The directory to save output plots.
        data_entities (List[DataEntitySerializable]): A list of data entities.
        tasks (List[TaskSerializable]): A list of tasks.
    """
    self.name = name
    self.input_data_path = input_data_path
    self.output_plots_dir = output_plots_dir

    if data_entities is None:
        data_entities = []
    self.data_entities = data_entities

    if tasks is None:
        tasks = []
    self.tasks = tasks
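
Note that the None defaults are replaced with fresh empty lists inside the constructor, so each Pipeline instance gets its own data_entities and tasks lists rather than sharing a mutable default. A small sketch (assumed import path):

from exe_kg_lib.classes.exe_kg_serialization.pipeline import Pipeline  # assumed module path

p1 = Pipeline(name="first")
p2 = Pipeline(name="second")

p1.data_entities.append("placeholder")  # illustrative only; real entries are DataEntitySerializable objects
assert p2.data_entities == []           # p2 is unaffected because the lists are not shared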

add_data_entity(name, source_value, data_semantics_name, data_structure_name)

Adds a data entity to the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the data entity. | required |
| `source_value` | `str` | The source value of the data entity (i.e. column of the input dataset). | required |
| `data_semantics_name` | `str` | The name of the data semantics. | required |
| `data_structure_name` | `str` | The name of the data structure. | required |
Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
def add_data_entity(self, name: str, source_value: str, data_semantics_name: str, data_structure_name: str):
    """
    Adds a data entity to the pipeline.

    Args:
        name (str): The name of the data entity.
        source_value (str): The source value of the data entity (i.e. column of the input dataset).
        data_semantics_name (str): The name of the data semantics.
        data_structure_name (str): The name of the data structure.
    """
    data_entity_ser = DataEntitySerializable(name, source_value, data_semantics_name, data_structure_name)

    self.data_entities.append(data_entity_ser)
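
For example (the entity name, dataset column, and the data-semantics / data-structure names below are hypothetical and must match your dataset and KG schema; the import path is assumed):

from exe_kg_lib.classes.exe_kg_serialization.pipeline import Pipeline  # assumed module path

pipeline = Pipeline(name="ExamplePipeline", input_data_path="data/input.csv")  # placeholder values

pipeline.add_data_entity(
    name="engine_temperature",           # entity name (hypothetical)
    source_value="engine_temperature",   # column of the input dataset (hypothetical)
    data_semantics_name="TimeSeries",    # assumed data-semantics name
    data_structure_name="Vector",        # assumed data-structure name
)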

add_task(kg_schema_short, task_type, method_type, method_params_dict, input_entity_dict, output_names)

Adds a task to the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `kg_schema_short` | `str` | The short name of the KG schema (e.g. "ml" for Machine Learning). | required |
| `task_type` | `str` | The type of the task. | required |
| `method_type` | `str` | The type of the method. | required |
| `method_params_dict` | `Dict[str, Union[str, int, float, dict]]` | A dictionary of method parameters. | required |
| `input_entity_dict` | `Dict[str, Union[List[DataEntity], Method]]` | A dictionary mapping input names to lists of data entities or a Method. | required |
| `output_names` | `List[str]` | A list of output names. | required |
Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
def add_task(
    self,
    kg_schema_short: str,
    task_type: str,
    method_type: str,
    method_params_dict: Dict[str, Union[str, int, float, dict]],
    input_entity_dict: Dict[str, Union[List[DataEntity], Method]],
    output_names: List[str],
):
    """
    Adds a task to the pipeline.

    Args:
        kg_schema_short (str): The short name of the KG schema (e.g. "ml" for Machine Learning).
        task_type (str): The type of the task.
        method_type (str): The type of the method.
        method_params_dict (Dict[str, Union[str, int, float, dict]]): A dictionary of method parameters.
        input_entity_dict (Dict[str, Union[List[DataEntity], Method]]): A dictionary mapping input names to lists of data entities or a Method.
        output_names (List[str]): A list of output names.
    """
    task_ser = TaskSerializable(kg_schema_short, task_type, method_type, method_params_dict, output_names)

    for input_entity_name, input_entity_value in input_entity_dict.items():
        if isinstance(input_entity_value, Method):  # provided input is a method
            input_method = input_entity_value

            task_ser.input_entity_info_dict[input_entity_name] = MethodSerializable(
                input_method.name, input_method.params_dict
            )
        elif isinstance(input_entity_value, list) and all(
            isinstance(elem, DataEntity) for elem in input_entity_value
        ):  # provided input is list of data entities
            input_data_entity_list = input_entity_value
            input_data_entity_names = []
            for input_data_entity in input_data_entity_list:
                input_data_entity_names.append(input_data_entity.name)

            task_ser.input_entity_info_dict[input_entity_name] = input_data_entity_names

    self.tasks.append(task_ser)
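
As a sketch of the data-entity branch (all task, method, and input/output names below are hypothetical, and feature_entity stands for an existing exe_kg_lib DataEntity whose construction is outside the scope of this module):

# `pipeline` is a Pipeline instance and `feature_entity` an existing DataEntity (both assumed).
pipeline.add_task(
    kg_schema_short="ml",                          # "ml" for Machine Learning (see docstring)
    task_type="Concatenation",                     # hypothetical task type
    method_type="ConcatenationMethod",             # hypothetical method type
    method_params_dict={},                         # no method parameters in this sketch
    input_entity_dict={"DataInConcatenation": [feature_entity]},  # hypothetical input name
    output_names=["DataOutConcatenated"],          # hypothetical output name
)

Passing a Method object instead of a list of data entities stores a MethodSerializable built from the method's name and parameter dictionary, as shown in the source above.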

from_json(source) classmethod

Deserializes a JSON source and creates an instance of the class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source` | `Union[Path, TextIOWrapper, str]` | The JSON source containing the pipeline. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `cls` | `Pipeline` | An instance of the class with the deserialized data. |

Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
@classmethod
def from_json(cls, source: Union[Path, TextIOWrapper, str]):
    """
    Deserializes a JSON source and creates an instance of the class.

    Args:
        source (Union[Path, TextIOWrapper, str]): The JSON source containing the pipeline.

    Returns:
        cls: An instance of the class with the deserialized data.
    """
    obj_dict = None
    try:
        if isinstance(source, TextIOWrapper):  # source is an already-open file
            obj_dict = json.load(source)
        elif isinstance(source, Path) or Path(str(source)).exists():  # source is a path
            with open(source) as file:
                obj_dict = json.load(file)
    except OSError:
        pass

    if obj_dict is None and isinstance(source, str):
        obj_dict = json.loads(source)

    if obj_dict is None:
        raise ValueError("Invalid source type. Must be a valid Path, TextIOWrapper, or str.")

    data_entities = []
    for data_entity_dict in obj_dict["data_entities"]:
        data_entity = DataEntitySerializable()
        data_entity.__dict__ = data_entity_dict
        data_entities.append(data_entity)

    tasks = []
    for task_dict in obj_dict["tasks"]:
        task = TaskSerializable.from_dict(task_dict)
        tasks.append(task)

    return cls(obj_dict["name"], obj_dict["input_data_path"], obj_dict["output_plots_dir"], data_entities, tasks)
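
All three accepted source kinds can be used interchangeably; a sketch with placeholder file names and an assumed import path:

from pathlib import Path
from exe_kg_lib.classes.exe_kg_serialization.pipeline import Pipeline  # assumed module path

# 1) From a file path ("pipeline.json" is a placeholder).
pipeline = Pipeline.from_json(Path("pipeline.json"))

# 2) From an already-open text file.
with open("pipeline.json") as f:
    pipeline = Pipeline.from_json(f)

# 3) From a raw JSON string.
pipeline = Pipeline.from_json(
    '{"name": "ExamplePipeline", "input_data_path": "data/input.csv", '
    '"output_plots_dir": "output/plots", "data_entities": [], "tasks": []}'
)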

to_json()

Converts the Pipeline object to a JSON string.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | The JSON representation of the Pipeline object. |

Source code in exe_kg_lib/classes/exe_kg_serialization/pipeline.py
def to_json(self) -> str:
    """
    Converts the Pipeline object to a JSON string.

    Returns:
        str: The JSON representation of the Pipeline object.
    """
    obj_dict = self.__dict__
    obj_dict["data_entities"] = [data_entity.__dict__ for data_entity in self.data_entities]
    obj_dict["tasks"] = [task.to_dict() for task in self.tasks]
    return json.dumps(obj_dict, indent=4)
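
A short sketch of writing the result to disk (placeholder paths; assumed import path). Note that obj_dict aliases self.__dict__ in the source above, so after calling to_json() the data_entities and tasks attributes hold the serialized dictionaries rather than the original serializable objects.

from exe_kg_lib.classes.exe_kg_serialization.pipeline import Pipeline  # assumed module path

pipeline = Pipeline(name="ExamplePipeline", input_data_path="data/input.csv")  # placeholder values

# Write the JSON representation to a placeholder path.
with open("pipeline.json", "w") as f:
    f.write(pipeline.to_json())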