2121
2222
2323def _get_function_dict (flow ):
24- return {
25- job .uuid : job .function
26- for job in flow .jobs
27- }
24+ return {job .uuid : job .function for job in flow .jobs }
2825
2926
3027def _get_nodes_dict (function_dict ):
@@ -37,7 +34,7 @@ def _get_nodes_dict(function_dict):
3734
3835
3936def _get_edge_from_dict (target , key , value_dict , nodes_mapping_dict ):
40- if len (value_dict [' attributes' ]) == 1 :
37+ if len (value_dict [" attributes" ]) == 1 :
4138 return {
4239 TARGET_LABEL : target ,
4340 TARGET_PORT_LABEL : key ,
@@ -57,72 +54,152 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
5754 edges_lst = []
5855 for job in flow_dict ["jobs" ]:
5956 for k , v in job ["function_kwargs" ].items ():
60- if isinstance (v , dict ) and "@module" in v and "@class" in v and "@version" in v :
61- edges_lst .append (_get_edge_from_dict (
62- target = nodes_mapping_dict [job ["uuid" ]],
63- key = k ,
64- value_dict = v ,
65- nodes_mapping_dict = nodes_mapping_dict ,
66- ))
67- elif isinstance (v , dict ) and any ([isinstance (el , dict ) and "@module" in el and "@class" in el and "@version" in el for el in v .values ()]):
57+ if (
58+ isinstance (v , dict )
59+ and "@module" in v
60+ and "@class" in v
61+ and "@version" in v
62+ ):
63+ edges_lst .append (
64+ _get_edge_from_dict (
65+ target = nodes_mapping_dict [job ["uuid" ]],
66+ key = k ,
67+ value_dict = v ,
68+ nodes_mapping_dict = nodes_mapping_dict ,
69+ )
70+ )
71+ elif isinstance (v , dict ) and any (
72+ [
73+ isinstance (el , dict )
74+ and "@module" in el
75+ and "@class" in el
76+ and "@version" in el
77+ for el in v .values ()
78+ ]
79+ ):
6880 node_dict_index = len (nodes_dict )
6981 nodes_dict [node_dict_index ] = get_dict
7082 for kt , vt in v .items ():
71- if isinstance (vt , dict ) and "@module" in vt and "@class" in vt and "@version" in vt :
72- edges_lst .append (_get_edge_from_dict (
73- target = node_dict_index ,
74- key = kt ,
75- value_dict = vt ,
76- nodes_mapping_dict = nodes_mapping_dict ,
77- ))
83+ if (
84+ isinstance (vt , dict )
85+ and "@module" in vt
86+ and "@class" in vt
87+ and "@version" in vt
88+ ):
89+ edges_lst .append (
90+ _get_edge_from_dict (
91+ target = node_dict_index ,
92+ key = kt ,
93+ value_dict = vt ,
94+ nodes_mapping_dict = nodes_mapping_dict ,
95+ )
96+ )
7897 else :
7998 if vt not in nodes_dict .values ():
8099 node_index = len (nodes_dict )
81100 nodes_dict [node_index ] = vt
82101 else :
83- node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
84- edges_lst .append ({TARGET_LABEL : node_dict_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
85- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_dict_index , SOURCE_PORT_LABEL : None })
86- elif isinstance (v , list ) and any ([isinstance (el , dict ) and "@module" in el and "@class" in el and "@version" in el for el in v ]):
102+ node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[
103+ str (vt )
104+ ]
105+ edges_lst .append (
106+ {
107+ TARGET_LABEL : node_dict_index ,
108+ TARGET_PORT_LABEL : kt ,
109+ SOURCE_LABEL : node_index ,
110+ SOURCE_PORT_LABEL : None ,
111+ }
112+ )
113+ edges_lst .append (
114+ {
115+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
116+ TARGET_PORT_LABEL : k ,
117+ SOURCE_LABEL : node_dict_index ,
118+ SOURCE_PORT_LABEL : None ,
119+ }
120+ )
121+ elif isinstance (v , list ) and any (
122+ [
123+ isinstance (el , dict )
124+ and "@module" in el
125+ and "@class" in el
126+ and "@version" in el
127+ for el in v
128+ ]
129+ ):
87130 node_list_index = len (nodes_dict )
88131 nodes_dict [node_list_index ] = get_list
89132 for kt , vt in enumerate (v ):
90- if isinstance (vt , dict ) and "@module" in vt and "@class" in vt and "@version" in vt :
91- edges_lst .append (_get_edge_from_dict (
92- target = node_list_index ,
93- key = str (kt ),
94- value_dict = vt ,
95- nodes_mapping_dict = nodes_mapping_dict ,
96- ))
133+ if (
134+ isinstance (vt , dict )
135+ and "@module" in vt
136+ and "@class" in vt
137+ and "@version" in vt
138+ ):
139+ edges_lst .append (
140+ _get_edge_from_dict (
141+ target = node_list_index ,
142+ key = str (kt ),
143+ value_dict = vt ,
144+ nodes_mapping_dict = nodes_mapping_dict ,
145+ )
146+ )
97147 else :
98148 if vt not in nodes_dict .values ():
99149 node_index = len (nodes_dict )
100150 nodes_dict [node_index ] = vt
101151 else :
102- node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
103- edges_lst .append ({TARGET_LABEL : node_list_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
104- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_list_index , SOURCE_PORT_LABEL : None })
152+ node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[
153+ str (vt )
154+ ]
155+ edges_lst .append (
156+ {
157+ TARGET_LABEL : node_list_index ,
158+ TARGET_PORT_LABEL : kt ,
159+ SOURCE_LABEL : node_index ,
160+ SOURCE_PORT_LABEL : None ,
161+ }
162+ )
163+ edges_lst .append (
164+ {
165+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
166+ TARGET_PORT_LABEL : k ,
167+ SOURCE_LABEL : node_list_index ,
168+ SOURCE_PORT_LABEL : None ,
169+ }
170+ )
105171 else :
106172 if v not in nodes_dict .values ():
107173 node_index = len (nodes_dict )
108174 nodes_dict [node_index ] = v
109175 else :
110176 node_index = {tv : tk for tk , tv in nodes_dict .items ()}[v ]
111- edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
177+ edges_lst .append (
178+ {
179+ TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]],
180+ TARGET_PORT_LABEL : k ,
181+ SOURCE_LABEL : node_index ,
182+ SOURCE_PORT_LABEL : None ,
183+ }
184+ )
112185 return edges_lst , nodes_dict
113186
114187
115188def _resort_total_lst (total_dict , nodes_dict ):
116189 nodes_with_dep_lst = list (sorted (total_dict .keys ()))
117- nodes_without_dep_lst = [k for k in nodes_dict .keys () if k not in nodes_with_dep_lst ]
190+ nodes_without_dep_lst = [
191+ k for k in nodes_dict .keys () if k not in nodes_with_dep_lst
192+ ]
118193 ordered_lst = []
119194 total_new_dict = {}
120195 while len (total_new_dict ) < len (total_dict ):
121196 for ind in sorted (total_dict .keys ()):
122197 connect = total_dict [ind ]
123198 if ind not in ordered_lst :
124199 source_lst = [sd [SOURCE_LABEL ] for sd in connect .values ()]
125- if all ([s in ordered_lst or s in nodes_without_dep_lst for s in source_lst ]):
200+ if all (
201+ [s in ordered_lst or s in nodes_without_dep_lst for s in source_lst ]
202+ ):
126203 ordered_lst .append (ind )
127204 total_new_dict [ind ] = connect
128205 return total_new_dict
@@ -142,7 +219,7 @@ def _group_edges(edges_lst):
142219
143220
144221def _get_input_dict (nodes_dict ):
145- return {k :v for k , v in nodes_dict .items () if not isfunction (v )}
222+ return {k : v for k , v in nodes_dict .items () if not isfunction (v )}
146223
147224
148225def _get_workflow (nodes_dict , input_dict , total_dict , source_handles_dict ):
@@ -157,12 +234,21 @@ def get_attr_helper(obj, source_handle):
157234 v = nodes_dict [k ]
158235 if isfunction (v ):
159236 if k in source_handles_dict .keys ():
160- fn = job (method = v , data = [el for el in source_handles_dict [k ] if el is not None ])
237+ fn = job (
238+ method = v ,
239+ data = [el for el in source_handles_dict [k ] if el is not None ],
240+ )
161241 else :
162242 fn = job (method = v )
163243 kwargs = {
164- kw : input_dict [vw [SOURCE_LABEL ]] if vw [SOURCE_LABEL ] in input_dict else get_attr_helper (
165- obj = memory_dict [vw [SOURCE_LABEL ]], source_handle = vw [SOURCE_PORT_LABEL ])
244+ kw : (
245+ input_dict [vw [SOURCE_LABEL ]]
246+ if vw [SOURCE_LABEL ] in input_dict
247+ else get_attr_helper (
248+ obj = memory_dict [vw [SOURCE_LABEL ]],
249+ source_handle = vw [SOURCE_PORT_LABEL ],
250+ )
251+ )
166252 for kw , vw in total_dict [k ].items ()
167253 }
168254 memory_dict [k ] = fn (** kwargs )
@@ -197,7 +283,7 @@ def load_workflow_json(file_name):
197283 nodes_new_dict = {}
198284 for k , v in convert_nodes_list_to_dict (nodes_list = content [NODES_LABEL ]).items ():
199285 if isinstance (v , str ) and "." in v :
200- p , m = v .rsplit ('.' , 1 )
286+ p , m = v .rsplit ("." , 1 )
201287 mod = import_module (p )
202288 nodes_new_dict [int (k )] = getattr (mod , m )
203289 else :
@@ -229,7 +315,9 @@ def write_workflow_json(flow, file_name="workflow.json"):
229315 nodes_store_lst = []
230316 for k , v in nodes_dict .items ():
231317 if isfunction (v ):
232- nodes_store_lst .append ({"id" : k , "function" : v .__module__ + "." + v .__name__ })
318+ nodes_store_lst .append (
319+ {"id" : k , "function" : v .__module__ + "." + v .__name__ }
320+ )
233321 elif isinstance (v , np .ndarray ):
234322 nodes_store_lst .append ({"id" : k , "value" : v .tolist ()})
235323 else :
0 commit comments