Skip to content

exe_kg

ExeKG

Source code in exe_kg_lib/classes/exe_kg.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
class ExeKG:
    def __init__(self, input_exe_kg_path: Optional[str] = None):
        """
        Initializes the ExeKG in one of two modes.

        KG execution mode (input_exe_kg_path given): parses the executable KG and
        loads only the bottom-level schemas whose namespaces it actually binds.
        KG construction mode (no path): loads all available bottom-level schemas
        so a new executable KG can be built.

        Args:
            input_exe_kg_path: path of KG to be executed
                               acts as switch for KG execution mode (if filled, mode is on)
        """
        self.top_level_schema = KGSchema.from_schema_info(KG_SCHEMAS["Data Science"])  # top-level KG schema
        self.bottom_level_schemata = {}  # maps namespace prefix -> KGSchema of each used bottom-level schema

        # top-level KG schema entities
        self.atomic_task = Entity(self.top_level_schema.namespace.AtomicTask)
        self.atomic_method = Entity(self.top_level_schema.namespace.AtomicMethod)
        self.data_entity = Entity(self.top_level_schema.namespace.DataEntity)
        self.pipeline = Entity(self.top_level_schema.namespace.Pipeline)
        self.data = Entity(self.top_level_schema.namespace.Data)
        self.data_semantics = Entity(self.top_level_schema.namespace.DataSemantics)
        self.data_structure = Entity(self.top_level_schema.namespace.DataStructure)

        # self.input_kg: KG eventually filled with 3 KG schemas and the input executable KG in case of KG execution
        self.input_kg = Graph(bind_namespaces="rdflib")
        if input_exe_kg_path:  # KG execution mode
            self.input_kg.parse(input_exe_kg_path, format="n3")  # parse input executable KG
            check_kg_executability(self.input_kg)
            all_ns = [n for n in self.input_kg.namespace_manager.namespaces()]
            bottom_level_schema_info_set = False  # flag indicating that a bottom-level schema was found
            for schema_name, schema_info in KG_SCHEMAS.items():  # search for used bottom-level schema
                if (
                    schema_name == "Data Science"  # or schema_name == "Visualization"
                ):  # skip top-level KG schema and Visualization schema that is always used
                    continue

                # a schema counts as "used" when its (prefix, namespace) pair is bound in the parsed KG
                if (schema_info["namespace_prefix"], URIRef(schema_info["namespace"])) in all_ns:
                    # bottom-level schema found
                    self.bottom_level_schemata[schema_info["namespace_prefix"]] = KGSchema.from_schema_info(schema_info)
                    bottom_level_schema_info_set = True

            if not bottom_level_schema_info_set:  # no bottom-level schema found, input executable KG is invalid
                print("Input executable KG did not have any bottom level KG schemas")
                exit(1)
        else:  # KG construction mode
            for schema_name, schema_info in KG_SCHEMAS.items():  # search for used bottom-level schema
                if (
                    schema_name == "Data Science"  # or schema_name == "Visualization"
                ):  # skip top-level KG schema and Visualization schema that is always used
                    continue

                self.bottom_level_schemata[schema_info["namespace_prefix"]] = KGSchema.from_schema_info(schema_info)

        bottom_level_schemata_kgs = [kg_schema.kg for kg_schema in self.bottom_level_schemata.values()]

        self.input_kg += self.top_level_schema.kg  # + self.visu_schema.kg  # combine all KG schemas in input KG

        for bottom_level_schema_kg in bottom_level_schemata_kgs:
            self.input_kg += bottom_level_schema_kg

        self.output_kg = Graph(bind_namespaces="rdflib")  # KG to be filled while constructing executable KG

        self._bind_used_namespaces([self.input_kg, self.output_kg])

        # below variables are filled in self._parse_kgs()
        self.task_type_dict = {}  # dict for uniquely naming each new pipeline task
        self.method_type_dict = {}  # dict for uniquely naming each new pipeline method
        self.atomic_task_list = []  # list for storing the available sub-classes of ds:AtomicTask
        self.atomic_method_list = []  # list for storing the available sub-classes of ds:AtomicMethod
        self.data_type_list = []  # list for storing the available sub-classes of ds:DataEntity
        self.data_semantics_list = []  # list for storing the available sub-classes of ds:DataSemantics
        self.data_structure_list = []  # list for storing the available sub-classes of ds:DataStructure

        self.existing_data_entity_list = (
            []
        )  # contains existing data entities that are output entities of previous tasks during KG construction
        self.last_created_task = (
            None  # last created pipeline task, for connecting consecutive pipeline tasks during KG construction
        )
        self.canvas_task_created = False  # indicates if canvas task was created during KG construction, and used for hiding the other Visualization tasks in CLI

        self._parse_kgs()

    def _bind_used_namespaces(self, kgs: List[Graph]):
        """
        Registers the namespace prefixes of the top-level KG schema and of every
        loaded bottom-level KG schema on each of the given graphs.

        Args:
            kgs: list of Graph objects to which the namespace bindings are added
        """
        schemas = [self.top_level_schema, *self.bottom_level_schemata.values()]
        for graph in kgs:
            for schema in schemas:
                graph.bind(schema.namespace_prefix, schema.namespace)

    def _parse_kgs(self) -> None:
        """
        Collects the sub-classes of the top-level KG schema classes into the
        corresponding lists and seeds the dicts used for unique instance naming.
        """
        for row in get_subclasses_of(self.atomic_task.iri, self.input_kg):
            task = Entity(row[0], self.atomic_task)
            self.atomic_task_list.append(task)
            self.task_type_dict[task.name] = 1

        for row in get_subclasses_of(self.atomic_method.iri, self.input_kg):
            method = Entity(row[0], self.atomic_method)
            self.atomic_method_list.append(method)
            self.method_type_dict[method.name] = 1

        for row in get_subclasses_of(self.data_entity.iri, self.input_kg):
            self.data_type_list.append(Entity(row[0], self.data_entity))

        # data semantics / data structure sub-classes are looked up in the top-level schema only
        for row in get_subclasses_of(self.data_semantics.iri, self.top_level_schema.kg):
            if row[0] != self.data_entity.iri:
                self.data_semantics_list.append(Entity(row[0], self.data_semantics))

        for row in get_subclasses_of(self.data_structure.iri, self.top_level_schema.kg):
            if row[0] != self.data_entity.iri:
                self.data_structure_list.append(Entity(row[0], self.data_structure))

    def create_pipeline_task(self, pipeline_name: str, input_data_path: str) -> Task:
        """
        Instantiates a new pipeline task entity in self.output_kg and remembers it
        as the most recently created task.

        Args:
            pipeline_name: name for the pipeline
            input_data_path: path for the input data to be used by the pipeline's tasks

        Returns:
            Task: created pipeline
        """
        new_pipeline = create_pipeline_task(
            self.top_level_schema.namespace,
            self.pipeline,
            self.output_kg,
            pipeline_name,
            input_data_path,
        )
        self.last_created_task = new_pipeline
        return new_pipeline

    def create_data_entity(
        self,
        name: str,
        source_value: str,
        data_semantics_name: str,
        data_structure_name: str,
    ) -> DataEntity:
        """
        Builds a DataEntity object whose IRIs live in the top-level schema namespace.

        Args:
            name: name of the data entity
            source_value: name of the data source corresponding to a column of the data
            data_semantics_name: name of the data semantics entity
            data_structure_name: name of the data structure entity

        Returns:
            DataEntity: object initialized with the given parameter values
        """
        ns = self.top_level_schema.namespace
        return DataEntity(
            ns + name,
            self.data_entity,
            source_value,
            ns + data_semantics_name,
            ns + data_structure_name,
        )

    def add_task(
        self,
        kg_schema_short: str,
        task: str,
        input_data_entity_dict: Dict[str, List[DataEntity]],
        method: str,
        properties_dict: Dict[str, Union[str, int, float]],
    ) -> Task:
        """
        Instantiates and adds a new task entity to self.output_kg
        Components attached to the task during creation: input and output data entities, and a method with properties
        Args:
            kg_schema_short: abbreviated name of the KG schema in which the task and method belong
            task: task name
            input_data_entity_dict: keys -> input names of the specified task
                                    values -> lists of DataEntity objects to be added as input to the task
            method: method name
            properties_dict: keys -> property names of the specified method
                             values -> values to be added as parameters to the method

        Returns:
            Task: object of the created task
        """
        # raises KeyError if kg_schema_short does not match a loaded bottom-level schema prefix
        kg_schema_to_use = self.bottom_level_schemata[kg_schema_short]

        relation_iri = (
            self.top_level_schema.namespace.hasNextTask
            if self.last_created_task.type != "Pipeline"
            else self.top_level_schema.namespace.hasStartTask
        )  # use relation depending on the previous task

        # instantiate task and link it with the previous one
        parent_task = Task(kg_schema_to_use.namespace + task, self.atomic_task)
        added_entity = add_instance_from_parent_with_relation(
            kg_schema_to_use.namespace,
            self.output_kg,
            parent_task,
            relation_iri,
            self.last_created_task,
            name_instance(self.task_type_dict, self.method_type_dict, parent_task),
        )
        next_task = Task.from_entity(added_entity)  # create Task object from Entity object

        # instantiate and add given input data entities to the task
        self._add_inputs_to_task(kg_schema_to_use.namespace, next_task, input_data_entity_dict)
        # instantiate and add output data entities to the task, as specified in the KG schema
        self._add_outputs_to_task(next_task)

        method_parent = Entity(kg_schema_to_use.namespace + method, self.atomic_method)

        # fetch compatible methods and their properties from KG schema
        results = list(
            get_method_properties_and_methods(
                self.input_kg,
                self.top_level_schema.namespace_prefix,
                next_task.parent_entity.iri,
            )
        )

        # pair is (property IRI, method IRI); compare the fragment after '#' with the given method name
        chosen_property_method = next(
            filter(lambda pair: pair[1].split("#")[1] == method, results), None
        )  # match given method_type with query result
        if chosen_property_method is None:
            print(f"Property connecting task of type {task} with method of type {method} not found")
            exit(1)

        # instantiate method and link it with the task using the appropriate chosen_property_method[0] relation
        method_entity = add_instance_from_parent_with_relation(
            kg_schema_to_use.namespace,
            self.output_kg,
            method_parent,
            chosen_property_method[0],
            next_task,
            name_instance(self.task_type_dict, self.method_type_dict, method_parent),
        )

        # fetch compatible data properties from KG schema
        property_list = get_data_properties_plus_inherited_by_class_iri(self.input_kg, method_parent.iri)

        # add data properties to the task with given values
        for pair in property_list:
            property_iri = pair[0]
            property_name = property_iri.split("#")[1]
            range_iri = pair[1]
            # raises KeyError if properties_dict is missing a property required by the KG schema
            input_property = Literal(
                lexical_or_value=properties_dict[property_name],
                datatype=range_iri,
            )
            add_literal(self.output_kg, method_entity, property_iri, input_property)

        self.last_created_task = next_task  # store created task

        return next_task

    def _add_inputs_to_task(
        self,
        namespace: Namespace,
        task_entity: Task,
        input_data_entity_dict: Dict[str, List[DataEntity]] = None,
    ) -> None:
        """
        Instantiates and adds given input data entities to the given task of self.output_kg
        if input_data_entity_dict is None, user is asked to specify input data entities
        Args:
            namespace: namespace used when creating new data entities via the CLI path
            task_entity: the task to add the input to
            input_data_entity_dict: keys -> input entity names corresponding to the given task as defined in the chosen bottom-level KG schema
                                    values -> list of corresponding data entities to be added as input to the task
        """

        use_cli = input_data_entity_dict is None

        # fetch compatible inputs from KG schema
        results = list(
            get_input_properties_and_inputs(
                self.input_kg,
                self.top_level_schema.namespace_prefix,
                task_entity.parent_entity.iri,
            )
        )

        # task_type_index was incremented when creating the task entity
        # reset the index to match the currently created task's index
        task_type_index = self.task_type_dict[task_entity.type] - 1
        for _, input_entity_iri, data_structure_iri in results:
            input_entity_name = input_entity_iri.split("#")[1]
            if not use_cli:
                # raises KeyError if the dict is missing an input required by the KG schema
                input_data_entity_list = input_data_entity_dict[input_entity_name]
            else:
                # use CLI
                print(f"Specify input corresponding to {input_entity_name}")
                input_data_entity_list = get_input_for_existing_data_entities(self.existing_data_entity_list)
                input_data_entity_list += get_input_for_new_data_entities(
                    self.data_semantics_list,
                    self.data_structure_list,
                    namespace,
                    self.data_entity,
                )

            # suffix distinguishes multiple data entities attached to the same input slot
            same_input_index = 1
            for input_data_entity in input_data_entity_list:
                # instantiate data entity corresponding to the found input_entity_name
                data_entity_iri = input_entity_iri + str(task_type_index) + "_" + str(same_input_index)
                # instantiate given data entity
                add_data_entity_instance(
                    self.output_kg,
                    self.data,
                    self.top_level_schema.kg,
                    self.top_level_schema.namespace,
                    input_data_entity,
                )
                # instantiate and attach data entity with reference to the given data entity
                data_entity = DataEntity(
                    data_entity_iri,
                    DataEntity(input_entity_iri, self.data_entity),
                    has_reference=input_data_entity.iri,
                    has_data_structure_iri=data_structure_iri,
                )
                add_and_attach_data_entity(
                    self.output_kg,
                    self.data,
                    self.top_level_schema.kg,
                    self.top_level_schema.namespace,
                    data_entity,
                    self.top_level_schema.namespace.hasInput,
                    task_entity,
                )
                task_entity.input_dict[input_entity_name] = data_entity
                same_input_index += 1

                # in CLI mode validate after every attachment so the user gets immediate feedback
                if use_cli:
                    check_kg_executability(self.output_kg)

    def _add_outputs_to_task(self, task_entity: Task) -> None:
        """
        Instantiates the output data entities that the KG schema defines for the given
        task, attaches them to the task in self.output_kg, and records them as
        available inputs for subsequent tasks.

        Args:
            task_entity: the task to add the output to
        """
        # fetch compatible outputs from KG schema
        schema_outputs = list(
            get_output_properties_and_outputs(
                self.input_kg,
                self.top_level_schema.namespace_prefix,
                task_entity.parent_entity.iri,
            )
        )

        # the counter was already advanced when the task was created, so step back
        # one to get the index of the task instance being processed
        current_task_index = self.task_type_dict[task_entity.type] - 1
        for _, parent_iri, structure_iri in schema_outputs:
            # instantiate and add data entity
            out_entity = DataEntity(
                parent_iri + str(current_task_index),
                DataEntity(parent_iri, self.data_entity),
                has_data_structure_iri=structure_iri,
            )
            add_and_attach_data_entity(
                self.output_kg,
                self.data,
                self.top_level_schema.kg,
                self.top_level_schema.namespace,
                out_entity,
                self.top_level_schema.namespace.hasOutput,
                task_entity,
            )
            task_entity.output_dict[parent_iri.split("#")[1]] = out_entity
            self.existing_data_entity_list.append(out_entity)

    def _create_next_task_cli(self) -> Union[None, Task]:
        """
        Prompts the user for the next pipeline task, instantiates it (without a
        method) in self.output_kg, and records its outputs in
        self.existing_data_entity_list.

        Returns:
            None: in case user wants to end the pipeline creation
            Task: object of the created task
        """
        print("Please choose the next task")
        for idx, candidate in enumerate(self.atomic_task_list):
            # hide PlotTask until a canvas exists; hide CanvasTask once one exists
            hidden = (not self.canvas_task_created and candidate.name == "PlotTask") or (
                self.canvas_task_created and candidate.name == "CanvasTask"
            )
            if hidden:
                continue
            print(f"\t{idx}. {candidate.name}")
        print("\t-1. End pipeline")
        chosen_id = int(input())
        if chosen_id == -1:
            return None

        chosen_parent = self.atomic_task_list[chosen_id]
        # a pipeline start uses hasStartTask; any later task is chained with hasNextTask
        if self.last_created_task.type == "Pipeline":
            relation_iri = self.top_level_schema.namespace.hasStartTask
        else:
            relation_iri = self.top_level_schema.namespace.hasNextTask

        # instantiate task and link it with the previous one
        created = add_instance_from_parent_with_relation(
            chosen_parent.namespace,
            self.output_kg,
            chosen_parent,
            relation_iri,
            self.last_created_task,
            name_instance(self.task_type_dict, self.method_type_dict, chosen_parent),
        )
        task_entity = Task(created.iri, created.parent_entity)  # wrap Entity into a Task

        # ask the user for inputs, then derive outputs from the KG schema
        self._add_inputs_to_task(chosen_parent.namespace, task_entity)
        self._add_outputs_to_task(task_entity)

        self.last_created_task = task_entity
        if task_entity.type == "CanvasTask":
            self.canvas_task_created = True

        return task_entity

    def _create_method(self, task_to_attach_to: Entity) -> None:
        """
        Instantiates and attaches a method (chosen by the user via CLI) to the given
        task of self.output_kg, then asks for and adds the method's data properties.

        Args:
            task_to_attach_to: the task to attach the created method to
        """
        print(f"Please choose a method for {task_to_attach_to.type}:")

        # fetch compatible methods and their properties from KG schema
        results = list(
            get_method_properties_and_methods(
                self.input_kg,
                self.top_level_schema.namespace_prefix,
                task_to_attach_to.parent_entity.iri,
            )
        )
        for i, pair in enumerate(results):
            tmp_method = pair[1].split("#")[1]
            print(f"\t{str(i)}. {tmp_method}")

        method_id = int(input())
        selected_property_and_method = results[method_id]
        method_parent = next(
            filter(
                lambda m: m.iri == selected_property_and_method[1],
                self.atomic_method_list,
            ),
            None,
        )
        if method_parent is None:  # guard: previously this fell through and crashed in name_instance
            print(f"Method with iri {selected_property_and_method[1]} not found. Exiting...")
            exit(1)

        # instantiate method and link it with the task using the appropriate selected_property_and_method[0] relation
        method_entity = add_instance_from_parent_with_relation(
            task_to_attach_to.namespace,
            self.output_kg,
            method_parent,
            selected_property_and_method[0],
            task_to_attach_to,
            name_instance(self.task_type_dict, self.method_type_dict, method_parent),
        )

        # fetch compatible data properties from KG schema
        property_list = get_data_properties_plus_inherited_by_class_iri(self.input_kg, method_parent.iri)

        if property_list:
            print(f"Please enter requested properties for {method_parent.name}:")
            # add data properties with user-provided values; attach them to the created
            # method instance (not the task) for consistency with add_task()
            for pair in property_list:
                property_instance = URIRef(pair[0])
                range_name = pair[1].split("#")[1]  # renamed: do not shadow builtin `range`
                range_iri = pair[1]
                input_property = Literal(
                    lexical_or_value=input("\t{} in range({}): ".format(pair[0].split("#")[1], range_name)),
                    datatype=range_iri,
                )
                add_literal(self.output_kg, method_entity, property_instance, input_property)

        check_kg_executability(self.output_kg)

    def start_pipeline_creation(self, pipeline_name: str, input_data_path: str) -> None:
        """
        Handles the pipeline creation through CLI: creates the pipeline entity, then
        repeatedly asks the user for tasks and methods until they end the pipeline.

        Args:
            pipeline_name: name for the pipeline
            input_data_path: path for the input data to be used by the pipeline's tasks
        """
        # reuse create_pipeline_task() instead of duplicating its body; it also
        # sets self.last_created_task to the newly created pipeline
        self.create_pipeline_task(pipeline_name, input_data_path)

        while True:
            next_task = self._create_next_task_cli()
            if next_task is None:  # user chose to end the pipeline
                break

            self._create_method(next_task)

    def save_created_kg(self, file_path: str) -> None:
        """
        Validates self.output_kg and serializes it to a file.

        Args:
            file_path: path of the output file
        """
        check_kg_executability(self.output_kg)

        dir_path = os.path.dirname(file_path)
        if dir_path:  # bugfix: os.makedirs("") raises FileNotFoundError for a bare filename
            os.makedirs(dir_path, exist_ok=True)

        self.output_kg.serialize(destination=file_path)
        print(f"Executable KG saved in {file_path}")

    def _property_value_to_field_value(self, property_value: str) -> Union[str, DataEntity]:
        """
        Maps a raw KG property value to a Python field value.

        Values containing '#' are treated as potential data entity IRIs and resolved
        recursively to a DataEntity; everything else (and unresolvable IRIs) is
        returned unchanged.

        Args:
            property_value: value of the property as found in KG

        Returns:
            str: property_value parameter as is
            DataEntity: object containing parsed data entity properties
        """
        if "#" not in property_value:
            return property_value
        parsed = self._parse_data_entity_by_iri(property_value)
        return property_value if parsed is None else parsed

    def _parse_data_entity_by_iri(self, in_out_data_entity_iri: str) -> Optional[DataEntity]:
        """
        Parses an input or output data entity of self.input_kg and stores the parsed info in a Python object
        Args:
            in_out_data_entity_iri: IRI of the KG entity to parse

        Returns:
            None: if given IRI does not belong to an instance of a sub-class of self.top_level_schema.namespace.DataEntity
            DataEntity: object with data entity's parsed properties
        """
        # fetch type of entity with given IRI
        query_result = get_first_query_result_if_exists(
            query_entity_parent_iri,
            self.input_kg,
            in_out_data_entity_iri,
            self.top_level_schema.namespace.DataEntity,
        )
        if query_result is None:
            return None

        data_entity_parent_iri = str(query_result[0])

        # fetch IRI of data entity that is referenced by the given entity
        query_result = get_first_query_result_if_exists(
            query_data_entity_reference_iri,
            self.input_kg,
            self.top_level_schema.namespace_prefix,
            in_out_data_entity_iri,
        )

        if query_result is None:  # no referenced data entity found
            # the entity references itself, so the property-copy loop below reads its own triples
            data_entity_ref_iri = in_out_data_entity_iri
        else:
            data_entity_ref_iri = str(query_result[0])

        # create DataEntity object to store all the parsed properties
        data_entity = DataEntity(in_out_data_entity_iri, Entity(data_entity_parent_iri))
        # NOTE(review): assumes the reference IRI always contains '#' — IndexError otherwise; confirm
        data_entity.has_reference = data_entity_ref_iri.split("#")[1]

        for s, p, o in self.input_kg.triples((URIRef(data_entity_ref_iri), None, None)):
            # parse property name and value
            field_name = property_name_to_field_name(str(p))
            # only copy properties that map onto existing DataEntity fields; never overwrite the type
            if not hasattr(data_entity, field_name) or field_name == "type":
                continue
            field_value = self._property_value_to_field_value(str(o))
            setattr(data_entity, field_name, field_value)  # set field value dynamically

        return data_entity

    def _parse_task_by_iri(self, task_iri: str, canvas_method: visual_tasks.CanvasTaskCanvasMethod = None) -> Task:
        """
        Parses a task of self.input_kg and stores the info in an object of a sub-class of Task
        The sub-class name and the object's fields are mapped dynamically based on the found KG components
        Args:
            task_iri: IRI of the task to be parsed
            canvas_method: optional object to pass as argument for task object initialization

        Returns:
            Task: object of a sub-class of Task, containing all the parsed info
        """
        # fetch type of entity with given IRI
        query_result = get_first_query_result_if_exists(
            query_entity_parent_iri,
            self.input_kg,
            task_iri,
            self.top_level_schema.namespace.AtomicTask,
        )

        if (
            query_result is None
        ):  # given IRI does not belong to an instance of a sub-class of self.top_level_schema.namespace.AtomicTask
            print(f"Cannot retrieve parent of task with iri {task_iri}. Exiting...")
            exit(1)

        task_parent_iri = str(query_result[0])

        task = Task(task_iri, Task(task_parent_iri))
        method = get_method_by_task_iri(
            self.input_kg,
            self.top_level_schema.namespace_prefix,
            self.top_level_schema.namespace,
            task_iri,
        )
        if method is None:
            # bugfix: previously only printed a warning and then crashed on method.type below
            print(f"Cannot retrieve method for task with iri: {task_iri}. Exiting...")
            exit(1)

        # perform automatic mapping of KG task class to Python sub-class
        class_name = task.type + method.type
        Class = getattr(visual_tasks, class_name, None)
        if Class is None:
            Class = getattr(statistic_tasks, class_name, None)
        if Class is None:
            Class = getattr(ml_tasks, class_name, None)
        if Class is None:
            # bugfix: previously Class stayed None and the call below raised TypeError
            print(f"Cannot find Python class with name {class_name}. Exiting...")
            exit(1)

        # create Task sub-class object
        if canvas_method:
            task = Class(task_iri, Task(task_parent_iri), canvas_method)
        else:
            task = Class(task_iri, Task(task_parent_iri))

        task_related_triples = self.input_kg.triples((URIRef(task_iri), None, None))
        method_related_triples = self.input_kg.triples((URIRef(method.iri), None, None))

        for s, p, o in itertools.chain(task_related_triples, method_related_triples):
            # parse property name and value
            field_name = property_name_to_field_name(str(p))
            if not hasattr(task, field_name) or field_name == "type":
                continue
            field_value = self._property_value_to_field_value(str(o))

            # set field value dynamically; inputs/outputs accumulate in lists
            if field_name == "has_input" or field_name == "has_output":
                getattr(task, field_name).append(field_value)
            else:
                setattr(task, field_name, field_value)

        return task

    def execute_pipeline(self):
        """
        Parses the pipeline stored in self.input_kg and executes its tasks in order,
        feeding each task the accumulated outputs of its predecessors.
        """
        pipeline_iri, input_data_path, current_task_iri = get_pipeline_and_first_task_iri(
            self.input_kg, self.top_level_schema.namespace_prefix
        )
        input_data = pd.read_csv(input_data_path, delimiter=",", encoding="ISO-8859-1")

        canvas_task = None  # holds the CanvasTask object once one has been executed
        outputs_so_far = {}  # accumulates the outputs of all executed tasks
        while current_task_iri is not None:
            task_obj = self._parse_task_by_iri(current_task_iri, canvas_task)
            task_output = task_obj.run_method(outputs_so_far, input_data)
            if task_output:
                outputs_so_far.update(task_output)

            if task_obj.type == "CanvasTask":
                canvas_task = task_obj

            current_task_iri = task_obj.has_next_task

