Skip to content

task

Task

Bases: Entity

Abstraction of owl:class ds:AtomicTask.

❗ Important for contributors: See Section "Naming conventions" in README.md of "classes.tasks" package before extending the code's functionality.

Source code in exe_kg_lib/classes/task.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class Task(Entity):
    """
    Abstraction of owl:class ds:AtomicTask.

    Holds the links of a pipeline step: the next Task, the attached method, and the
    inputs/outputs used during KG creation and KG execution.

    ❗ Important for contributors: See Section "Naming conventions" in README.md of "classes.tasks" package before extending the code's functionality.
    """

    def __init__(
        self,
        iri: str,
        parent_entity: Entity = None,
    ):
        """
        Args:
            iri: forwarded unchanged to Entity.__init__
            parent_entity: forwarded unchanged to Entity.__init__ (may be None)
        """
        super().__init__(iri, parent_entity)
        self.next_task = None  # used for storing the next Task in the pipeline
        self.method = None  # used for storing the method of the Task
        self.inputs = []  # used for storing input DataEntity objects during KG execution
        self.outputs = []  # used for storing output DataEntity objects during KG execution
        self.input_dict = {}  # used for storing input DataEntity objects during KG creation
        self.output_dict = {}  # used for storing output DataEntity objects during KG creation

    @classmethod
    def from_entity(cls, entity: Entity):
        """
        Alternate constructor that builds an instance of cls from an existing Entity.

        Args:
            entity: object whose iri and parent_entity are reused

        Returns:
            a new instance of cls (Task or the sub-class the method is called on)
        """
        return cls(entity.iri, entity.parent_entity)

    def create_output_dict(self, keyword_value_dict: dict) -> dict:
        """
        For each key in keyword_value_dict, checks if the key occurs (as a substring) in an output name of the Task.
        If yes, adds the output name with its value to out_dict.
        If several keys match the same output name, the value of the last matching key is kept.

        Args:
            keyword_value_dict: key-value pairs where key is a keyword to find in an output name of the Task
                                  and value is the value corresponding to that output name

        Returns:
            dict: pairs of Task's output names and corresponding output values

        Raises:
            IndexError: if the Task has no outputs and keyword_value_dict is empty
        """
        if not self.outputs:
            # assume one output and use task name as key
            # (self.name is not set in this class; presumably provided by Entity)
            return {self.name: list(keyword_value_dict.values())[0]}

        out_dict = {}
        for output_name in [has_output_elem.name for has_output_elem in self.outputs]:
            for key, value in keyword_value_dict.items():
                if key in output_name:
                    out_dict[output_name] = value

        return out_dict

    def get_inputs(
        self, dict_to_search: dict, fallback_df: pd.DataFrame
    ) -> Dict[str, Dict[str, Union[pd.DataFrame, Method]]]:
        """
        For each input of the Task (visited in order of input name):
            - If the input is a DataEntity: Searches for the input reference name in dict_to_search. If not found,
              uses the column of fallback_df named by the input's source.
            - If the input is a Method: Uses the input type as the input name and the input itself as the input value.

        Args:
            dict_to_search: contains key-value pairs where key is a possible input name and value is its corresponding value
            fallback_df: contains data to return as an alternative

        Returns:
            dictionary with input types as keys; each value is a list of dictionaries of the form
            {"name": <input name>, "value": <input value>}, one per input of that type.
            NOTE(review): the return annotation describes a dict of dicts, but a dict of lists is what is built.

        Raises:
            TypeError: if an input is neither a DataEntity nor a Method
        """
        input_dict = {}
        for task_input in sorted(self.inputs, key=lambda x: x.name):
            if isinstance(task_input, DataEntity):
                input_name = task_input.reference
                try:
                    input_value = dict_to_search[input_name]
                except KeyError:
                    # no computed value available: fall back to the source column, kept as a DataFrame
                    input_value = fallback_df.loc[:, [task_input.source]]
            elif isinstance(task_input, Method):
                input_name = task_input.type
                input_value = task_input
            else:
                # fail loudly instead of hitting an unbound local or silently
                # re-using the name/value of the previously processed input
                raise TypeError(
                    f"Unsupported Task input of type {type(task_input).__name__}; expected DataEntity or Method"
                )

            input_dict.setdefault(task_input.type, []).append({"name": input_name, "value": input_value})

        return input_dict

    @abstractmethod
    def run_method(self, *args):
        """
        Abstract method to be implemented by Task sub-classes that are at the bottom of the hierarchy.
        Executes the logic that is needed to fulfill the Task.

        Args:
            *args: defined by sub-classes

        Raises:
            NotImplementedError: always, unless overridden by a sub-class
        """
        raise NotImplementedError

