Skip to content

Commit 2d68d5a

Browse files
committed
fixes
1 parent 50a72c9 commit 2d68d5a

File tree

2 files changed

+110
-70
lines changed

2 files changed

+110
-70
lines changed

python_workflow_definition/src/python_workflow_definition/cwl/__main__.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66

77
def load_function(funct):
8-
p, m = funct.rsplit('.', 1)
8+
p, m = funct.rsplit(".", 1)
99
return getattr(import_module(p), m)
1010

1111

1212
def convert_argument(arg):
1313
if ".pickle" in arg:
14-
with open(arg, 'rb') as f:
14+
with open(arg, "rb") as f:
1515
return pickle.load(f)
1616
else:
1717
return literal_eval(arg)
@@ -22,11 +22,13 @@ def convert_argument(arg):
2222
argument_lst = sys.argv[1:]
2323
funct = [
2424
load_function(funct=arg.split("=")[-1])
25-
for arg in argument_lst if "--function=" in arg
25+
for arg in argument_lst
26+
if "--function=" in arg
2627
][0]
2728
kwargs = {
2829
arg.split("=")[0][6:]: convert_argument(arg=arg.split("=")[-1])
29-
for arg in argument_lst if "--arg_" in arg
30+
for arg in argument_lst
31+
if "--arg_" in arg
3032
}
3133

3234
# evaluate function
@@ -35,8 +37,8 @@ def convert_argument(arg):
3537
# store output
3638
if isinstance(result, dict):
3739
for k, v in result.items():
38-
with open(k + ".pickle", 'wb') as f:
40+
with open(k + ".pickle", "wb") as f:
3941
pickle.dump(v, f)
4042
else:
41-
with open("result.pickle", 'wb') as f:
42-
pickle.dump(result, f)
43+
with open("result.pickle", "wb") as f:
44+
pickle.dump(result, f)

python_workflow_definition/src/python_workflow_definition/cwl/export.py

Lines changed: 101 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,87 +19,108 @@
1919
)
2020

2121