__init__(input_exe_kg_path=None)

Parameters:

Name Type Description Default
input_exe_kg_path str

path of KG to be executed acts as switch for KG execution mode (if filled, mode is on)

None
Source code in exe_kg_lib/classes/exe_kg.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def __init__(self, input_exe_kg_path: str = None):
    """
    Initializes KG schemas, entities and bookkeeping structures.

    Args:
        input_exe_kg_path: path of KG to be executed
                           acts as switch for KG execution mode (if filled, mode is on)
    """
    self.top_level_schema = KGSchema.from_schema_info(KG_SCHEMAS["Data Science"])  # top-level KG schema
    self.bottom_level_schemata = {}  # namespace prefix -> KGSchema of each bottom-level schema in use

    # top-level KG schema entities
    self.atomic_task = Entity(self.top_level_schema.namespace.AtomicTask)
    self.atomic_method = Entity(self.top_level_schema.namespace.AtomicMethod)
    self.data_entity = Entity(self.top_level_schema.namespace.DataEntity)
    self.pipeline = Entity(self.top_level_schema.namespace.Pipeline)
    self.data = Entity(self.top_level_schema.namespace.Data)
    self.data_semantics = Entity(self.top_level_schema.namespace.DataSemantics)
    self.data_structure = Entity(self.top_level_schema.namespace.DataStructure)

    # self.input_kg: KG eventually filled with 3 KG schemas and the input executable KG in case of KG execution
    self.input_kg = Graph(bind_namespaces="rdflib")
    if input_exe_kg_path:  # KG execution mode
        self.input_kg.parse(input_exe_kg_path, format="n3")  # parse input executable KG
        check_kg_executability(self.input_kg)
        all_ns = [n for n in self.input_kg.namespace_manager.namespaces()]
        bottom_level_schema_info_set = False  # flag indicating that a bottom-level schema was found
        for schema_name, schema_info in KG_SCHEMAS.items():  # search for used bottom-level schema
            if (
                schema_name == "Data Science"  # or schema_name == "Visualization"
            ):  # skip top-level KG schema and Visualization schema that is always used
                continue

            # a schema is "used" when its (prefix, namespace) pair is bound in the parsed KG
            if (schema_info["namespace_prefix"], URIRef(schema_info["namespace"])) in all_ns:
                # bottom-level schema found
                self.bottom_level_schemata[schema_info["namespace_prefix"]] = KGSchema.from_schema_info(schema_info)
                bottom_level_schema_info_set = True

        if not bottom_level_schema_info_set:  # no bottom-level schema found, input executable KG is invalid
            # NOTE(review): hard process exit on invalid input; consider raising an
            # exception instead so library callers can handle the error
            print("Input executable KG did not have any bottom level KG schemas")
            exit(1)
    else:  # KG construction mode
        for schema_name, schema_info in KG_SCHEMAS.items():  # search for used bottom-level schema
            if (
                schema_name == "Data Science"  # or schema_name == "Visualization"
            ):  # skip top-level KG schema and Visualization schema that is always used
                continue

            # construction mode: make every available bottom-level schema usable
            self.bottom_level_schemata[schema_info["namespace_prefix"]] = KGSchema.from_schema_info(schema_info)

    bottom_level_schemata_kgs = [kg_schema.kg for kg_schema in self.bottom_level_schemata.values()]

    self.input_kg += self.top_level_schema.kg  # + self.visu_schema.kg  # combine all KG schemas in input KG

    for bottom_level_schema_kg in bottom_level_schemata_kgs:
        self.input_kg += bottom_level_schema_kg

    self.output_kg = Graph(bind_namespaces="rdflib")  # KG to be filled while constructing executable KG

    self._bind_used_namespaces([self.input_kg, self.output_kg])

    # below variables are filled in self._parse_kgs()
    self.task_type_dict = {}  # dict for uniquely naming each new pipeline task
    self.method_type_dict = {}  # dict for uniquely naming each new pipeline method
    self.atomic_task_list = []  # list for storing the available sub-classes of ds:AtomicTask
    self.atomic_method_list = []  # list for storing the available sub-classes of ds:AtomicMethod
    self.data_type_list = []  # list for storing the available sub-classes of ds:DataEntity
    self.data_semantics_list = []  # list for storing the available sub-classes of ds:DataSemantics
    self.data_structure_list = []  # list for storing the available sub-classes of ds:DataStructure

    self.existing_data_entity_list = (
        []
    )  # contains existing data entities that are output entities of previous tasks during KG construction
    self.last_created_task = (
        None  # last created pipeline task, for connecting consecutive pipeline tasks during KG construction
    )
    self.canvas_task_created = False  # indicates if canvas task was created during KG construction, and used for hiding the other Visualization tasks in CLI

    self._parse_kgs()