create_output_dict(keyword_value_dict)

For each key in keyword_value_dict, checks whether the key occurs in an output name of the Task. If it does, adds that output name with the key's value to out_dict. If the Task has no outputs, a single output is assumed and the Task's name is used as the key.

Args: keyword_value_dict: key-value pairs where each key is a keyword to find in an output name of the Task and each value is the value corresponding to that output name.

Returns:

Name Type Description
dict dict

pairs of Task's output names and corresponding output values

Source code in exe_kg_lib/classes/task.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def create_output_dict(self, keyword_value_dict: dict) -> dict:
    """
    Maps the Task's output names to values, matching by keyword.

    An output name is included when at least one key of keyword_value_dict occurs in it;
    when several keys occur in the same name, the value of the last such key is kept.
    When the Task has no outputs, a single output is assumed and the Task's name is
    paired with the first value of keyword_value_dict.

    Args:
        keyword_value_dict: key-value pairs where key is a keyword to look for in an output name of the Task
                              and value is the value that belongs to the matching output name

    Returns:
        dict: pairs of Task's output names and corresponding output values
    """
    has_outputs = len(self.outputs) > 0
    if has_outputs:
        names = [output_elem.name for output_elem in self.outputs]
        # later matches overwrite earlier ones for the same output name
        return {
            name: matched_value
            for name in names
            for keyword, matched_value in keyword_value_dict.items()
            if keyword in name
        }

    # no declared outputs: assume one output and use task name as key
    all_values = list(keyword_value_dict.values())
    return {self.name: all_values[0]}

get_inputs(dict_to_search, fallback_df)

For each input of the Task
  • If the input is a DataEntity: Searches for the input reference name in dict_to_search. If not found, uses fallback_df.
  • If the input is a Method: Uses the input type as the input name and the input itself as the input value.

Parameters:

Name Type Description Default
dict_to_search dict

contains key-value pairs where key is a possible input name and value is its corresponding value

required
fallback_df DataFrame

contains data to return as an alternative

required

Returns:

Type Description
Dict[str, Dict[str, Union[DataFrame, Method]]]

Dict[str, Dict[str, Union[pd.DataFrame, Method]]]: dictionary with input types as keys and, as values, lists of dictionaries that each hold one input's "name" and "value"

Source code in exe_kg_lib/classes/task.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_inputs(
    self, dict_to_search: dict, fallback_df: pd.DataFrame
) -> Dict[str, Dict[str, Union[pd.DataFrame, Method]]]:
    """
    For each input of the Task (visited in order of input name):
        - If the input is a DataEntity: Searches for the input reference name in dict_to_search. If not found,
          uses the column of fallback_df named by the input's source.
        - If the input is a Method: Uses the input type as the input name and the input itself as the input value.

    Args:
        dict_to_search: contains key-value pairs where key is a possible input name and value is its corresponding value
        fallback_df: contains data to return as an alternative

    Returns:
        dictionary with input types as keys; each value is a list of dictionaries of the form
        {"name": <input name>, "value": <input value>}, one per input of that type.
        NOTE(review): the return annotation describes a dict of dicts, but a dict of lists is what is built.

    Raises:
        TypeError: if an input is neither a DataEntity nor a Method
    """
    input_dict = {}
    for task_input in sorted(self.inputs, key=lambda x: x.name):
        if isinstance(task_input, DataEntity):
            input_name = task_input.reference
            try:
                input_value = dict_to_search[input_name]
            except KeyError:
                # no computed value available: fall back to the source column, kept as a DataFrame
                input_value = fallback_df.loc[:, [task_input.source]]
        elif isinstance(task_input, Method):
            input_name = task_input.type
            input_value = task_input
        else:
            # fail loudly instead of hitting an unbound local or silently
            # re-using the name/value of the previously processed input
            raise TypeError(
                f"Unsupported Task input of type {type(task_input).__name__}; expected DataEntity or Method"
            )

        input_dict.setdefault(task_input.type, []).append({"name": input_name, "value": input_value})

    return input_dict

run_method(*args) abstractmethod

Abstract method to be implemented by Task sub-classes that are at the bottom of the hierarchy. Executes the logic that is needed to fulfill the Task.

Args: *args: defined by sub-classes.

Source code in exe_kg_lib/classes/task.py
 97
 98
 99
100
101
102
103
104
105
@abstractmethod
def run_method(self, *args):
    """
    Hook that carries out the work of the Task.

    Concrete Task sub-classes at the bottom of the hierarchy are expected to override it;
    this base version only signals that no implementation exists.

    Args:
        *args: defined by sub-classes

    Raises:
        NotImplementedError: always, when the base version is called
    """
    raise NotImplementedError()