|
| 1 | +import json |
| 2 | +from importlib import import_module |
| 3 | +from inspect import isfunction |
| 4 | + |
| 5 | + |
| 6 | +from python_workflow_definition.shared import get_dict, get_list, get_kwargs, get_source_handles |
| 7 | +from python_workflow_definition.purepython import resort_total_lst, group_edges, get_value |
| 8 | + |
| 9 | + |
| 10 | +def load_workflow_json(file_name, exe): |
| 11 | + with open(file_name, "r") as f: |
| 12 | + content = json.load(f) |
| 13 | + |
| 14 | + edges_new_lst = content["edges"] |
| 15 | + nodes_new_dict = {} |
| 16 | + for k, v in content["nodes"].items(): |
| 17 | + if isinstance(v, str) and "." in v: |
| 18 | + p, m = v.rsplit('.', 1) |
| 19 | + mod = import_module(p) |
| 20 | + nodes_new_dict[int(k)] = getattr(mod, m) |
| 21 | + else: |
| 22 | + nodes_new_dict[int(k)] = v |
| 23 | + |
| 24 | + total_lst = group_edges(edges_new_lst) |
| 25 | + total_new_lst = resort_total_lst(total_lst=total_lst, nodes_dict=nodes_new_dict) |
| 26 | + |
| 27 | + result_dict = {} |
| 28 | + last_key = None |
| 29 | + for lst in total_new_lst: |
| 30 | + node = nodes_new_dict[lst[0]] |
| 31 | + if isfunction(node): |
| 32 | + kwargs = { |
| 33 | + k: get_value(result_dict=result_dict, nodes_new_dict=nodes_new_dict, link_dict=v) |
| 34 | + for k, v in lst[1].items() |
| 35 | + } |
| 36 | + result_dict[lst[0]] = exe.submit(node, **kwargs) |
| 37 | + last_key = lst[0] |
| 38 | + |
| 39 | + return result_dict[last_key] |
0 commit comments