add_task(kg_schema_short, task, input_data_entity_dict, method, properties_dict)

Instantiates and adds a new task entity to self.output_kg Components attached to the task during creation: input and output data entities, and a method with properties Args: kg_schema_short: abbreviated name of the KG schema in which the task and method belong task: task name input_data_entity_dict: keys -> input names of the specified task values -> lists of DataEntity objects to be added as input to the task method: method name properties_dict: keys -> property names of the specified method values -> values to be added as parameters to the method

Returns:

Name Type Description
Task Task

object of the created task

Source code in exe_kg_lib/classes/exe_kg.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def add_task(
    self,
    kg_schema_short: str,
    task: str,
    input_data_entity_dict: Dict[str, List[DataEntity]],
    method: str,
    properties_dict: Dict[str, Union[str, int, float]],
) -> Task:
    """
    Instantiates and adds a new task entity to self.output_kg
    Components attached to the task during creation: input and output data entities, and a method with properties
    Args:
        kg_schema_short: abbreviated name of the KG schema in which the task and method belong
        task: task name
        input_data_entity_dict: keys -> input names of the specified task
                                values -> lists of DataEntity objects to be added as input to the task
        method: method name
        properties_dict: keys -> property names of the specified method
                         values -> values to be added as parameters to the method

    Returns:
        Task: object of the created task
    """
    kg_schema_to_use = self.bottom_level_schemata[kg_schema_short]

    # the very first task is linked to the pipeline via hasStartTask;
    # every later task is chained to its predecessor via hasNextTask
    relation_iri = (
        self.top_level_schema.namespace.hasNextTask
        if self.last_created_task.type != "Pipeline"
        else self.top_level_schema.namespace.hasStartTask
    )  # use relation depending on the previous task

    # instantiate task and link it with the previous one
    parent_task = Task(kg_schema_to_use.namespace + task, self.atomic_task)
    added_entity = add_instance_from_parent_with_relation(
        kg_schema_to_use.namespace,
        self.output_kg,
        parent_task,
        relation_iri,
        self.last_created_task,
        name_instance(self.task_type_dict, self.method_type_dict, parent_task),
    )
    next_task = Task.from_entity(added_entity)  # create Task object from Entity object

    # instantiate and add given input data entities to the task
    self._add_inputs_to_task(kg_schema_to_use.namespace, next_task, input_data_entity_dict)
    # instantiate and add output data entities to the task, as specified in the KG schema
    self._add_outputs_to_task(next_task)

    method_parent = Entity(kg_schema_to_use.namespace + method, self.atomic_method)

    # fetch compatible methods and their properties from KG schema
    results = list(
        get_method_properties_and_methods(
            self.input_kg,
            self.top_level_schema.namespace_prefix,
            next_task.parent_entity.iri,
        )
    )

    chosen_property_method = next(
        filter(lambda pair: pair[1].split("#")[1] == method, results), None
    )  # match given method_type with query result
    if chosen_property_method is None:
        # NOTE(review): hard process exit on invalid input; consider raising an
        # exception instead so library callers can handle the error
        print(f"Property connecting task of type {task} with method of type {method} not found")
        exit(1)

    # instantiate method and link it with the task using the appropriate chosen_property_method[0] relation
    method_entity = add_instance_from_parent_with_relation(
        kg_schema_to_use.namespace,
        self.output_kg,
        method_parent,
        chosen_property_method[0],
        next_task,
        name_instance(self.task_type_dict, self.method_type_dict, method_parent),
    )

    # fetch compatible data properties from KG schema
    property_list = get_data_properties_plus_inherited_by_class_iri(self.input_kg, method_parent.iri)

    # add data properties to the task with given values
    for pair in property_list:
        property_iri = pair[0]
        property_name = property_iri.split("#")[1]
        range_iri = pair[1]
        # NOTE(review): raises KeyError if a schema-required property is missing
        # from properties_dict — the caller must supply every property
        input_property = Literal(
            lexical_or_value=properties_dict[property_name],
            datatype=range_iri,
        )
        add_literal(self.output_kg, method_entity, property_iri, input_property)

    self.last_created_task = next_task  # store created task

    return next_task

