16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class ExeKGBase:
    """
    Shared state for working with executable KGs (ExeKGs).

    On construction it loads the top-level "Data Science" KG schema and every other schema found in
    KG_SCHEMAS, merges their graphs into a single input KG, creates an empty ExeKG, and collects the
    subclasses of the top-level schema classes (tasks, methods, data entities, data semantics,
    data structures) into lists.
    """

    def __init__(self):
        """
        Loads all KG schemas, builds the combined input KG and an empty ExeKG, and parses the
        available task, method and data classes via self._parse_kgs().
        """
        top_level_schema_name = "Data Science"
        self.top_level_schema = KGSchema.from_schema_info(KG_SCHEMAS[top_level_schema_name])  # top-level KG schema

        # every schema other than the top-level one is a bottom-level schema, keyed by its namespace prefix
        self.bottom_level_schemata = {
            schema_info["namespace_prefix"]: KGSchema.from_schema_info(schema_info)
            for schema_name, schema_info in KG_SCHEMAS.items()
            if schema_name != top_level_schema_name
        }

        # top-level KG schema entities
        top_level_namespace = self.top_level_schema.namespace
        self.atomic_task = Entity(top_level_namespace.AtomicTask)
        self.task = Entity(top_level_namespace.Task)
        self.atomic_method = Entity(top_level_namespace.AtomicMethod)
        self.data_entity = Entity(top_level_namespace.DataEntity)
        self.pipeline = Entity(top_level_namespace.Pipeline)
        self.data = Entity(top_level_namespace.Data)
        self.data_semantics = Entity(top_level_namespace.DataSemantics)
        self.data_structure = Entity(top_level_namespace.DataStructure)

        # self.input_kg: combination of the top-level and all bottom-level KG schemas
        # NOTE(review): the original comment says the input executable KG is also added here in
        # KG execution mode; that does not happen in this class -- confirm against subclasses/callers
        self.input_kg = Graph(bind_namespaces="rdflib")
        # self.shacl_shapes_s: string containing SHACL shapes of all KG schemas
        self.shacl_shapes_s = self.top_level_schema.shacl_shapes_s

        self.input_kg += self.top_level_schema.kg
        for kg_schema in self.bottom_level_schemata.values():
            self.input_kg += kg_schema.kg
            self.input_kg += kg_schema.generated_schema_kg
            self.shacl_shapes_s += kg_schema.shacl_shapes_s

        self.exe_kg = Graph(bind_namespaces="rdflib")  # variable to store the constructed ExeKG
        self.pipeline_instance = None  # variable to store pipeline's metadata
        self.pipeline_serializable = Pipeline()  # simplified version of pipeline for serialization purposes

        self._bind_used_namespaces([self.input_kg, self.exe_kg])

        # below variables are filled in self._parse_kgs()
        self.task_type_dict = {}  # dict for uniquely naming each new pipeline task
        self.method_type_dict = {}  # dict for uniquely naming each new pipeline method
        self.atomic_task_list = []  # list for storing the available sub-classes of ds:AtomicTask
        self.atomic_method_list = []  # list for storing the available sub-classes of ds:AtomicMethod
        self.data_type_list = []  # list for storing the available sub-classes of ds:DataEntity
        self.data_semantics_list = []  # list for storing the available sub-classes of ds:DataSemantics
        self.data_structure_list = []  # list for storing the available sub-classes of ds:DataStructure

        # existing data entities that are output entities of previous tasks during KG construction
        self.existing_data_entity_list = []
        # last created pipeline task, for connecting consecutive pipeline tasks during KG construction
        self.last_created_task = None
        # indicates if canvas task was created during KG construction, and used for hiding the other
        # Visualization tasks in CLI
        self.canvas_task_created = False

        self._parse_kgs()

    def _bind_used_namespaces(self, kgs: List[Graph]) -> None:
        """
        Binds top-level and bottom-level KG schemas' namespaces with their prefixes
        Adds these bindings to the Graphs of kgs list
        Args:
            kgs: list of Graph objects to which the namespace bindings are added
        """
        # top-level schema first, then the bottom-level schemata in dict order
        kg_schemas = [self.top_level_schema, *self.bottom_level_schemata.values()]
        for kg in kgs:
            for kg_schema in kg_schemas:
                kg.bind(kg_schema.namespace_prefix, kg_schema.namespace)

    def _query_subclass_entities(self, parent: Entity, kg: Graph, excluded_iri=None) -> List[Entity]:
        """
        Queries kg for the subclasses of parent and wraps each result in an Entity
        Args:
            parent: entity whose subclasses are queried; also passed as second argument to each created Entity
            kg: Graph object that is queried
            excluded_iri: if given, results whose IRI equals this value are left out
        Returns:
            list of the created Entity objects, in the order returned by the query
        """
        return [
            Entity(row[0], parent)  # first element of each query result row is the subclass IRI
            for row in query_subclasses_of(parent.iri, kg)
            if excluded_iri is None or row[0] != excluded_iri
        ]

    def _parse_kgs(self) -> None:
        """
        Fills lists with subclasses of top-level KG schema classes and initializes dicts used for unique naming
        """
        # tasks, methods and data entities are looked up in the combined input KG
        for task in self._query_subclass_entities(self.atomic_task, self.input_kg):
            self.atomic_task_list.append(task)
            self.task_type_dict[task.name] = 1  # every task type starts with the value 1

        for method in self._query_subclass_entities(self.atomic_method, self.input_kg):
            self.atomic_method_list.append(method)
            self.method_type_dict[method.name] = 1  # every method type starts with the value 1

        self.data_type_list.extend(self._query_subclass_entities(self.data_entity, self.input_kg))

        # data semantics and data structures are looked up in the top-level schema only;
        # ds:DataEntity itself is left out of both lists
        # NOTE(review): presumably ds:DataEntity shows up as a subclass of both classes -- confirm against schema
        data_entity_iri = self.data_entity.iri
        self.data_semantics_list.extend(
            self._query_subclass_entities(self.data_semantics, self.top_level_schema.kg, excluded_iri=data_entity_iri)
        )
        self.data_structure_list.extend(
            self._query_subclass_entities(self.data_structure, self.top_level_schema.kg, excluded_iri=data_entity_iri)
        )
|