Commit e40adbd

wip
1 parent 678c7e1 commit e40adbd

File tree

7 files changed: +1059 -321 lines

.gitignore

Lines changed: 1 addition & 1 deletion

@@ -199,4 +199,4 @@ jobflow_to_aiida_qe.json
 aiida_to_jobflow_qe.json
 pyiron_base_to_aiida_simple.json
 pyiron_base_to_jobflow_qe.json
-
+**/*.h5

example_workflows/arithmetic/aiida.ipynb

Lines changed: 324 additions & 122 deletions
Large diffs are not rendered by default.
Lines changed: 75 additions & 0 deletions

{
  "version": "0.1.0",
  "nodes": [
    { "id": 0, "type": "function", "value": "workflow.get_prod_and_div" },
    { "id": 1, "type": "function", "value": "workflow.get_sum" },
    { "id": 2, "type": "function", "value": "workflow.get_square" },
    { "id": 3, "type": "input", "name": "x", "value": 1.0 },
    { "id": 4, "type": "input", "name": "y", "value": 2.0 },
    { "id": 5, "type": "output", "name": "result" }
  ],
  "edges": [
    { "target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod" },
    { "target": 1, "targetPort": "y", "source": 0, "sourcePort": "div" },
    { "target": 2, "targetPort": "x", "source": 1, "sourcePort": null },
    { "target": 0, "targetPort": "x", "source": 3, "sourcePort": null },
    { "target": 0, "targetPort": "y", "source": 4, "sourcePort": null },
    { "target": 5, "targetPort": null, "source": 2, "sourcePort": null }
  ]
}
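The JSON above references functions in a `workflow` module that is not part of this diff. A plausible sketch of what that module provides, inferred only from the node values and the "prod"/"div" source ports visible above (the real `workflow.py` may differ):

# workflow.py (hypothetical reconstruction, for illustration only)
def get_prod_and_div(x, y):
    # two named outputs, matching the "prod" and "div" sourcePorts in the JSON
    return {"prod": x * y, "div": x / y}

def get_sum(x, y):
    return x + y

def get_square(x):
    return x ** 2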

example_workflows/arithmetic/jobflow.ipynb

Lines changed: 416 additions & 188 deletions
Large diffs are not rendered by default.
Lines changed: 156 additions & 0 deletions

# coding: utf-8

# # jobflow

# ## Define workflow with jobflow
#
# This tutorial demonstrates how to use the Python Workflow Definition (PWD) with `jobflow` and how to load the resulting workflow with `aiida` and `pyiron`.
#
# [`jobflow`](https://joss.theoj.org/papers/10.21105/joss.05995) was developed to simplify the development of high-throughput workflows. It uses a decorator-based approach to define "Job"s that can be connected to form complex workflows ("Flow"s). `jobflow` is the workflow language of the workflow library [`atomate2`](https://chemrxiv.org/engage/chemrxiv/article-details/678e76a16dde43c9085c75e9), designed to replace [atomate](https://www.sciencedirect.com/science/article/pii/S0927025617303919), which was central to the development of the [Materials Project](https://pubs.aip.org/aip/apm/article/1/1/011002/119685/Commentary-The-Materials-Project-A-materials) database.

# First, we import the `job` decorator and the `Flow` class from jobflow, as well as the necessary modules from the PWD and the example arithmetic workflow.

# In[1]:


from jobflow import job, Flow


# In[2]:


from python_workflow_definition.jobflow import write_workflow_json


# In[3]:


from workflow import (
    get_sum as _get_sum,
    get_prod_and_div as _get_prod_and_div,
    get_square as _get_square,
)


# Using the `job` decorator, the imported functions from the arithmetic workflow are transformed into jobflow "Job"s. These "Job"s delay the execution of Python functions and can be chained into workflows ("Flow"s). A "Job" can return serializable outputs (e.g., a number, a dictionary, or a Pydantic model) or a so-called "Response" object, which enables the execution of dynamic workflows where the number of nodes is not known prior to the workflow's execution.
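# The static example below does not need a "Response", but as a minimal,
# illustrative sketch (assuming jobflow's standard Response API), a dynamic
# job could replace itself with newly created jobs at runtime:

from jobflow import Response


@job
def add_until(total, limit):
    """Illustrative only: keep adding 1 until the limit is reached."""
    if total >= limit:
        return total
    # Spawn a replacement job; its output becomes this job's output.
    return Response(replace=add_until(total + 1, limit))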
# In[4]:


workflow_json_filename = "jobflow_simple.json"


# In[5]:


get_sum = job(_get_sum)
# Note: one could also transfer the individual outputs to the data store:
# get_prod_and_div = job(_get_prod_and_div, data=["prod", "div"])
# On the way from the general definition to the jobflow definition, we do this
# automatically to avoid overflowing the database.
get_prod_and_div = job(_get_prod_and_div)
get_square = job(_get_square)


# In[6]:


prod_and_div = get_prod_and_div(x=1, y=2)


# In[7]:


tmp_sum = get_sum(x=prod_and_div.output.prod, y=prod_and_div.output.div)


# In[8]:


result = get_square(x=tmp_sum.output)


# In[9]:


flow = Flow([prod_and_div, tmp_sum, result])


# As jobflow itself is only a workflow language, workflows are typically executed on high-performance computers with a workflow manager such as [Fireworks](https://onlinelibrary.wiley.com/doi/full/10.1002/cpe.3505) or [jobflow-remote](https://github.com/Matgenix/jobflow-remote). For smaller and test workflows, a simple linear, non-parallel execution of the workflow graph can be performed with jobflow itself. All outputs of individual jobs are saved in a database; for high-throughput applications, a MongoDB database is typically used, while for testing and smaller workflows an in-memory database can be used instead.
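# A minimal sketch of such a local test run (assuming jobflow's run_locally
# helper, which defaults to an in-memory store; not executed in this tutorial):
#
#     from jobflow import run_locally
#     responses = run_locally(flow)  # runs the graph linearly, in memory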
# Finally, we write the workflow data into a JSON file so it can be imported later.

# In[10]:


write_workflow_json(flow=flow, file_name=workflow_json_filename)


# ## Load Workflow with aiida
#
# In this part, we demonstrate how to import the `jobflow` workflow into `aiida` via the PWD.

# In[12]:


from aiida import load_profile

load_profile()


# In[13]:


from python_workflow_definition.aiida import load_workflow_json


# We import the necessary modules from `aiida` and the PWD, then load the workflow JSON file.

# In[14]:


wg = load_workflow_json(file_name=workflow_json_filename)
wg


# Finally, we are now able to run the workflow with `aiida`.

# In[15]:


wg.run()


# ## Load Workflow with pyiron_base
#
# In this part, we demonstrate how to import the `jobflow` workflow into `pyiron` via the PWD.

# In[16]:


from python_workflow_definition.pyiron_base import load_workflow_json


# In[17]:


delayed_object_lst = load_workflow_json(file_name=workflow_json_filename)
delayed_object_lst[-1].draw()


# In[18]:


delayed_object_lst[-1].pull()


# Here, the procedure is the same as before: import the necessary `pyiron_base` module from the PWD, load the workflow JSON file, and run the workflow with `pyiron`.
Lines changed: 75 additions & 0 deletions

{
  "version": "0.1.0",
  "nodes": [
    { "id": 0, "type": "function", "value": "workflow.get_prod_and_div" },
    { "id": 1, "type": "function", "value": "workflow.get_sum" },
    { "id": 2, "type": "function", "value": "workflow.get_square" },
    { "id": 3, "type": "input", "name": "x", "value": 1 },
    { "id": 4, "type": "input", "name": "y", "value": 2 },
    { "id": 5, "type": "output", "name": "result" }
  ],
  "edges": [
    { "target": 0, "targetPort": "x", "source": 3, "sourcePort": null },
    { "target": 0, "targetPort": "y", "source": 4, "sourcePort": null },
    { "target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod" },
    { "target": 1, "targetPort": "y", "source": 0, "sourcePort": "div" },
    { "target": 2, "targetPort": "x", "source": 1, "sourcePort": null },
    { "target": 5, "targetPort": null, "source": 2, "sourcePort": null }
  ]
}
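Both JSON files describe the same graph (this export stores the inputs as plain integers). As an illustration of the node/edge semantics visible above — and only a hypothetical sketch, not the PWD library's actual loader — such a file could be evaluated with plain Python roughly like this:

import json
from importlib import import_module

def run_pwd_json(path):
    """Evaluate a PWD graph by recursively resolving each node's inputs."""
    with open(path) as f:
        data = json.load(f)
    nodes = {node["id"]: node for node in data["nodes"]}
    incoming = {}  # edges grouped by target node id
    for edge in data["edges"]:
        incoming.setdefault(edge["target"], []).append(edge)
    cache = {}

    def value_of(node_id):
        if node_id in cache:
            return cache[node_id]
        node = nodes[node_id]
        if node["type"] == "input":
            result = node["value"]
        else:  # "function" or "output": resolve incoming edges first
            kwargs = {}
            for edge in incoming[node_id]:
                value = value_of(edge["source"])
                if edge["sourcePort"] is not None:
                    value = value[edge["sourcePort"]]  # pick a named output
                kwargs[edge["targetPort"]] = value
            if node["type"] == "function":
                module_name, function_name = node["value"].rsplit(".", 1)
                result = getattr(import_module(module_name), function_name)(**kwargs)
            else:  # output node: a single unnamed incoming value
                (result,) = kwargs.values()
        cache[node_id] = result
        return result

    return {node["name"]: value_of(node_id)
            for node_id, node in nodes.items() if node["type"] == "output"}

# With the hypothetical workflow.py sketched earlier, run_pwd_json("jobflow_simple.json")
# would return {"result": 6.25}: prod=2, div=0.5, sum=2.5, square=6.25.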

src/python_workflow_definition/aiida.py

Lines changed: 12 additions & 10 deletions

@@ -24,11 +24,9 @@
 def load_workflow_json(file_name: str) -> WorkGraph:
-    data = remove_result(
-        workflow_dict=PythonWorkflowDefinitionWorkflow.load_json_file(
-            file_name=file_name
-        )
-    )
+
+    data = PythonWorkflowDefinitionWorkflow.load_json_file(file_name=file_name)
+    # data = remove_result(workflow_dict=workflow_dict)

     wg = WorkGraph()
     task_name_mapping = {}
@@ -51,13 +49,17 @@ def load_workflow_json(file_name: str) -> WorkGraph:

     # add links
     for link in data[EDGES_LABEL]:
+        # TODO: continue here
         to_task = task_name_mapping[str(link[TARGET_LABEL])]
         # if the input does not exist, it means we pass the data in via the kwargs;
         # in this case, we add the input socket
-        if link[TARGET_PORT_LABEL] not in to_task.inputs:
-            to_socket = to_task.add_input("workgraph.any", name=link[TARGET_PORT_LABEL])
-        else:
-            to_socket = to_task.inputs[link[TARGET_PORT_LABEL]]
+        try:
+            if link[TARGET_PORT_LABEL] not in to_task.inputs:
+                to_socket = to_task.add_input("workgraph.any", name=link[TARGET_PORT_LABEL])
+            else:
+                to_socket = to_task.inputs[link[TARGET_PORT_LABEL]]
+        except:
+            breakpoint()
         from_task = task_name_mapping[str(link[SOURCE_LABEL])]
         if isinstance(from_task, orm.Data):
             to_socket.value = from_task
@@ -73,7 +75,7 @@ def load_workflow_json(file_name: str) -> WorkGraph:
                 "workgraph.any",
                 name=link[SOURCE_PORT_LABEL],
                 # name=str(link["sourcePort"]),
-                metadata={"is_function_output": True},
+                # metadata={"is_function_output": True},
             )
         else:
             from_socket = from_task.outputs[link[SOURCE_PORT_LABEL]]