create_data_entity(name, source_value, data_semantics_name, data_structure_name)

Creates a DataEntity object Args: name: name of the data entity source_value: name of the data source corresponding to a column of the data data_semantics_name: name of the data semantics entity data_structure_name: name of the data structure entity

Returns:

Name Type Description
DataEntity DataEntity

object initialized with the given parameter values

Source code in exe_kg_lib/classes/exe_kg.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def create_data_entity(
    self,
    name: str,
    source_value: str,
    data_semantics_name: str,
    data_structure_name: str,
) -> DataEntity:
    """
    Builds a DataEntity object from the given names, resolving all IRIs
    against the top-level schema namespace.

    Args:
        name: name of the data entity
        source_value: name of the data source corresponding to a column of the data
        data_semantics_name: name of the data semantics entity
        data_structure_name: name of the data structure entity

    Returns:
        DataEntity: object initialized with the given parameter values
    """
    ns = self.top_level_schema.namespace
    return DataEntity(
        ns + name,
        self.data_entity,
        source_value,
        ns + data_semantics_name,
        ns + data_structure_name,
    )

create_pipeline_task(pipeline_name, input_data_path)

Instantiates and adds a new pipeline task entity to self.output_kg Args: pipeline_name: name for the pipeline input_data_path: path for the input data to be used by the pipeline's tasks

Returns:

Name Type Description
Task Task

created pipeline

Source code in exe_kg_lib/classes/exe_kg.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def create_pipeline_task(self, pipeline_name: str, input_data_path: str) -> Task:
    """
    Creates a pipeline task entity in self.output_kg and records it as the
    most recently created task.

    Args:
        pipeline_name: name for the pipeline
        input_data_path: path for the input data to be used by the pipeline's tasks

    Returns:
        Task: created pipeline
    """
    # delegates to the module-level create_pipeline_task helper
    new_pipeline = create_pipeline_task(
        self.top_level_schema.namespace,
        self.pipeline,
        self.output_kg,
        pipeline_name,
        input_data_path,
    )
    self.last_created_task = new_pipeline
    return new_pipeline

