Skip to content

Commit 50a72c9

Browse files
committed
Implement export to cwl
1 parent e9aba8a commit 50a72c9

File tree

6 files changed

+219
-0
lines changed

6 files changed

+219
-0
lines changed

.github/workflows/pipeline.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ jobs:
128128
run: |
129129
cd example_workflows/arithmetic
130130
papermill aiida.ipynb aiida_out.ipynb -k "python3"
131+
papermill cwl.ipynb cwl_out.ipynb -k "python3"
131132
papermill jobflow.ipynb jobflow_out.ipynb -k "python3"
132133
papermill pyiron_base.ipynb pyiron_base_out.ipynb -k "python3"
133134
papermill universal_workflow.ipynb universal_workflow_out.ipynb -k "python3"

binder/environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ dependencies:
1212
- aiida-workgraph =0.5.2
1313
- conda_subprocess =0.0.6
1414
- networkx =3.4.2
15+
- cwltool =3.1.20250110105449
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"metadata":{"kernelspec":{"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.12.10"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"377fef56-484d-491c-b19e-1be6931e44eb","cell_type":"code","source":"import pickle","metadata":{"trusted":true},"outputs":[],"execution_count":4},{"id":"92e3921b-2bb8-4333-8cfe-4bd27f785d24","cell_type":"code","source":"from python_workflow_definition.cwl.export import load_workflow_json","metadata":{"trusted":true},"outputs":[],"execution_count":17},{"id":"5303c059-8ae4-4557-858e-b4bd64eac711","cell_type":"code","source":"load_workflow_json(file_name=\"workflow.json\")","metadata":{"trusted":true},"outputs":[],"execution_count":18},{"id":"df302bd2-e9b6-4595-979c-67c46414d986","cell_type":"code","source":"! cwltool workflow.cwl workflow.yml","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"/srv/conda/envs/notebook/bin/cwltool:11: DeprecationWarning: Nesting argument groups is deprecated.\n sys.exit(run())\n\u001b[1;30mINFO\u001b[0m /srv/conda/envs/notebook/bin/cwltool 3.1.20250110105449\n\u001b[1;30mINFO\u001b[0m Resolved 'workflow.cwl' to 'file:///home/jovyan/workflow.cwl'\n\u001b[1;30mINFO\u001b[0m [workflow ] start\n\u001b[1;30mINFO\u001b[0m [workflow ] starting step get_prod_and_div\n\u001b[1;30mINFO\u001b[0m [step get_prod_and_div] start\n\u001b[1;30mINFO\u001b[0m [job get_prod_and_div] /tmp/abngt5eq$ python \\\n /tmp/7pxhgcet/stg6ba69a25-7dc0-46be-9bbe-5c08e6ce1925/wrapper.py \\\n --function=workflow.get_prod_and_div \\\n --arg_x=/tmp/7pxhgcet/stg007c65c6-d1cd-44cd-b761-e181c0cc37d8/x.pickle \\\n --arg_y=/tmp/7pxhgcet/stg23ac56f7-9c1d-43d9-a90c-86036b3cbafc/y.pickle\n\u001b[1;30mINFO\u001b[0m [job get_prod_and_div] completed success\n\u001b[1;30mINFO\u001b[0m [step get_prod_and_div] completed success\n\u001b[1;30mINFO\u001b[0m [workflow ] starting step get_sum\n\u001b[1;30mINFO\u001b[0m [step get_sum] start\n\u001b[1;30mINFO\u001b[0m [job get_sum] /tmp/wzuzdwec$ python \\\n /tmp/lz9a89rn/stg44b61930-48fe-423d-a68a-593e237e0477/wrapper.py \\\n --function=workflow.get_sum \\\n --arg_x=/tmp/lz9a89rn/stgb7917545-93cd-45b4-833a-01559a22e620/prod.pickle \\\n --arg_y=/tmp/lz9a89rn/stg0c04cf0f-bc08-494d-ae4b-b12bde14f87a/div.pickle\n\u001b[1;30mINFO\u001b[0m [job get_sum] completed success\n\u001b[1;30mINFO\u001b[0m [step get_sum] completed success\n\u001b[1;30mINFO\u001b[0m [workflow ] completed success\n{\n \"result_file\": {\n \"location\": \"file:///home/jovyan/result.pickle\",\n \"basename\": \"result.pickle\",\n \"class\": \"File\",\n \"checksum\": \"sha1$3dfd802cefb317cc7138af1e3a299f565c74ddec\",\n \"size\": 21,\n \"path\": \"/home/jovyan/result.pickle\"\n }\n}\u001b[1;30mINFO\u001b[0m Final process status is success\n"}],"execution_count":19},{"id":"2942dbba-ea0a-4d20-be5c-ed9992d09ff8","cell_type":"code","source":"with open(\"result.pickle\", \"rb\") as f:\n print(pickle.load(f))","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2.5\n"}],"execution_count":20},{"id":"60e909ee-d0d0-4bd1-81c8-dd5274ae5834","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]}

python_workflow_definition/src/python_workflow_definition/cwl/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import sys
2+
import pickle
3+
from ast import literal_eval
4+
from importlib import import_module
5+
6+
7+
def load_function(funct):
8+
p, m = funct.rsplit('.', 1)
9+
return getattr(import_module(p), m)
10+
11+
12+
def convert_argument(arg):
13+
if ".pickle" in arg:
14+
with open(arg, 'rb') as f:
15+
return pickle.load(f)
16+
else:
17+
return literal_eval(arg)
18+
19+
20+
if __name__ == "__main__":
21+
# load input
22+
argument_lst = sys.argv[1:]
23+
funct = [
24+
load_function(funct=arg.split("=")[-1])
25+
for arg in argument_lst if "--function=" in arg
26+
][0]
27+
kwargs = {
28+
arg.split("=")[0][6:]: convert_argument(arg=arg.split("=")[-1])
29+
for arg in argument_lst if "--arg_" in arg
30+
}
31+
32+
# evaluate function
33+
result = funct(**kwargs)
34+
35+
# store output
36+
if isinstance(result, dict):
37+
for k, v in result.items():
38+
with open(k + ".pickle", 'wb') as f:
39+
pickle.dump(v, f)
40+
else:
41+
with open("result.pickle", 'wb') as f:
42+
pickle.dump(result, f)
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import json
2+
import pickle
3+
from yaml import CDumper as Dumper, dump
4+
5+
6+
from python_workflow_definition.purepython import (
7+
group_edges,
8+
resort_total_lst,
9+
)
10+
from python_workflow_definition.shared import (
11+
convert_nodes_list_to_dict,
12+
remove_result,
13+
EDGES_LABEL,
14+
NODES_LABEL,
15+
TARGET_LABEL,
16+
TARGET_PORT_LABEL,
17+
SOURCE_LABEL,
18+
SOURCE_PORT_LABEL,
19+
)
20+
21+
22+
def get_function_argument(argument: str, position: int = 3) -> dict:
23+
return {
24+
argument + '_file': {
25+
'type': 'File',
26+
'inputBinding': {'prefix': '--arg_' + argument + '=', 'separate': False, 'position': position},
27+
},
28+
}
29+
30+
31+
def get_function_template(function_name: str) -> dict:
32+
return {
33+
'function': {
34+
'default': function_name,
35+
'inputBinding': {'position': 2, 'prefix': '--function=', 'separate': False},
36+
'type': 'string',
37+
},
38+
}
39+
40+
41+
def get_output_name(output_name: str) -> dict:
42+
return {
43+
output_name + '_file': {
44+
'type': 'File',
45+
'outputBinding': {
46+
'glob': output_name + '.pickle'
47+
}
48+
}
49+
}
50+
51+
52+
def get_function(workflow):
53+
function_nodes_dict = {
54+
n['id']: n['value']
55+
for n in workflow[NODES_LABEL] if n["type"] == "function"
56+
}
57+
funct_dict = {}
58+
for funct_id in function_nodes_dict.keys():
59+
target_ports = list(set([e[TARGET_PORT_LABEL] for e in workflow[EDGES_LABEL] if e["target"] == funct_id]))
60+
source_ports = list(set([e[SOURCE_PORT_LABEL] for e in workflow[EDGES_LABEL] if e["source"] == funct_id]))
61+
funct_dict[funct_id] = {"targetPorts": target_ports, "sourcePorts": source_ports}
62+
return function_nodes_dict, funct_dict
63+
64+
65+
def write_function_cwl(workflow):
66+
function_nodes_dict, funct_dict = get_function(workflow)
67+
file_lst = []
68+
69+
for i in range(len(function_nodes_dict)):
70+
template = {
71+
'cwlVersion': 'v1.2',
72+
'class': 'CommandLineTool',
73+
'baseCommand': 'python',
74+
'inputs': {
75+
'wrapper': {
76+
'type': 'File',
77+
'inputBinding': {'position': 1},
78+
'default': {'class': 'File', 'location': 'wrapper.py'}
79+
},
80+
},
81+
'outputs': {
82+
}
83+
}
84+
file_name = function_nodes_dict[i].split(".")[-1] + ".cwl"
85+
if file_name not in file_lst:
86+
file_lst.append(file_name)
87+
template["inputs"].update(get_function_template(function_name=function_nodes_dict[i]))
88+
for j, arg in enumerate(funct_dict[i]['targetPorts']):
89+
template["inputs"].update(get_function_argument(argument=arg, position=3+j))
90+
for out in funct_dict[i]['sourcePorts']:
91+
if out is None:
92+
template["outputs"].update(get_output_name(output_name="result"))
93+
else:
94+
template["outputs"].update(get_output_name(output_name=out))
95+
with open(file_name, "w") as f:
96+
dump(template, f, Dumper=Dumper)
97+
98+
99+
def write_workflow_config(workflow):
100+
input_dict = {
101+
n["name"]: n["value"]
102+
for n in workflow[NODES_LABEL] if n["type"] == "input"
103+
}
104+
with open("workflow.yml", "w") as f:
105+
dump(
106+
{
107+
k + "_file": {"class": "File", "path": k + ".pickle"}
108+
for k in input_dict.keys()
109+
},
110+
f,
111+
Dumper=Dumper,
112+
)
113+
for k, v in input_dict.items():
114+
with open(k + ".pickle", "wb") as f:
115+
pickle.dump(v, f)
116+
117+
118+
def write_workflow(workflow):
119+
workflow_template = {
120+
'cwlVersion': 'v1.2',
121+
'class': 'Workflow',
122+
'inputs': {},
123+
'steps': {},
124+
'outputs': {},
125+
}
126+
input_dict = {
127+
n["name"]: n["value"]
128+
for n in workflow[NODES_LABEL] if n["type"] == "input"
129+
}
130+
function_nodes_dict, funct_dict = get_function(workflow)
131+
result_id = [n["id"] for n in workflow[NODES_LABEL] if n["type"] == "output"][0]
132+
last_compute_id = [e[SOURCE_LABEL] for e in workflow[EDGES_LABEL] if e[TARGET_LABEL] == result_id][0]
133+
workflow_template["inputs"].update({k + "_file": "File" for k in input_dict.keys()})
134+
if funct_dict[last_compute_id]["sourcePorts"] == [None]:
135+
workflow_template["outputs"] = {
136+
"result_file": {
137+
"type": "File",
138+
"outputSource": function_nodes_dict[last_compute_id].split(".")[-1] + "/result_file"
139+
},
140+
}
141+
else:
142+
raise ValueError()
143+
144+
content = remove_result(workflow_dict=workflow)
145+
edges_new_lst = content[EDGES_LABEL]
146+
total_lst = group_edges(edges_new_lst)
147+
nodes_new_dict = {
148+
int(k): v for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items()
149+
}
150+
total_new_lst = resort_total_lst(total_lst=total_lst, nodes_dict=nodes_new_dict)
151+
step_name_lst = {t[0]: function_nodes_dict[t[0]].split(".")[-1] for t in total_new_lst}
152+
input_id_dict = {n["id"]: n["name"] for n in workflow[NODES_LABEL] if n["type"] == "input"}
153+
for t in total_new_lst:
154+
ind = t[0]
155+
node_script = step_name_lst[ind] + ".cwl"
156+
output = [o + "_file" if o is not None else "result_file" for o in funct_dict[ind]['sourcePorts']]
157+
in_dict = {}
158+
for k, v in t[1].items():
159+
if v[SOURCE_LABEL] in input_id_dict:
160+
in_dict[k + "_file"] = input_id_dict[v[SOURCE_LABEL]] + "_file"
161+
else:
162+
in_dict[k + "_file"] = step_name_lst[v[SOURCE_LABEL]] + "/" + v[SOURCE_PORT_LABEL] + "_file"
163+
workflow_template["steps"].update({step_name_lst[ind]: {"run": node_script, "in": in_dict, "out": output}})
164+
with open("workflow.cwl", "w") as f:
165+
dump(workflow_template, f, Dumper=Dumper)
166+
167+
168+
def load_workflow_json(file_name: str):
169+
with open(file_name, "r") as f:
170+
workflow = json.load(f)
171+
172+
write_function_cwl(workflow=workflow)
173+
write_workflow_config(workflow=workflow)
174+
write_workflow(workflow=workflow)

0 commit comments

Comments
 (0)