|
1 | 1 | from inspect import isfunction |
2 | 2 | from importlib import import_module |
3 | | -from typing import Optional |
| 3 | +from typing import Any, Optional |
4 | 4 |
|
5 | 5 | import numpy as np |
6 | 6 | from pyiron_workflow import function_node, Workflow |
| 7 | +from pyiron_workflow.api import Function |
7 | 8 |
|
8 | 9 | from python_workflow_definition.models import PythonWorkflowDefinitionWorkflow |
9 | 10 | from python_workflow_definition.shared import ( |
@@ -312,3 +313,62 @@ def load_workflow_json(file_name: str, workflow: Optional[Workflow] = None): |
312 | 313 | total_dict=total_dict, |
313 | 314 | node_conversion_dict=node_conversion_dict, |
314 | 315 | ) |
| 316 | + |
| 317 | + |
| 318 | +def import_from_string(library_path: str) -> Any: |
| 319 | + # Copied from bagofholding |
| 320 | + split_path = library_path.split(".", 1) |
| 321 | + if len(split_path) == 1: |
| 322 | + module_name, path = split_path[0], "" |
| 323 | + else: |
| 324 | + module_name, path = split_path |
| 325 | + obj = import_module(module_name) |
| 326 | + for k in path.split("."): |
| 327 | + obj = getattr(obj, k) |
| 328 | + return obj |
| 329 | + |
| 330 | + |
| 331 | +def build_function_dag_workflow(file_name: str) -> Workflow: |
| 332 | + content = remove_result( |
| 333 | + PythonWorkflowDefinitionWorkflow.load_json_file(file_name="workflow.json") |
| 334 | + ) |
| 335 | + |
| 336 | + input_values: dict[int, object] = ( |
| 337 | + {} |
| 338 | + ) # Type is actually more restrictive, must be jsonifyable object |
| 339 | + nodes: dict[int, Function] = {} |
| 340 | + wf = Workflow(file_name.split(".")[0]) |
| 341 | + for node_dict in content[NODES_LABEL]: |
| 342 | + if node_dict["type"] == "function": |
| 343 | + fnc = import_from_string(node_dict["value"]) |
| 344 | + n = function_node( |
| 345 | + fnc, |
| 346 | + output_labels=fnc.__name__ # Strictly force single-output |
| 347 | + ) |
| 348 | + nodes[node_dict["id"]] = n |
| 349 | + wf.add_child(n) |
| 350 | + elif node_dict["type"] == "input": |
| 351 | + input_values[node_dict["id"]] = node_dict["value"] |
| 352 | + |
| 353 | + for edge_dict in content[EDGES_LABEL]: |
| 354 | + target_id = edge_dict["target"] |
| 355 | + target_port = edge_dict["targetPort"] |
| 356 | + source_id = edge_dict["source"] |
| 357 | + source_port = edge_dict["sourcePort"] |
| 358 | + |
| 359 | + if source_port is None: |
| 360 | + if source_id in input_values.keys(): # Parent input value |
| 361 | + upstream = input_values[source_id] |
| 362 | + else: # Single-output sibling |
| 363 | + upstream = nodes[source_id] |
| 364 | + else: # Dictionary-output sibling |
| 365 | + injected_attribute_access = nodes[source_id].__getitem__(source_port) |
| 366 | + upstream = injected_attribute_access |
| 367 | + downstream = nodes[target_id] |
| 368 | + setattr( |
| 369 | + downstream.inputs, target_port, upstream |
| 370 | + ) # Exploit input panel magic |
| 371 | + # Warning: edge setup routine is bespoke for an environment where all nodes return |
| 372 | + # a single value or a dictionary |
| 373 | + |
| 374 | + return wf |
0 commit comments