execute_pipeline()

Retrieves and executes pipeline by parsing self.input_kg

Source code in exe_kg_lib/classes/exe_kg.py
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
def execute_pipeline(self):
    """
    Retrieves and executes pipeline by parsing self.input_kg

    Executes each task in chain order, feeding accumulated task outputs
    (and the loaded input data) into every subsequent task.
    """
    _, input_data_path, current_task_iri = get_pipeline_and_first_task_iri(
        self.input_kg, self.top_level_schema.namespace_prefix
    )
    data_frame = pd.read_csv(input_data_path, delimiter=",", encoding="ISO-8859-1")

    canvas_task = None  # Task object of type CanvasTask, once one has been executed
    outputs_so_far = {}  # accumulates the outputs of executed tasks
    while current_task_iri is not None:
        current_task = self._parse_task_by_iri(current_task_iri, canvas_task)
        task_output = current_task.run_method(outputs_so_far, data_frame)
        if task_output:
            outputs_so_far.update(task_output)

        if current_task.type == "CanvasTask":
            canvas_task = current_task

        current_task_iri = current_task.has_next_task

save_created_kg(file_path)

Saves self.output_kg to a file Args: file_path: path of the output file

Source code in exe_kg_lib/classes/exe_kg.py
572
573
574
575
576
577
578
579
580
581
582
583
584
def save_created_kg(self, file_path: str) -> None:
    """
    Saves self.output_kg to a file

    Args:
        file_path: path of the output file
    """
    check_kg_executability(self.output_kg)

    dir_path = os.path.dirname(file_path)
    # dirname() is "" for a bare filename; os.makedirs("") raises
    # FileNotFoundError even with exist_ok=True, so only create a
    # directory when there actually is one
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    self.output_kg.serialize(destination=file_path)
    print(f"Executable KG saved in {file_path}")

start_pipeline_creation(pipeline_name, input_data_path)

Handles the pipeline creation through CLI Args: pipeline_name: name for the pipeline input_data_path: path for the input data to be used by the pipeline's tasks

Source code in exe_kg_lib/classes/exe_kg.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def start_pipeline_creation(self, pipeline_name: str, input_data_path: str) -> None:
    """
    Handles the pipeline creation through CLI

    Args:
        pipeline_name: name for the pipeline
        input_data_path: path for the input data to be used by the pipeline's tasks
    """
    self.last_created_task = create_pipeline_task(
        self.top_level_schema.namespace,
        self.pipeline,
        self.output_kg,
        pipeline_name,
        input_data_path,
    )

    # keep prompting for tasks until the user stops adding them
    next_task = self._create_next_task_cli()
    while next_task is not None:
        self._create_method(next_task)
        next_task = self._create_next_task_cli()

Method

Bases: Entity

Abstraction of owl:class ds:AtomicMethod.

❗ Important for contributors: See Section "Naming conventions" in README.md of "classes.tasks" package before extending the code's functionality.

Source code in exe_kg_lib/classes/method.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class Method(Entity):
    """
    Abstraction of owl:class ds:AtomicMethod.

    ❗ Important for contributors: See Section "Naming conventions" in README.md of "classes.tasks" package before extending the code's functionality.
    """

    def __init__(
        self,
        iri: str,
        parent_entity: Entity,
        module_chain: List[str] = None,
        params_dict: Dict[str, Union[str, int, float]] = None,
        inherited_params_dict: Dict[str, Union[str, int, float]] = None,
    ):
        """
        Args:
            iri: IRI of the method instance
            parent_entity: parent entity of the method
            module_chain: e.g. ['sklearn','model_selection', 'StratifiedShuffleSplit'].
                          Used for resolving the Python module that contains the method to be executed
            params_dict: used for storing method parameters during KG execution
            inherited_params_dict: used for storing inherited method parameters during KG execution
        """
        super().__init__(iri, parent_entity)

        # mutable defaults are created per call to avoid shared-state bugs
        self.module_chain = module_chain if module_chain is not None else []
        self.params_dict = params_dict if params_dict is not None else {}
        # BUG FIX: previously this was always set to {}, silently discarding
        # any inherited_params_dict passed by the caller
        self.inherited_params_dict = inherited_params_dict if inherited_params_dict is not None else {}

    def resolve_module(self, module_name_to_snakecase=False) -> Any:
        """
        Resolves and returns the Python module specified by the method module chain.

        Args:
            module_name_to_snakecase (bool, optional): Whether to convert the last module name to snake case.
                                                      Defaults to False.

        Returns:
            Any: The resolved module.

        Raises:
            NotImplementedError: If the method module chain is not defined for the task.
        """
        if not self.module_chain:
            raise NotImplementedError(f"Method module chain not defined for task {self.name}.")

        module_chain = self.module_chain
        if module_name_to_snakecase:
            module_chain = self.module_chain[:-1] + [camel_to_snake(self.module_chain[-1])]

        # import the parent package, then fetch the leaf attribute from it
        module_chain_parents = ".".join(module_chain[:-1])
        module_chain_child = module_chain[-1]
        module_container = importlib.import_module(module_chain_parents)
        return getattr(module_container, module_chain_child)

resolve_module(module_name_to_snakecase=False)

Resolves and returns the Python module specified by the method module chain.

Parameters:

Name Type Description Default
module_name_to_snakecase bool

Whether to convert the last module name to snake case. Defaults to False.

False

Returns:

Name Type Description
Any Any

The resolved module.

Raises:

Type Description
NotImplementedError

If the method module chain is not defined for the task.

Source code in exe_kg_lib/classes/method.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def resolve_module(self, module_name_to_snakecase=False) -> Any:
    """
    Resolves and returns the Python module specified by the method module chain.

    Args:
        module_name_to_snakecase (bool, optional): Whether to convert the last module name to snake case.
                                                  Defaults to False.

    Returns:
        Any: The resolved module.

    Raises:
        NotImplementedError: If the method module chain is not defined for the task.
    """
    if not self.module_chain:
        raise NotImplementedError(f"Method module chain not defined for task {self.name}.")

    # split the chain into the parent package path and the leaf name
    *parent_names, leaf_name = self.module_chain
    if module_name_to_snakecase:
        leaf_name = camel_to_snake(leaf_name)

    parent_module = importlib.import_module(".".join(parent_names))
    return getattr(parent_module, leaf_name)

camel_to_snake(text)

Converts a camel case string to snake case.

Parameters:

Name Type Description Default
text str

The camel case string to be converted.

required

Returns:

Name Type Description
str str

The snake case version of the input string.

Source code in exe_kg_lib/utils/string_utils.py
17
18
19
20
21
22
23
24
25
26
27
28
def camel_to_snake(text: str) -> str:
    """
    Converts a camel case string to snake case.

    Args:
        text (str): The camel case string to be converted.

    Returns:
        str: The snake case version of the input string.
    """
    # break before each capitalized word, then before any remaining
    # lone capitals, and finally lowercase everything
    with_word_breaks = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", text)
    with_all_breaks = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", with_word_breaks)
    return with_all_breaks.lower()

class_name_to_method_name(class_name)

Converts a class name to a method name by removing the word "Method" from the end of the class name.

Parameters:

Name Type Description Default
class_name str

The class name to convert.

required

Returns:

Name Type Description
str str

The converted method name.

Source code in exe_kg_lib/utils/string_utils.py
61
62
63
64
65
66
67
68
69
70
71
72
def class_name_to_method_name(class_name: str) -> str:
    """
    Converts a class name to a method name by removing the word "Method" from the end of the class name.

    Args:
        class_name (str): The class name to convert.

    Returns:
        str: The converted method name.
    """
    suffix = "Method"
    if class_name.endswith(suffix):
        return class_name[: -len(suffix)]
    return class_name

class_name_to_module_name(class_name)

Converts a class name to a module name by removing the "Module" suffix and converting it to snake case.

Parameters:

Name Type Description Default
class_name str

The class name to convert.

required

Returns:

Name Type Description
str str

The converted module name.

Source code in exe_kg_lib/utils/string_utils.py
47
48
49
50
51
52
53
54
55
56
57
58
def class_name_to_module_name(class_name: str) -> str:
    """
    Converts a class name to a module name by removing the "Module" suffix and converting it to snake case.

    Args:
        class_name (str): The class name to convert.

    Returns:
        str: The converted module name.
    """
    suffix = "Module"
    trimmed = class_name[: -len(suffix)] if class_name.endswith(suffix) else class_name
    return camel_to_snake(trimmed)

get_converted_module_hierarchy_chain(kg, namespace_prefix, method_iri)

Retrieves the module hierarchy chain for a given method IRI and converts it to a list of module names.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix to use in queries.

required
method_iri str

The IRI of the method.

required

Returns:

Name Type Description
List List

The list of module names in the module hierarchy chain, in the correct order.

