Commit 18ee6f2

Add universal_simple_to_aiida

1 parent 843d346 commit 18ee6f2

2 files changed (+288, -0 lines)
Lines changed: 119 additions & 0 deletions
from importlib import import_module
import traceback
import json

from aiida_workgraph import WorkGraph, task


def pickle_node(value):
    """Handle data nodes by passing the value through unchanged."""
    return value


def get_item(data: dict, key: str):
    """Handle getting an item from the outputs."""
    return data[key]


# I defined the function mapping here, but in principle it is not needed,
# because one can import the function from the module path
# (load_workflow_json below does exactly that with import_module and getattr).
# func_mapping = {
#     "simple_workflow.add_x_and_y": task.pythonjob()(add_x_and_y),
#     "simple_workflow.add_x_and_y_and_z": task.pythonjob()(add_x_and_y_and_z),
#     "quantum_espresso_workflow.get_bulk_structure": task.pythonjob()(
#         get_bulk_structure
#     ),
#     "quantum_espresso_workflow.calculate_qe": task.pythonjob()(calculate_qe),
#     "quantum_espresso_workflow.plot_energy_volume_curve": task.pythonjob()(
#         plot_energy_volume_curve
#     ),
#     "python_workflow_definition.pyiron_base.get_dict": task.pythonjob()(get_dict),
#     "python_workflow_definition.pyiron_base.get_list": task.pythonjob()(get_list),
#     "quantum_espresso_workflow.generate_structures": task.pythonjob()(
#         generate_structures
#     ),
# }


def write_workflow_json(wg):
    """Export a WorkGraph to the universal workflow dictionary (nodes and edges)."""
    wgdata = wg.to_dict()
    data = {"nodes": {}, "edges": []}
    node_name_mapping = {}
    for i, (name, node) in enumerate(wgdata["tasks"].items()):
        node_name_mapping[name] = i
        callable_name = node["executor"]["callable_name"]
        data["nodes"][i] = callable_name
        # data tasks are stored by their raw value instead of a callable name
        if callable_name == "pickle_node":
            data["nodes"][i] = node["inputs"]["value"]["property"]["value"].value

    for link in wgdata["links"]:
        if wgdata["tasks"][link["from_node"]]["executor"]["callable_name"] == "pickle_node":
            link["from_socket"] = None
        # emit the edge with the same keys that load_workflow_json reads back
        data["edges"].append(
            {
                "source": node_name_mapping[link["from_node"]],
                "sourceHandle": link["from_socket"],
                "target": node_name_mapping[link["to_node"]],
                "targetHandle": link["to_socket"],
            }
        )

    return data
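
# Editor's sketch (hypothetical values) of the dictionary write_workflow_json
# returns for a toy graph with two data nodes feeding one function task. Note
# that load_workflow_json expects function identifiers to be dotted
# "module.function" paths, so the stored callable name must resolve that way:
#
#     {
#         "nodes": {0: 1, 1: 2, 2: "simple_workflow.add_x_and_y"},
#         "edges": [
#             {"source": 0, "sourceHandle": None, "target": 2, "targetHandle": "x"},
#             {"source": 1, "sourceHandle": None, "target": 2, "targetHandle": "y"},
#         ],
#     }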


def load_workflow_json(filename):
    """Rebuild a WorkGraph from a universal workflow JSON file."""
    with open(filename) as f:
        data = json.load(f)

    wg = WorkGraph()
    task_name_mapping = {}
    # add tasks
    for name, identifier in data["nodes"].items():
        # if isinstance(identifier, str) and identifier in func_mapping:
        if isinstance(identifier, str) and "." in identifier:
            # import the callable from its dotted module path
            p, m = identifier.rsplit(".", 1)
            mod = import_module(p)
            _func = getattr(mod, m)
            func = task.pythonjob()(_func)
            # func = func_mapping[identifier]
            # register_pickle_by_value is used because the function is defined in a local file
            wg.add_task(func, register_pickle_by_value=True, name=m)
            # remove the default "result" output, because the outputs are added later
            # from the data in the links
            del wg.tasks[-1].outputs["result"]
        else:
            # data task
            wg.add_task(pickle_node, value=identifier, name=name)
        task_name_mapping[name] = wg.tasks[-1].name
    # add links
    for link in data["edges"]:
        if link["sourceHandle"] is None:
            link["sourceHandle"] = "result"
        try:
            from_task = wg.tasks[task_name_mapping[str(link["source"])]]
            # because the outputs were not defined explicitly when the pythonjob
            # was created, add the output socket here and assume it exists
            if link["sourceHandle"] not in from_task.outputs:
                from_socket = from_task.add_output(
                    "workgraph.any",
                    name=link["sourceHandle"],
                    metadata={"is_function_output": True},
                )
            else:
                from_socket = from_task.outputs[link["sourceHandle"]]
            to_task = wg.tasks[task_name_mapping[str(link["target"])]]
            # if the input does not exist, the data is passed via kwargs;
            # in that case, add the input socket
            if link["targetHandle"] not in to_task.inputs:
                to_socket = to_task.add_input(
                    "workgraph.any",
                    name=link["targetHandle"],
                    metadata={"is_function_input": True},
                )
            else:
                to_socket = to_task.inputs[link["targetHandle"]]
            wg.add_link(from_socket, to_socket)
        except Exception as e:
            traceback.print_exc()
            print("Failed to link", link, "with error:", e)
    return wg
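
# Usage sketch: rebuilding and running the simple workflow (assumes a configured
# AiiDA profile and the "workflow_simple.json" file used in the accompanying
# notebook):
#
#     from aiida import load_profile
#     load_profile()
#
#     wg = load_workflow_json(filename="workflow_simple.json")
#     wg.run()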


def get_list(**kwargs):
    """Collect the keyword arguments into a list of their values."""
    return list(kwargs.values())


def get_dict(**kwargs):
    """Collect the keyword arguments into a plain dictionary."""
    return {k: v for k, v in kwargs.items()}

universal_simple_to_aiida.ipynb

Lines changed: 169 additions & 0 deletions

{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from python_workflow_definition.aiida import load_workflow_json"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "workgraph = load_workflow_json(filename='workflow_simple.json')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "09375dd84157413495ec5f8c07449a48",
       "version_major": 2,
       "version_minor": 1
      },
      "text/plain": [
       "NodeGraphWidget(settings={'minimap': True}, style={'width': '90%', 'height': '600px'}, value={'name': 'WorkGra…"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "# TODO: Create inputs rather than tasks out of data nodes\n",
    "workgraph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "PROCESS: (<class 'aiida_workgraph.engine.workgraph.WorkGraphEngine'>, <class 'aiida.engine.processes.workchains.workchain.Protect'>)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: Continue workgraph.\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: tasks ready to run: 2,3\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|run_tasks]: Run task: 2, type: Normal\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|update_normal_task_state]: Task: 2 finished.\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: Continue workgraph.\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: tasks ready to run: 3\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|run_tasks]: Run task: 3, type: Normal\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|update_normal_task_state]: Task: 3 finished.\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: Continue workgraph.\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: tasks ready to run: add_x_and_y\n",
      "03/14/2025 10:01:48 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|run_tasks]: Run task: add_x_and_y, type: PYTHONJOB\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------------------------------\n",
      "kwargs: {'value': '1'}\n",
      "------------------------------------------------------------\n",
      "kwargs: {'value': '2'}\n",
      "------------------------------------------------------------\n",
      "kwargs: {'register_pickle_by_value': True, 'x': '1', 'y': '2'}\n",
      "PROCESS: (<class 'aiida_pythonjob.calculations.pythonjob.PythonJob'>, <class 'plumpy.processes.ProcessStateMachineMeta'>)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "03/14/2025 10:01:49 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|on_wait]: Process status: Waiting for child processes: 6285\n",
      "03/14/2025 10:01:52 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|update_task_state]: Task: add_x_and_y finished.\n",
      "03/14/2025 10:01:53 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: Continue workgraph.\n",
      "03/14/2025 10:01:53 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: tasks ready to run: add_x_and_y_and_z\n",
      "03/14/2025 10:01:53 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|run_tasks]: Run task: add_x_and_y_and_z, type: PYTHONJOB\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------------------------------\n",
      "kwargs: {'register_pickle_by_value': True, 'x': <Str: uuid: f2fe3b5e-a209-4efa-9ba3-601723a3dc85 (pk: 6289) value: 1>, 'y': <Str: uuid: edcbc572-3d24-40b1-8cb1-6306d3b10f0d (pk: 6290) value: 2>, 'z': <Str: uuid: 66bbc8aa-9285-48d2-a88d-b5a939df46b7 (pk: 6291) value: 12>}\n",
      "PROCESS: (<class 'aiida_pythonjob.calculations.pythonjob.PythonJob'>, <class 'plumpy.processes.ProcessStateMachineMeta'>)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "03/14/2025 10:01:53 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|on_wait]: Process status: Waiting for child processes: 6298\n",
      "03/14/2025 10:01:56 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|update_task_state]: Task: add_x_and_y_and_z finished.\n",
      "03/14/2025 10:01:57 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: Continue workgraph.\n",
      "03/14/2025 10:01:57 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|continue_workgraph]: tasks ready to run: \n",
      "03/14/2025 10:01:57 AM <2720405> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [6276|WorkGraphEngine|finalize]: Finalize workgraph.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "group_outputs []\n",
      "group_outputs: {}\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{'execution_count': <Int: uuid: 525e250b-6175-45f8-8657-d66d60c68860 (pk: 6303) value: 1>}"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from aiida import load_profile\n",
    "load_profile()\n",
    "\n",
    "workgraph.run()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "ADIS",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
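
The notebook above exercises the import direction (universal JSON to AiiDA WorkGraph). The reverse direction uses write_workflow_json from the module added in this commit; a minimal sketch, assuming both functions are importable from python_workflow_definition.aiida and that a profile and workflow_simple.json are available as in the notebook:

import json

from aiida import load_profile
# assumption: both functions live in the module added in this commit
from python_workflow_definition.aiida import load_workflow_json, write_workflow_json

load_profile()

workgraph = load_workflow_json(filename="workflow_simple.json")
data = write_workflow_json(workgraph)

# dump the universal {"nodes": ..., "edges": ...} dictionary back to disk
with open("workflow_roundtrip.json", "w") as f:
    json.dump(data, f, indent=2)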