22-
def get_function_argument(argument: str, position: int = 3) -> dict:
22+
def _get_function_argument(argument: str, position: int = 3) -> dict:
2323
return {
24-
argument + '_file': {
25-
'type': 'File',
26-
'inputBinding': {'prefix': '--arg_' + argument + '=', 'separate': False, 'position': position},
24+
argument
25+
+ "_file": {
26+
"type": "File",
27+
"inputBinding": {
28+
"prefix": "--arg_" + argument + "=",
29+
"separate": False,
30+
"position": position,
31+
},
2732
},
2833
}
2934

3035

31-
def get_function_template(function_name: str) -> dict:
36+
def _get_function_template(function_name: str) -> dict:
3237
return {
33-
'function': {
34-
'default': function_name,
35-
'inputBinding': {'position': 2, 'prefix': '--function=', 'separate': False},
36-
'type': 'string',
38+
"function": {
39+
"default": function_name,
40+
"inputBinding": {"position": 2, "prefix": "--function=", "separate": False},
41+
"type": "string",
3742
},
3843
}
3944

4045

41-
def get_output_name(output_name: str) -> dict:
46+
def _get_output_name(output_name: str) -> dict:
4247
return {
43-
output_name + '_file': {
44-
'type': 'File',
45-
'outputBinding': {
46-
'glob': output_name + '.pickle'
47-
}
48-
}
48+
output_name
49+
+ "_file": {"type": "File", "outputBinding": {"glob": output_name + ".pickle"}}
4950
}
5051

5152

52-
def get_function(workflow):
53+
def _get_function(workflow):
5354
function_nodes_dict = {
54-
n['id']: n['value']
55-
for n in workflow[NODES_LABEL] if n["type"] == "function"
55+
n["id"]: n["value"] for n in workflow[NODES_LABEL] if n["type"] == "function"
5656
}
5757
funct_dict = {}
5858
for funct_id in function_nodes_dict.keys():
59-
target_ports = list(set([e[TARGET_PORT_LABEL] for e in workflow[EDGES_LABEL] if e["target"] == funct_id]))
60-
source_ports = list(set([e[SOURCE_PORT_LABEL] for e in workflow[EDGES_LABEL] if e["source"] == funct_id]))
61-
funct_dict[funct_id] = {"targetPorts": target_ports, "sourcePorts": source_ports}
59+
target_ports = list(
60+
set(
61+
[
62+
e[TARGET_PORT_LABEL]
63+
for e in workflow[EDGES_LABEL]
64+
if e["target"] == funct_id
65+
]
66+
)
67+
)
68+
source_ports = list(
69+
set(
70+
[
71+
e[SOURCE_PORT_LABEL]
72+
for e in workflow[EDGES_LABEL]
73+
if e["source"] == funct_id
74+
]
75+
)
76+
)
77+
funct_dict[funct_id] = {
78+
"targetPorts": target_ports,
79+
"sourcePorts": source_ports,
80+
}
6281
return function_nodes_dict, funct_dict
6382

6483

65-
def write_function_cwl(workflow):
66-
function_nodes_dict, funct_dict = get_function(workflow)
84+
def _write_function_cwl(workflow):
85+
function_nodes_dict, funct_dict = _get_function(workflow)
6786
file_lst = []
6887

6988
for i in range(len(function_nodes_dict)):
7089
template = {
71-
'cwlVersion': 'v1.2',
72-
'class': 'CommandLineTool',
73-
'baseCommand': 'python',
74-
'inputs': {
75-
'wrapper': {
76-
'type': 'File',
77-
'inputBinding': {'position': 1},
78-
'default': {'class': 'File', 'location': 'wrapper.py'}
90+
"cwlVersion": "v1.2",
91+
"class": "CommandLineTool",
92+
"baseCommand": "python",
93+
"inputs": {
94+
"wrapper": {
95+
"type": "string",
96+
"inputBinding": {"position": 1, "prefix": "-m"},
97+
"default": "python_workflow_definition.cwl",
7998
},
8099
},
81-
'outputs': {
82-
}
100+
"outputs": {},
83101
}
84102
file_name = function_nodes_dict[i].split(".")[-1] + ".cwl"
85103
if file_name not in file_lst:
86104
file_lst.append(file_name)
87-
template["inputs"].update(get_function_template(function_name=function_nodes_dict[i]))
88-
for j, arg in enumerate(funct_dict[i]['targetPorts']):
89-
template["inputs"].update(get_function_argument(argument=arg, position=3+j))
90-
for out in funct_dict[i]['sourcePorts']:
105+
template["inputs"].update(
106+
_get_function_template(function_name=function_nodes_dict[i])
107+
)
108+
for j, arg in enumerate(funct_dict[i]["targetPorts"]):
109+
template["inputs"].update(
110+
_get_function_argument(argument=arg, position=3 + j)
111+
)
112+
for out in funct_dict[i]["sourcePorts"]:
91113
if out is None:
92-
template["outputs"].update(get_output_name(output_name="result"))
114+
template["outputs"].update(_get_output_name(output_name="result"))
93115
else:
94-
template["outputs"].update(get_output_name(output_name=out))
116+
template["outputs"].update(_get_output_name(output_name=out))
95117
with open(file_name, "w") as f:
96118
dump(template, f, Dumper=Dumper)
97119

98120

99-
def write_workflow_config(workflow):
121+
def _write_workflow_config(workflow):
100122
input_dict = {
101-
n["name"]: n["value"]
102-
for n in workflow[NODES_LABEL] if n["type"] == "input"
123+
n["name"]: n["value"] for n in workflow[NODES_LABEL] if n["type"] == "input"
103124
}
104125
with open("workflow.yml", "w") as f:
105126
dump(
@@ -115,27 +136,29 @@ def write_workflow_config(workflow):
115136
pickle.dump(v, f)
116137

117138

118-
def write_workflow(workflow):
139+
def _write_workflow(workflow):
119140
workflow_template = {
120-
'cwlVersion': 'v1.2',
121-
'class': 'Workflow',
122-
'inputs': {},
123-
'steps': {},
124-
'outputs': {},
141+
"cwlVersion": "v1.2",
142+
"class": "Workflow",
143+
"inputs": {},
144+
"steps": {},
145+
"outputs": {},
125146
}
126147
input_dict = {
127-
n["name"]: n["value"]
128-
for n in workflow[NODES_LABEL] if n["type"] == "input"
148+
n["name"]: n["value"] for n in workflow[NODES_LABEL] if n["type"] == "input"
129149
}
130-
function_nodes_dict, funct_dict = get_function(workflow)
150+
function_nodes_dict, funct_dict = _get_function(workflow)
131151
result_id = [n["id"] for n in workflow[NODES_LABEL] if n["type"] == "output"][0]
132-
last_compute_id = [e[SOURCE_LABEL] for e in workflow[EDGES_LABEL] if e[TARGET_LABEL] == result_id][0]
152+
last_compute_id = [
153+
e[SOURCE_LABEL] for e in workflow[EDGES_LABEL] if e[TARGET_LABEL] == result_id
154+
][0]
133155
workflow_template["inputs"].update({k + "_file": "File" for k in input_dict.keys()})
134156
if funct_dict[last_compute_id]["sourcePorts"] == [None]:
135157
workflow_template["outputs"] = {
136158
"result_file": {
137159
"type": "File",
138-
"outputSource": function_nodes_dict[last_compute_id].split(".")[-1] + "/result_file"
160+
"outputSource": function_nodes_dict[last_compute_id].split(".")[-1]
161+
+ "/result_file",
139162
},
140163
}
141164
else:
@@ -145,22 +168,37 @@ def write_workflow(workflow):
145168
edges_new_lst = content[EDGES_LABEL]
146169
total_lst = group_edges(edges_new_lst)
147170
nodes_new_dict = {
148-
int(k): v for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items()
171+
int(k): v
172+
for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items()
149173
}
150174
total_new_lst = resort_total_lst(total_lst=total_lst, nodes_dict=nodes_new_dict)
151-
step_name_lst = {t[0]: function_nodes_dict[t[0]].split(".")[-1] for t in total_new_lst}
152-
input_id_dict = {n["id"]: n["name"] for n in workflow[NODES_LABEL] if n["type"] == "input"}
175+
step_name_lst = {
176+
t[0]: function_nodes_dict[t[0]].split(".")[-1] for t in total_new_lst
177+
}
178+
input_id_dict = {
179+
n["id"]: n["name"] for n in workflow[NODES_LABEL] if n["type"] == "input"
180+
}
153181
for t in total_new_lst:
154182
ind = t[0]
155183
node_script = step_name_lst[ind] + ".cwl"
156-
output = [o + "_file" if o is not None else "result_file" for o in funct_dict[ind]['sourcePorts']]
184+
output = [
185+
o + "_file" if o is not None else "result_file"
186+
for o in funct_dict[ind]["sourcePorts"]
187+
]
157188
in_dict = {}
158189
for k, v in t[1].items():
159190
if v[SOURCE_LABEL] in input_id_dict:
160191
in_dict[k + "_file"] = input_id_dict[v[SOURCE_LABEL]] + "_file"
161192
else:
162-
in_dict[k + "_file"] = step_name_lst[v[SOURCE_LABEL]] + "/" + v[SOURCE_PORT_LABEL] + "_file"
163-
workflow_template["steps"].update({step_name_lst[ind]: {"run": node_script, "in": in_dict, "out": output}})
193+
in_dict[k + "_file"] = (
194+
step_name_lst[v[SOURCE_LABEL]]
195+
+ "/"
196+
+ v[SOURCE_PORT_LABEL]
197+
+ "_file"
198+
)
199+
workflow_template["steps"].update(
200+
{step_name_lst[ind]: {"run": node_script, "in": in_dict, "out": output}}
201+
)
164202
with open("workflow.cwl", "w") as f:
165203
dump(workflow_template, f, Dumper=Dumper)
166204

@@ -169,6 +207,6 @@ def load_workflow_json(file_name: str):
169207
with open(file_name, "r") as f:
170208
workflow = json.load(f)
171209

172-
write_function_cwl(workflow=workflow)
173-
write_workflow_config(workflow=workflow)
174-
write_workflow(workflow=workflow)
210+
_write_function_cwl(workflow=workflow)
211+
_write_workflow_config(workflow=workflow)
212+
_write_workflow(workflow=workflow)

0 commit comments

Comments
 (0)