Source code in exe_kg_lib/utils/query_utils.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
def get_converted_module_hierarchy_chain(
    kg: Graph,
    namespace_prefix: str,
    method_iri: str,
) -> List:
    """
    Retrieves the module hierarchy chain for a given method IRI and converts it to a list of module names.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix to use in queries.
        method_iri (str): The IRI of the method.

    Returns:
        List: The list of module names in the module hierarchy chain, in the correct order.
    """
    try:
        raw_chain = get_module_hierarchy_chain(kg, namespace_prefix, method_iri)
    except NoResultsError:
        print(f"Cannot retrieve module chain for method class: {method_iri}. Proceeding without it...")
        raw_chain = None

    # nothing to convert: propagate the empty/missing chain unchanged
    if not raw_chain:
        return raw_chain

    # convert KG class names to module names, prepend the method's own name,
    # and reverse so the chain runs from outermost package to the method
    converted = [class_name_to_module_name(name) for name in raw_chain]
    converted = [class_name_to_method_name(method_iri.split("#")[-1])] + converted
    converted.reverse()
    return converted

get_first_query_result_if_exists(query_method, *args)

Executes the given query method with the provided arguments and returns the first result if it exists.

Parameters:

Name Type Description Default
query_method Callable

The query method to execute.

required
*args

Variable number of arguments to pass to the query method.

()

Returns:

Type Description
Optional[str]

Optional[str]: The first query result if it exists, otherwise None.

Source code in exe_kg_lib/utils/query_utils.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def get_first_query_result_if_exists(query_method: Callable, *args) -> Optional[str]:
    """
    Executes the given query method with the provided arguments and returns the first result if it exists.

    Args:
        query_method (Callable): The query method to execute.
        *args: Variable number of arguments to pass to the query method.

    Returns:
        Optional[str]: The first query result if it exists, otherwise None.
    """
    # next() with a default already covers the empty-result case; there is no
    # need to materialize the whole result set just to read its first element,
    # and the explicit `if result is None: return None` branch was redundant
    return next(iter(query_method(*args)), None)

get_grouped_inherited_inputs(input_kg, namespace_prefix, entity_iri)

Retrieves the inherited inputs for a given entity, grouped by data entity IRI.

Parameters:

Name Type Description Default
input_kg Graph

The input knowledge graph.

required
namespace_prefix str

The namespace prefix for the entity.

required
entity_iri str

The IRI of the entity.

required

Returns:

Type Description
List[Tuple[str, List[str]]]

List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a property name and a list of input values.

Source code in exe_kg_lib/utils/query_utils.py
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
def get_grouped_inherited_inputs(
    input_kg: Graph, namespace_prefix: str, entity_iri: str
) -> List[Tuple[str, List[str]]]:
    """
    Retrieves the inherited inputs for a given entity, grouped by data entity IRI.

    Args:
        input_kg (Graph): The input knowledge graph.
        namespace_prefix (str): The namespace prefix for the entity.
        entity_iri (str): The IRI of the entity.

    Returns:
        List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a property name and a list of input values.
    """
    # groupby requires its input pre-sorted on the grouping key
    rows = sorted(
        query_inherited_inputs(input_kg, namespace_prefix, entity_iri),
        key=lambda row: row[0],
    )
    grouped = []
    for key, group in itertools.groupby(rows, key=lambda row: row[0]):
        grouped.append((key, [(row[1], row[2]) for row in group]))
    return grouped

get_grouped_inherited_outputs(input_kg, namespace_prefix, entity_iri)

Retrieves the inherited outputs for a given entity, grouped by data entity IRI.

Parameters:

Name Type Description Default
input_kg Graph

The input knowledge graph.

required
namespace_prefix str

The namespace prefix for the entity.

required
entity_iri str

The IRI of the entity.

required

Returns:

Type Description
List[Tuple[str, List[str]]]

List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a property name and a list of input values.

Source code in exe_kg_lib/utils/query_utils.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
def get_grouped_inherited_outputs(
    input_kg: Graph, namespace_prefix: str, entity_iri: str
) -> List[Tuple[str, List[str]]]:
    """
    Retrieves the inherited outputs for a given entity, grouped by data entity IRI.

    Args:
        input_kg (Graph): The input knowledge graph.
        namespace_prefix (str): The namespace prefix for the entity.
        entity_iri (str): The IRI of the entity.

    Returns:
        List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a property name and a list of input values.
    """
    # groupby requires its input pre-sorted on the grouping key
    rows = sorted(
        query_inherited_outputs(input_kg, namespace_prefix, entity_iri),
        key=lambda row: row[0],
    )
    grouped = []
    for key, group in itertools.groupby(rows, key=lambda row: row[0]):
        grouped.append((key, [(row[1], row[2]) for row in group]))
    return grouped

get_method_grouped_params(method_iri, namespace_prefix, kg, inherited=False)

Retrieves the (inherited) parameters for a given method, grouped by property IRI.

Parameters:

Name Type Description Default
method_iri str

The IRI of the method.

required
namespace_prefix str

The namespace prefix.

required
kg Graph

The knowledge graph.

required

Returns:

Type Description
List[Tuple[str, List[str]]]

List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a parameter name and a list of its values.

Source code in exe_kg_lib/utils/query_utils.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def get_method_grouped_params(
    method_iri: str, namespace_prefix: str, kg: Graph, inherited: bool = False
) -> List[Tuple[str, List[str]]]:
    """
    Retrieves the (inherited) parameters for a given method, grouped by property IRI.

    Args:
        method_iri (str): The IRI of the method.
        namespace_prefix (str): The namespace prefix.
        kg (Graph): The knowledge graph.
        inherited (bool, optional): Forwarded to the underlying query; presumably
                                    switches to the method's inherited parameters —
                                    confirm against query_method_params_plus_inherited.
                                    Defaults to False.

    Returns:
        List[Tuple[str, List[str]]]: A list of tuples, where each tuple contains a parameter name and a list of its values.
    """
    property_list = list(query_method_params_plus_inherited(method_iri, namespace_prefix, kg, inherited))
    property_list = sorted(property_list, key=lambda elem: elem[0])  # prepare for grouping
    property_list = [
        (key, [pair[1] for pair in group]) for key, group in itertools.groupby(property_list, lambda elem: elem[0])
    ]

    return property_list

get_module_hierarchy_chain(kg, namespace_prefix, method_iri)

Retrieves the hierarchy chain of the modules starting from the module connected to the given method IRI.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph.

required
namespace_prefix str

The namespace prefix of the module.

required
method_iri str

The IRI of the method.

required

Returns:

Name Type Description
List List

The hierarchy chain of the module, represented as a list of module names.

Raises:

Type Description
NoResultsError

If the method doesn't have a subclass that is a subclass of {namespace_prefix}:Module.

Source code in exe_kg_lib/utils/query_utils.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def get_module_hierarchy_chain(
    kg: Graph,
    namespace_prefix: str,
    method_iri: str,
) -> List:
    """
    Retrieves the hierarchy chain of the modules starting from the module connected to the given method IRI.

    Args:
        kg (Graph): The knowledge graph.
        namespace_prefix (str): The namespace prefix of the module.
        method_iri (str): The IRI of the method.

    Returns:
        List: The hierarchy chain of the module, represented as a list of module names.

    Raises:
        NoResultsError: If the method doesn't have a subclass that is a subclass of {namespace_prefix}:Module.
    """
    first_result = get_first_query_result_if_exists(
        query_module_iri_by_method_iri,
        kg,
        method_iri,
        namespace_prefix,
    )
    if first_result is None:
        raise NoResultsError(
            f"Method with IRI {method_iri} doesn't have a subclass that is subclass of {namespace_prefix}:Module"
        )

    module_iri = str(first_result[0])
    # ancestors of the module, ordered by the subClassOf+ query; the final
    # element is dropped as in the original ordering logic
    ancestor_iris = [str(row[0]) for row in query_hierarchy_chain(kg, module_iri)]
    chain_iris = [module_iri] + ancestor_iris[:-1]
    return [iri.split("#")[-1] for iri in chain_iris]

query_data_entity_reference_iri(kg, namespace_prefix, entity_iri)

Queries the knowledge graph for the reference IRIs associated with a given entity.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required
entity_iri str

