@@ -203,7 +203,9 @@ def load_workflow_json(file_name: str) -> WorkGraph:
203203 return wg
204204
205205
206- def write_workflow_json (wg : WorkGraph , file_name : str , _nested_counter : dict = None ) -> dict :
206+ def write_workflow_json (
207+ wg : WorkGraph , file_name : str , _nested_counter : dict = None
208+ ) -> dict :
207209 """Write a WorkGraph to JSON file(s), with support for nested workflows.
208210
209211 Args:
@@ -237,13 +239,13 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
237239 nested_wg = None
238240
239241 # Method 1: Check if this is a SubGraphTask (has spec.node_type == 'SubGraph')
240- if hasattr (node , ' spec' ) and hasattr (node .spec , ' node_type' ):
241- if node .spec .node_type == ' SubGraph' and hasattr (node , ' tasks' ):
242+ if hasattr (node , " spec" ) and hasattr (node .spec , " node_type" ):
243+ if node .spec .node_type == " SubGraph" and hasattr (node , " tasks" ):
242244 is_graph = True
243245 nested_wg = node
244246
245247 # Method 2: Check if the node itself has tasks attribute (indicating it's a subgraph)
246- if not is_graph and hasattr (node , ' tasks' ):
248+ if not is_graph and hasattr (node , " tasks" ):
247249 # Make sure it has actual tasks (not just an empty list)
248250 tasks_list = [t for t in node .tasks if t .name not in GRAPH_LEVEL_NAMES ]
249251 if len (tasks_list ) > 0 :
@@ -264,38 +266,49 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
264266 # Recursively write the nested workflow
265267 write_workflow_json (nested_wg , str (nested_path ), _nested_counter )
266268
267- data [NODES_LABEL ].append ({"id" : i , "type" : "workflow" , "value" : nested_filename })
269+ data [NODES_LABEL ].append (
270+ {"id" : i , "type" : "workflow" , "value" : nested_filename }
271+ )
268272 else :
269273 # This is a regular function task
270274 # Try to get the module path from different sources
271275 module_path = executor .module_path
272276
273277 # If module_path is None, try to extract from pickled_callable
274- if module_path is None and hasattr (executor , ' pickled_callable' ):
278+ if module_path is None and hasattr (executor , " pickled_callable" ):
275279 # For pickled callables, try to get the original function
276280 try :
277281 import cloudpickle
282+
278283 func = cloudpickle .loads (executor .pickled_callable )
279- if hasattr (func , ' __module__' ):
284+ if hasattr (func , " __module__" ):
280285 module_path = func .__module__
281286 except Exception :
282287 pass # Keep module_path as None
283288
284289 callable_name = f"{ module_path } .{ executor .callable_name } "
285- data [NODES_LABEL ].append ({"id" : i , "type" : "function" , "value" : callable_name })
290+ data [NODES_LABEL ].append (
291+ {"id" : i , "type" : "function" , "value" : callable_name }
292+ )
286293
287294 i += 1
288295
289296 # Handle workflow-level inputs (create input nodes)
290297 input_name_mapping = {}
291- INTERNAL_SOCKETS = ['metadata' , '_wait' , '_outputs' , 'function_data' , 'function_inputs' ]
298+ INTERNAL_SOCKETS = [
299+ "metadata" ,
300+ "_wait" ,
301+ "_outputs" ,
302+ "function_data" ,
303+ "function_inputs" ,
304+ ]
292305
293306 # First, try to get default values from graph_inputs task (for SubGraphTasks)
294307 graph_inputs_defaults = {}
295308 for task in wg .tasks :
296- if task .name == ' graph_inputs' and hasattr (task , ' outputs' ):
309+ if task .name == " graph_inputs" and hasattr (task , " outputs" ):
297310 for output in task .outputs :
298- if hasattr (output , ' _name' ) and hasattr (output , ' value' ):
311+ if hasattr (output , " _name" ) and hasattr (output , " value" ):
299312 output_name = output ._name
300313 if output .value is not None and isinstance (output .value , orm .Data ):
301314 if isinstance (output .value , orm .List ):
@@ -311,20 +324,24 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
311324 val = int (val )
312325 graph_inputs_defaults [output_name ] = val
313326
314- if hasattr (wg , 'inputs' ) and wg .inputs is not None and hasattr (wg .inputs , '_sockets' ):
327+ if (
328+ hasattr (wg , "inputs" )
329+ and wg .inputs is not None
330+ and hasattr (wg .inputs , "_sockets" )
331+ ):
315332 for input_name , input_socket in wg .inputs ._sockets .items ():
316333 # Skip metadata and other special namespaces/internal sockets
317334 if isinstance (input_socket , TaskSocketNamespace ):
318335 continue
319- if input_name in INTERNAL_SOCKETS or input_name .startswith ('_' ):
336+ if input_name in INTERNAL_SOCKETS or input_name .startswith ("_" ):
320337 continue
321338
322339 # Check if this input has a default value
323340 # First try graph_inputs defaults, then the socket value
324341 input_value = None
325342 if input_name in graph_inputs_defaults :
326343 input_value = graph_inputs_defaults [input_name ]
327- elif hasattr (input_socket , ' value' ) and input_socket .value is not None :
344+ elif hasattr (input_socket , " value" ) and input_socket .value is not None :
328345 if isinstance (input_socket .value , orm .Data ):
329346 if isinstance (input_socket .value , orm .List ):
330347 input_value = input_socket .value .get_list ()
@@ -347,12 +364,16 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
347364
348365 # Handle workflow-level outputs (create output nodes)
349366 output_name_mapping = {}
350- if hasattr (wg , 'outputs' ) and wg .outputs is not None and hasattr (wg .outputs , '_sockets' ):
367+ if (
368+ hasattr (wg , "outputs" )
369+ and wg .outputs is not None
370+ and hasattr (wg .outputs , "_sockets" )
371+ ):
351372 for output_name , output_socket in wg .outputs ._sockets .items ():
352373 # Skip metadata and other special namespaces/internal sockets
353374 if isinstance (output_socket , TaskSocketNamespace ):
354375 continue
355- if output_name in INTERNAL_SOCKETS or output_name .startswith ('_' ):
376+ if output_name in INTERNAL_SOCKETS or output_name .startswith ("_" ):
356377 continue
357378
358379 data [NODES_LABEL ].append ({"id" : i , "type" : "output" , "name" : output_name })
@@ -376,7 +397,9 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
376397 else :
377398 link_data [SOURCE_LABEL ] = node_name_mapping .get (from_node_name )
378399 # if the from socket is the default result, we set it to None
379- link_data [SOURCE_PORT_LABEL ] = None if from_socket == "result" else from_socket
400+ link_data [SOURCE_PORT_LABEL ] = (
401+ None if from_socket == "result" else from_socket
402+ )
380403
381404 # Handle links to graph_outputs
382405 if to_node_name == "graph_outputs" :
@@ -410,7 +433,10 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
410433 if isinstance (input .value , orm .Data ):
411434 # Check if this input is already connected (e.g., from workflow inputs)
412435 node_id = node_name_mapping [node .name ]
413- if any (link [1 ] == node_id and link [2 ] == input ._name for link in existing_links ):
436+ if any (
437+ link [1 ] == node_id and link [2 ] == input ._name
438+ for link in existing_links
439+ ):
414440 continue
415441
416442 if input .value .uuid not in data_node_name_mapping :
@@ -441,7 +467,9 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
441467 SOURCE_PORT_LABEL : None ,
442468 }
443469 )
444- existing_links .add ((input_node_name , node_name_mapping [node .name ], input ._name ))
470+ existing_links .add (
471+ (input_node_name , node_name_mapping [node .name ], input ._name )
472+ )
445473
446474 data [VERSION_LABEL ] = VERSION_NUMBER
447475
@@ -456,6 +484,10 @@ def write_workflow_json(wg: WorkGraph, file_name: str, _nested_counter: dict = N
456484 workflow_data = data
457485 else :
458486 # Old-style workflow - need to update names and add result node
459- workflow_data = set_result_node (workflow_dict = update_node_names (workflow_dict = data ))
487+ workflow_data = set_result_node (
488+ workflow_dict = update_node_names (workflow_dict = data )
489+ )
460490
461- PythonWorkflowDefinitionWorkflow (** workflow_data ).dump_json_file (file_name = file_name , indent = 2 )
491+ PythonWorkflowDefinitionWorkflow (** workflow_data ).dump_json_file (
492+ file_name = file_name , indent = 2
493+ )
0 commit comments