diff --git a/python_workflow_definition/src/python_workflow_definition/pyiron_workflow.py b/python_workflow_definition/src/python_workflow_definition/pyiron_workflow.py index 1756b93..2347550 100644 --- a/python_workflow_definition/src/python_workflow_definition/pyiron_workflow.py +++ b/python_workflow_definition/src/python_workflow_definition/pyiron_workflow.py @@ -1,9 +1,10 @@ from inspect import isfunction from importlib import import_module -from typing import Optional +from typing import Any, Optional import numpy as np from pyiron_workflow import function_node, Workflow +from pyiron_workflow.api import Function from python_workflow_definition.models import PythonWorkflowDefinitionWorkflow from python_workflow_definition.shared import ( @@ -312,3 +313,62 @@ def load_workflow_json(file_name: str, workflow: Optional[Workflow] = None): total_dict=total_dict, node_conversion_dict=node_conversion_dict, ) + + +def import_from_string(library_path: str) -> Any: + # Copied from bagofholding + split_path = library_path.split(".", 1) + if len(split_path) == 1: + module_name, path = split_path[0], "" + else: + module_name, path = split_path + obj = import_module(module_name) + for k in path.split("."): + obj = getattr(obj, k) + return obj + + +def build_function_dag_workflow(file_name: str) -> Workflow: + content = remove_result( + PythonWorkflowDefinitionWorkflow.load_json_file(file_name=file_name) + ) + + input_values: dict[int, object] = ( + {} + ) # Type is actually more restrictive, must be jsonifyable object + nodes: dict[int, Function] = {} + wf = Workflow(file_name.split(".")[0]) + for node_dict in content[NODES_LABEL]: + if node_dict["type"] == "function": + fnc = import_from_string(node_dict["value"]) + n = function_node( + fnc, + output_labels=fnc.__name__ # Strictly force single-output + ) + nodes[node_dict["id"]] = n + wf.add_child(n) + elif node_dict["type"] == "input": + input_values[node_dict["id"]] = node_dict["value"] + + for edge_dict in content[EDGES_LABEL]: + target_id = edge_dict["target"] + target_port = edge_dict["targetPort"] + source_id = edge_dict["source"] + source_port = edge_dict["sourcePort"] + + if source_port is None: + if source_id in input_values.keys(): # Parent input value + upstream = input_values[source_id] + else: # Single-output sibling + upstream = nodes[source_id] + else: # Dictionary-output sibling + injected_attribute_access = nodes[source_id].__getitem__(source_port) + upstream = injected_attribute_access + downstream = nodes[target_id] + setattr( + downstream.inputs, target_port, upstream + ) # Exploit input panel magic + # Warning: edge setup routine is bespoke for an environment where all nodes return + # a single value or a dictionary + + return wf