The IRI of the entity to query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def query_data_entity_reference_iri(kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Queries the knowledge graph for the reference IRIs associated with a given entity.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.
        entity_iri (str): The IRI of the entity to query.

    Returns:
        query.Result: The result of the query.
    """
    return kg.query(
        f"SELECT ?r WHERE {{ ?entity {namespace_prefix}:hasReference ?r . }}",
        initBindings={
            "entity": URIRef(entity_iri),
        },
    )

query_hierarchy_chain(kg, entity_iri)

Queries the class hierarchy chain of a given entity in a knowledge graph.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
entity_iri str

The IRI of the entity.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def query_hierarchy_chain(kg: Graph, entity_iri: str) -> query.Result:
    """
    Queries the class hierarchy chain of a given entity in a knowledge graph.

    Args:
        kg (Graph): The knowledge graph to query.
        entity_iri (str): The IRI of the entity.

    Returns:
        query.Result: The result of the query.
    """
    # Plain literal (not an f-string): the query text contains no interpolated values.
    return kg.query(
        "SELECT ?m2 WHERE { ?m1 rdfs:subClassOf+ ?m2 . }",
        initBindings={
            "m1": URIRef(entity_iri),
        },
    )

query_inherited_inputs(input_kg, namespace_prefix, entity_iri)

Queries the input knowledge graph to find (inherited) inputs, their structure and the properties that connect them to the given entity.

Parameters:

Name Type Description Default
input_kg Graph

The input knowledge graph.

required
namespace_prefix str

The namespace prefix used in the SPARQL query.

required
entity_iri str

The IRI of the entity for which inherited inputs are to be found.

required

Returns:

Type Description
Result

query.Result: The result of the SPARQL query.

Source code in exe_kg_lib/utils/query_utils.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def query_inherited_inputs(input_kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Finds (inherited) inputs of the given entity, plus their structure and the connecting properties.

    Args:
        input_kg (Graph): The input knowledge graph.
        namespace_prefix (str): The namespace prefix used in the SPARQL query.
        entity_iri (str): The IRI of the entity for which inherited inputs are to be found.

    Returns:
        query.Result: The result of the SPARQL query.

    """
    # Walk the entity's superclass chain and collect hasInput sub-properties
    # declared on any ancestor; the data structure (?s) is optional.
    sparql = (
        f"\nSELECT ?m ?s ?p WHERE {{?entity_iri rdfs:subClassOf* ?parent . "
        f"?p rdfs:domain ?parent ."
        f"?p rdfs:range ?m ."
        f"?p rdfs:subPropertyOf+ {namespace_prefix}:hasInput ."
        f"OPTIONAL {{ ?m rdfs:subClassOf ?s . }}"
        f"OPTIONAL {{ ?s rdfs:subClassOf+ {namespace_prefix}:DataStructure . }}"
        f"FILTER(?s != {namespace_prefix}:DataEntity) . }}"
    )
    return input_kg.query(sparql, initBindings={"entity_iri": URIRef(entity_iri)})

query_inherited_outputs(input_kg, namespace_prefix, entity_iri)

Queries the input knowledge graph to find (inherited) outputs, their structure and the properties that connect them to the given entity.

Parameters:

Name Type Description Default
input_kg Graph

The input knowledge graph.

required
namespace_prefix str

The namespace prefix used in the SPARQL query.

required
entity_iri str

The IRI of the entity for which inherited outputs are to be found.

required

Returns:

Type Description
Result

query.Result: The result of the SPARQL query.

Source code in exe_kg_lib/utils/query_utils.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def query_inherited_outputs(input_kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Finds (inherited) outputs of the given entity, plus their structure and the connecting properties.

    Args:
        input_kg (Graph): The input knowledge graph.
        namespace_prefix (str): The namespace prefix used in the SPARQL query.
        entity_iri (str): The IRI of the entity for which inherited outputs are to be found.

    Returns:
        query.Result: The result of the SPARQL query.

    """
    # Walk the entity's superclass chain and collect hasOutput sub-properties
    # declared on any ancestor; unlike the inputs query, the data structure is mandatory.
    sparql = (
        f"\nSELECT ?m ?s ?p WHERE {{?entity_iri rdfs:subClassOf* ?parent . "
        f"?p rdfs:domain ?parent ."
        f"?p rdfs:range ?m ."
        f"?p rdfs:subPropertyOf+ {namespace_prefix}:hasOutput ."
        f"?m rdfs:subClassOf ?s ."
        f"?s rdfs:subClassOf+ {namespace_prefix}:DataStructure . "
        f"FILTER(?s != {namespace_prefix}:DataEntity) . }}"
    )
    return input_kg.query(sparql, initBindings={"entity_iri": URIRef(entity_iri)})

query_input_triples(kg, namespace_prefix, entity_iri)

Queries the triples that connect the given entity with its inputs.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required
entity_iri str

The IRI of the entity to query input triples for.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def query_input_triples(kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Retrieves the triples linking the given entity to its inputs.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.
        entity_iri (str): The IRI of the entity to query input triples for.

    Returns:
        query.Result: The result of the query.
    """
    # Match both the hasInput property itself and any of its sub-properties.
    sparql = f"""
        SELECT DISTINCT ?s ?p ?o
        WHERE {{
            {{ ?s ?p ?o . FILTER(?p = {namespace_prefix}:hasInput) }}
            UNION
            {{ ?s ?p ?o . ?p rdfs:subPropertyOf* {namespace_prefix}:hasInput . }}
        }}
        """
    return kg.query(sparql, initBindings={"s": URIRef(entity_iri)})

query_instance_parent_iri(kg, entity_iri, upper_class_uri_ref, negation_of_inheritance=False)

Queries the knowledge graph to find the types of a given entity, that are subclasses of a given upper class.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
entity_iri str

The IRI of the entity.

required
upper_class_uri_ref URIRef

The URI reference of the upper class.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def query_instance_parent_iri(
    kg: Graph, entity_iri: str, upper_class_uri_ref: URIRef, negation_of_inheritance: bool = False
) -> query.Result:
    """
    Queries the knowledge graph to find the types of a given entity, that are subclasses of a given upper class.

    Args:
        kg (Graph): The knowledge graph to query.
        entity_iri (str): The IRI of the entity.
        upper_class_uri_ref (URIRef): The URI reference of the upper class.
        negation_of_inheritance (bool): If True, return the entity's types that are
            NOT subclasses of the upper class instead. Defaults to False.

    Returns:
        query.Result: The result of the query.
    """
    # Plain literals (no interpolated values); the query is assembled in two steps.
    query_string = "SELECT ?t WHERE { ?entity rdf:type ?t ."

    if negation_of_inheritance:
        # Exclude any type with an rdfs:subClassOf* path to the upper class.
        query_string += "FILTER NOT EXISTS { ?t rdfs:subClassOf* ?upper_class . } }"
    else:
        query_string += "?t rdfs:subClassOf* ?upper_class . }"

    return kg.query(
        query_string,
        initBindings={
            "entity": URIRef(entity_iri),
            "upper_class": upper_class_uri_ref,
        },
    )

query_linked_task_and_property(kg, namespace_prefix, method_iri)

Queries the linked task and linking property based on the given method IRI.

Parameters:

Name Type Description Default
kg Graph

The RDF graph to query.

required
namespace_prefix str

The namespace prefix for the AtomicTask.

required
method_iri str

The IRI of the method.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def query_linked_task_and_property(kg: Graph, namespace_prefix: str, method_iri: str) -> query.Result:
    """
    Queries the linked task and linking property based on the given method IRI.

    Args:
        kg (Graph): The RDF graph to query.
        namespace_prefix (str): The namespace prefix for the AtomicTask.
        method_iri (str): The IRI of the method.

    Returns:
        query.Result: The result of the query.
    """
    # NOTE(review): `?task rdfs:subPropertyOf* :AtomicTask` applies a property-hierarchy
    # path to a task node against a class; the sibling queries in this module use
    # rdf:type / rdfs:subClassOf for class membership. This looks like a copy-paste
    # slip -- confirm the intended semantics before changing it.
    # Also note ?m_property is unconstrained, so any property linking ?task to ?m matches.
    return kg.query(
        f"SELECT ?task WHERE {{ ?task ?m_property ?m ."
        f"                      ?task rdfs:subPropertyOf* {namespace_prefix}:AtomicTask .}}",
        initBindings={"m": URIRef(method_iri)},
    )

query_method_iri_by_task_iri(kg, namespace_prefix, task_iri)

Queries the method IRI associated with a given task IRI.

Parameters:

Name Type Description Default
kg Graph

The RDF graph to query.

required
namespace_prefix str

The namespace prefix for the method property.

required
task_iri str

The IRI of the task.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def query_method_iri_by_task_iri(kg: Graph, namespace_prefix: str, task_iri: str) -> query.Result:
    """
    Queries the method IRI associated with a given task IRI.

    Args:
        kg (Graph): The RDF graph to query.
        namespace_prefix (str): The namespace prefix for the method property.
        task_iri (str): The IRI of the task.

    Returns:
        query.Result: The result of the query.
    """
    # Any sub-property of hasMethod (or hasMethod itself, via `*`) links the task to its method.
    return kg.query(
        f"SELECT ?m WHERE {{ ?task ?m_property ?m ."
        f"                   ?m_property rdfs:subPropertyOf* {namespace_prefix}:hasMethod .}}",
        initBindings={"task": URIRef(task_iri)},
    )

query_method_params(method_iri, namespace_prefix, kg)

Queries the parameters and their ranges for a given method IRI.

Parameters:

Name Type Description Default
method_iri str

The IRI (Internationalized Resource Identifier) of the method.

required
namespace_prefix str

The namespace prefix used in the knowledge graph.

required
kg Graph

The knowledge graph to query.

required

Returns:

Type Description
Result

query.Result: The result of the query, containing the parameters of the method.

Source code in exe_kg_lib/utils/query_utils.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def query_method_params(method_iri: str, namespace_prefix: str, kg: Graph) -> query.Result:
    """
    Retrieves the parameters of a given method together with their ranges.

    Args:
        method_iri (str): The IRI (Internationalized Resource Identifier) of the method.
        namespace_prefix (str): The namespace prefix used in the knowledge graph.
        kg (Graph): The knowledge graph to query.

    Returns:
        query.Result: The result of the query, containing the parameters of the method.
    """
    # Direct sub-properties of hasParameter whose domain is the method itself.
    sparql = (
        f"\nSELECT ?p ?r WHERE {{?p rdfs:domain ?task_iri . "
        f"?p rdfs:range ?r . "
        f"?p rdfs:subPropertyOf {namespace_prefix}:hasParameter . }}"
    )
    return kg.query(sparql, initBindings={"task_iri": URIRef(method_iri)})

query_method_params_plus_inherited(method_iri, namespace_prefix, kg, inherited=False)

Queries the parameters and their ranges for a given method IRI, including inherited parameters.

Parameters:

Name Type Description Default
method_iri str

The IRI of the method.

required
namespace_prefix str

The namespace prefix for the hasParameter property.

required
kg Graph

The RDF graph to query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def query_method_params_plus_inherited(
    method_iri: str, namespace_prefix: str, kg: Graph, inherited: bool = False
) -> query.Result:
    """
    Queries the parameters and their ranges for a given method IRI, including inherited parameters.

    Args:
        method_iri (str): The IRI of the method.
        namespace_prefix (str): The namespace prefix for the `hasParameter` property.
        kg (Graph): The RDF graph to query.
        inherited (bool): If True, also match parameters whose rdfs:domain is an
            ancestor class of the method (via rdfs:subClassOf*); otherwise only
            parameters declared directly on the method. Defaults to False.

    Returns:
        query.Result: The result of the query.
    """
    if inherited:
        # Walk up the class hierarchy so parameters declared on ancestors are included.
        return kg.query(
            f"\nSELECT ?p ?r WHERE {{?p rdfs:domain ?domain . "
            f"?method_iri rdfs:subClassOf* ?domain . "
            f"?p rdfs:range ?r . "
            f"?p rdfs:subPropertyOf {namespace_prefix}:hasParameter . }}",
            initBindings={"method_iri": URIRef(method_iri)},
        )

    return kg.query(
        f"\nSELECT ?p ?r WHERE {{?p rdfs:domain ?method_iri . "
        f"?p rdfs:range ?r . "
        f"?p rdfs:subPropertyOf {namespace_prefix}:hasParameter . }}",
        initBindings={"method_iri": URIRef(method_iri)},
    )

query_method_properties_and_methods(input_kg, namespace_prefix, entity_parent_iri)

Queries the input knowledge graph for methods and the properties that connect them to the given entity.

Parameters:

Name Type Description Default
input_kg Graph

The input knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required
entity_parent_iri str

The IRI of the parent entity.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def query_method_properties_and_methods(input_kg: Graph, namespace_prefix: str, entity_parent_iri: str) -> query.Result:
    """
    Retrieves methods linked to the given entity and the properties that connect them.

    Args:
        input_kg (Graph): The input knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.
        entity_parent_iri (str): The IRI of the parent entity.

    Returns:
        query.Result: The result of the query.
    """
    # Properties with the entity as domain and an AtomicMethod subclass as range.
    sparql = (
        f"\nSELECT ?p ?m WHERE {{?p rdfs:domain ?entity_iri . "
        f"?p rdfs:range ?m . "
        f"?m rdfs:subClassOf {namespace_prefix}:AtomicMethod . }}"
    )
    return input_kg.query(sparql, initBindings={"entity_iri": URIRef(entity_parent_iri)})

query_module_iri_by_method_iri(kg, method_iri, namespace_prefix)

Queries the knowledge graph to retrieve the module IRI associated with a given method IRI.

Parameters:

Name Type Description Default
kg Graph

The Knowledge Graph to query.

required
method_iri str

The IRI of the method.

required
namespace_prefix str

The namespace prefix used in the query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def query_module_iri_by_method_iri(
    kg: Graph,
    method_iri: str,
    namespace_prefix: str,
) -> query.Result:
    """
    Retrieves the module IRI associated with a given method IRI.

    Args:
        kg (Graph): The Knowledge Graph to query.
        method_iri (str): The IRI of the method.
        namespace_prefix (str): The namespace prefix used in the query.

    Returns:
        query.Result: The result of the query.
    """
    # The method's direct superclass that sits under :Module but is not itself
    # under :Method (i.e. the module, not an intermediate method class).
    sparql = (
        f"SELECT ?module WHERE {{ ?method rdfs:subClassOf ?module . "
        f"                        ?module rdfs:subClassOf+ {namespace_prefix}:Module . "
        f"                        FILTER NOT EXISTS {{ ?module rdfs:subClassOf+ {namespace_prefix}:Method . }} . }}"
    )
    return kg.query(sparql, initBindings={"method": URIRef(method_iri)})

query_output_triples(kg, namespace_prefix, entity_iri)

Queries the triples that connect the given entity with its outputs.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required
entity_iri str

The IRI of the entity to query output triples for.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def query_output_triples(kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Retrieves the triples linking the given entity to its outputs.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.
        entity_iri (str): The IRI of the entity to query output triples for.

    Returns:
        query.Result: The result of the query.
    """
    # Match both the hasOutput property itself and any of its sub-properties.
    sparql = f"""
        SELECT DISTINCT ?s ?p ?o
        WHERE {{
            {{ ?s ?p ?o . FILTER(?p = {namespace_prefix}:hasOutput) }}
            UNION
            {{ ?s ?p ?o . ?p rdfs:subPropertyOf* {namespace_prefix}:hasOutput . }}
        }}
        """
    return kg.query(sparql, initBindings={"s": URIRef(entity_iri)})

query_parameters_triples(kg, namespace_prefix, entity_iri)

Queries the triples that connect the given entity with its parameters.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required
entity_iri str

The IRI of the entity to query parameter triples for.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def query_parameters_triples(kg: Graph, namespace_prefix: str, entity_iri: str) -> query.Result:
    """
    Retrieves the triples linking the given entity to its parameters.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.
        entity_iri (str): The IRI of the entity to query parameter triples for.

    Returns:
        query.Result: The result of the query.
    """
    # Any sub-property of hasParameter (or hasParameter itself, via `*`).
    sparql = f"""
        SELECT ?s ?p ?o
        WHERE {{
            {{ ?s ?p ?o . ?p rdfs:subPropertyOf* {namespace_prefix}:hasParameter . }}
        }}
        """
    return kg.query(sparql, initBindings={"s": URIRef(entity_iri)})

query_parent_classes(kg, entity_iri)

Queries the knowledge graph to retrieve the parent classes of a given entity.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
entity_iri str

The IRI of the entity.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def query_parent_classes(kg: Graph, entity_iri: str) -> query.Result:
    """
    Queries the knowledge graph to retrieve the parent classes of a given entity.

    Args:
        kg (Graph): The knowledge graph to query.
        entity_iri (str): The IRI of the entity.

    Returns:
        query.Result: The result of the query.
    """
    # Plain literal (not an f-string): the query text contains no interpolated values.
    return kg.query(
        "SELECT ?c WHERE { ?entity rdfs:subClassOf ?c . }",
        initBindings={"entity": URIRef(entity_iri)},
    )

query_pipeline_info(kg, namespace_prefix)

Queries the knowledge graph for pipeline information.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
namespace_prefix str

The namespace prefix used in the query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def query_pipeline_info(kg: Graph, namespace_prefix: str) -> query.Result:
    """
    Retrieves pipeline information from the knowledge graph.

    Args:
        kg (Graph): The knowledge graph to query.
        namespace_prefix (str): The namespace prefix used in the query.

    Returns:
        query.Result: The result of the query.

    """
    # Each Pipeline instance with its input data path, plots output dir and start task.
    sparql = (
        f"\nSELECT ?p ?i ?o ?t WHERE {{?p rdf:type {namespace_prefix}:Pipeline ;"
        f"                          {namespace_prefix}:hasInputDataPath ?i ;"
        f"                          {namespace_prefix}:hasPlotsOutputDir ?o ;"
        f"                          {namespace_prefix}:hasStartTask ?t . }}"
    )
    return kg.query(sparql)

query_subclasses_of(class_iri, kg)

Queries the knowledge graph to retrieve the subclasses of a given class.

Parameters:

Name Type Description Default
class_iri str

The IRI of the class.

required
kg Graph

The knowledge graph to query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def query_subclasses_of(class_iri: str, kg: Graph) -> query.Result:
    """
    Queries the knowledge graph to retrieve the subclasses of a given class.

    Args:
        class_iri (str): The IRI of the class.
        kg (Graph): The knowledge graph to query.

    Returns:
        query.Result: The result of the query.
    """
    return kg.query(
        "\nSELECT ?t WHERE {?t rdfs:subClassOf ?class_iri . }",
        # Bind the IRI as a URIRef (not a plain str) so it matches IRI nodes in
        # the graph, consistent with every other query helper in this module.
        initBindings={"class_iri": URIRef(class_iri)},
    )

query_top_level_task_iri(kg, task_iri, namespace_prefix)

Queries the knowledge graph to find the top-level task for a given task.

Parameters:

Name Type Description Default
kg Graph

The knowledge graph to query.

required
task_iri str

The IRI of the task.

required
namespace_prefix str

The namespace prefix used in the query.

required

Returns:

Type Description
Result

query.Result: The result of the query.

Source code in exe_kg_lib/utils/query_utils.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def query_top_level_task_iri(kg: Graph, task_iri: str, namespace_prefix: str) -> query.Result:
    """
    Finds the top-level task class for a given task.

    Args:
        kg (Graph): The knowledge graph to query.
        task_iri (str): The IRI of the task.
        namespace_prefix (str): The namespace prefix used in the query.

    Returns:
        query.Result: The result of the query.
    """
    # Ancestors of the task that sit directly under :Task, excluding :AtomicTask.
    sparql = (
        f"SELECT ?t2 WHERE {{ ?t1 rdfs:subClassOf* ?t2 ."
        f"                    ?t2 rdfs:subClassOf {namespace_prefix}:Task . "
        f"                    FILTER(?t2 != {namespace_prefix}:AtomicTask) . }}"
    )
    return kg.query(
        sparql,
        initBindings={
            "t1": URIRef(task_iri),
        },
    )