@@ -189,7 +189,7 @@ def __init__(self,
189189 self ._hashvalue = None
190190 self ._hashed_inputs = None
191191 self ._needed_outputs = []
192- self .needed_outputs = sorted ( needed_outputs )
192+ self .needed_outputs = needed_outputs
193193
194194 @property
195195 def interface (self ):
@@ -297,83 +297,105 @@ def help(self):
297297 """Print interface help"""
298298 self ._interface .help ()
299299
300- def hash_exists (self , updatehash = False ):
300+ def is_cached (self , rm_outdated = False ):
301301 """
302302 Check if the interface has been run previously, and whether
303- cached results are viable for reuse
303+ cached results are up-to-date.
304304 """
305+ outdir = self .output_dir ()
305306
306- # Get a dictionary with hashed filenames and a hashvalue
307- # of the dictionary itself.
307+ # Update hash
308308 hashed_inputs , hashvalue = self ._get_hashval ()
309- outdir = self .output_dir ()
309+
310+ # The output folder does not exist: not cached
311+ if not op .exists (outdir ):
312+ logger .debug ('[Node] Directory not found "%s".' , outdir )
313+ return False , False
314+
310315 hashfile = op .join (outdir , '_0x%s.json' % hashvalue )
311- hash_exists = op .exists (hashfile )
312-
313- logger .debug ('[Node] hash value=%s, exists=%s' , hashvalue , hash_exists )
314-
315- if op .exists (outdir ):
316- # Find previous hashfiles
317- globhashes = glob (op .join (outdir , '_0x*.json' ))
318- unfinished = [
319- path for path in globhashes
320- if path .endswith ('_unfinished.json' )
321- ]
322- hashfiles = list (set (globhashes ) - set (unfinished ))
323- if len (hashfiles ) > 1 :
324- for rmfile in hashfiles :
325- os .remove (rmfile )
326-
327- raise RuntimeError (
328- '[Node] Cache ERROR - Found %d previous hashfiles indicating '
329- 'that the ``base_dir`` for this node went stale. Please re-run the '
330- 'workflow.' % len (hashfiles ))
331-
332- # This should not happen, but clean up and break if so.
333- if unfinished and updatehash :
334- for rmfile in unfinished :
335- os .remove (rmfile )
336-
337- raise RuntimeError (
338- '[Node] Cache ERROR - Found unfinished hashfiles (%d) indicating '
339- 'that the ``base_dir`` for this node went stale. Please re-run the '
340- 'workflow.' % len (unfinished ))
341-
342- # Remove outdated hashfile
343- if hashfiles and hashfiles [0 ] != hashfile :
344- logger .info (
345- '[Node] Outdated hashfile found for "%s", removing and forcing node '
346- 'to rerun.' , self .fullname )
347-
348- # If logging is more verbose than INFO (20), print diff between hashes
349- loglevel = logger .getEffectiveLevel ()
350- if loglevel < 20 : # Lazy logging: only < INFO
351- split_out = split_filename (hashfiles [0 ])
352- exp_hash_file_base = split_out [1 ]
353- exp_hash = exp_hash_file_base [len ('_0x' ):]
354- logger .log (loglevel , "[Node] Old/new hashes = %s/%s" ,
355- exp_hash , hashvalue )
356- try :
357- prev_inputs = load_json (hashfiles [0 ])
358- except Exception :
359- pass
360- else :
361- logger .log (loglevel ,
362- dict_diff (prev_inputs , hashed_inputs , 10 ))
316+ cached = op .exists (hashfile )
317+
318+        # Collect existing hashfiles to determine whether the cache is up-to-date
319+ globhashes = glob (op .join (outdir , '_0x*.json' ))
320+ unfinished = [
321+ path for path in globhashes
322+ if path .endswith ('_unfinished.json' )
323+ ]
324+ hashfiles = list (set (globhashes ) - set (unfinished ))
325+ logger .debug ('[Node] Hashes: %s, %s, %s, %s' ,
326+ hashed_inputs , hashvalue , hashfile , hashfiles )
327+
328+ # No previous hashfiles found, we're all set.
329+ if cached and len (hashfiles ) == 1 :
330+ assert (hashfile == hashfiles [0 ])
331+ logger .debug ('[Node] Up-to-date cache found for "%s".' , self .fullname )
332+ return True , True # Cached and updated
333+
334+ if len (hashfiles ) > 1 :
335+ if cached :
336+ hashfiles .remove (hashfile ) # Do not clean up the node, if cached
337+ logger .warning ('[Node] Found %d previous hashfiles indicating that the working '
338+ 'directory of node "%s" is stale, deleting old hashfiles.' ,
339+ len (hashfiles ), self .fullname )
340+ for rmfile in hashfiles :
341+ os .remove (rmfile )
342+
343+ hashfiles = [hashfile ] if cached else []
344+
345+ # At this point only one hashfile is in the folder
346+ # and we directly check whether it is updated
347+ if not hashfiles :
348+ logger .debug ('[Node] No hashfiles found in "%s".' , outdir )
349+ assert (not cached )
350+ return False , False
351+
352+ updated = hashfile == hashfiles [0 ]
353+ if not updated : # Report differences depending on log verbosity
354+ cached = True
355+ logger .info ('[Node] Outdated cache found for "%s".' , self .fullname )
356+ # If logging is more verbose than INFO (20), print diff between hashes
357+ loglevel = logger .getEffectiveLevel ()
358+                if loglevel < 40 :  # Lazy logging: only below ERROR (40) — NOTE(review): code and old "< INFO" comment disagreed; confirm intended threshold
359+ exp_hash_file_base = split_filename (hashfiles [0 ])[1 ]
360+ exp_hash = exp_hash_file_base [len ('_0x' ):]
361+ logger .log (loglevel , "[Node] Old/new hashes = %s/%s" ,
362+ exp_hash , hashvalue )
363+ try :
364+ prev_inputs = load_json (hashfiles [0 ])
365+ except Exception :
366+ pass
367+ else :
368+ logger .log (loglevel ,
369+ dict_diff (prev_inputs , hashed_inputs , 10 ))
363370
371+ if rm_outdated :
364372 os .remove (hashfiles [0 ])
365373
374+ assert (cached ) # At this point, node is cached (may not be up-to-date)
375+ return cached , updated
376+
377+ def hash_exists (self , updatehash = False ):
378+ """
379+ Decorate the new `is_cached` method with hash updating
380+ to maintain backwards compatibility.
381+ """
382+
383+        # Check cache status, removing any outdated hashfiles so the
384+        # node will be forced to rerun with the current inputs.
385+ cached , updated = self .is_cached (rm_outdated = True )
386+
387+ outdir = self .output_dir ()
388+ hashfile = op .join (outdir , '_0x%s.json' % self ._hashvalue )
389+
390+ if updated :
391+ return True , self ._hashvalue , hashfile , self ._hashed_inputs
392+
366393 # Update only possible if it exists
367- if hash_exists and updatehash :
368- logger .debug ("[Node] Updating hash: %s" , hashvalue )
369- _save_hashfile (hashfile , hashed_inputs )
394+ if cached and updatehash :
395+ logger .debug ("[Node] Updating hash: %s" , self . _hashvalue )
396+ _save_hashfile (hashfile , self . _hashed_inputs )
370397
371- logger .debug (
372- 'updatehash=%s, overwrite=%s, always_run=%s, hash_exists=%s, '
373- 'hash_method=%s' , updatehash , self .overwrite ,
374- self ._interface .always_run , hash_exists ,
375- self .config ['execution' ]['hash_method' ].lower ())
376- return hash_exists , hashvalue , hashfile , hashed_inputs
398+ return cached , self ._hashvalue , hashfile , self ._hashed_inputs
377399
378400 def run (self , updatehash = False ):
379401 """Execute the node in its directory.
@@ -390,23 +412,17 @@ def run(self, updatehash=False):
390412 if self .config is None :
391413 self .config = {}
392414 self .config = merge_dict (deepcopy (config ._sections ), self .config )
393- self ._get_inputs ()
394415
395- # Check if output directory exists
396416 outdir = self .output_dir ()
397- if op .exists (outdir ):
398- logger .debug ('Output directory (%s) exists and is %sempty,' ,
399- outdir , 'not ' * bool (os .listdir (outdir )))
417+ force_run = self .overwrite or (self .overwrite is None and
418+ self ._interface .always_run )
400419
401420 # Check hash, check whether run should be enforced
402421 logger .info ('[Node] Setting-up "%s" in "%s".' , self .fullname , outdir )
403- hash_info = self .hash_exists (updatehash = updatehash )
404- hash_exists , hashvalue , hashfile , hashed_inputs = hash_info
405- force_run = self .overwrite or (self .overwrite is None and
406- self ._interface .always_run )
422+ cached , updated = self .is_cached ()
407423
408424 # If the node is cached, check on pklz files and finish
409- if hash_exists and (updatehash or not force_run ):
425+ if not force_run and (updated or ( not updated and updatehash ) ):
410426 logger .debug ("Only updating node hashes or skipping execution" )
411427 inputs_file = op .join (outdir , '_inputs.pklz' )
412428 if not op .exists (inputs_file ):
@@ -418,46 +434,48 @@ def run(self, updatehash=False):
418434 logger .debug ('Creating node file %s' , node_file )
419435 savepkl (node_file , self )
420436
421- result = self ._run_interface (execute = False , updatehash = updatehash )
437+ result = self ._run_interface (execute = False ,
438+ updatehash = updatehash and not updated )
422439 logger .info ('[Node] "%s" found cached%s.' , self .fullname ,
423- ' (and hash updated)' * updatehash )
440+ ' (and hash updated)' * ( updatehash and not updated ) )
424441 return result
425442
426- # by rerunning we mean only nodes that did finish to run previously
427- if hash_exists and not isinstance (self , MapNode ):
428- logger .debug ('[Node] Rerunning "%s"' , self .fullname )
443+ if cached and updated and not isinstance (self , MapNode ):
444+ logger .debug ('[Node] Rerunning cached, up-to-date node "%s"' , self .fullname )
429445 if not force_run and str2bool (
430446 self .config ['execution' ]['stop_on_first_rerun' ]):
431447 raise Exception (
432448 'Cannot rerun when "stop_on_first_rerun" is set to True' )
433449
434- # Remove hashfile if it exists at this point (re-running)
435- if op .exists (hashfile ):
436- os .remove (hashfile )
450+        # Remove any hashfiles that exist at this point, since the node is (re-)running.
451+ if cached :
452+ for outdatedhash in glob (op .join (self .output_dir (), '_0x*.json' )):
453+ os .remove (outdatedhash )
454+
437455
438456 # Hashfile while running
439- hashfile_unfinished = op .join (outdir ,
440- '_0x%s_unfinished.json' % hashvalue )
457+ hashfile_unfinished = op .join (
458+ outdir , '_0x%s_unfinished.json' % self . _hashvalue )
441459
442460 # Delete directory contents if this is not a MapNode or can't resume
443- rm_outdir = not isinstance (self , MapNode ) and not (
444- self ._interface .can_resume and op .isfile (hashfile_unfinished ))
445- if rm_outdir :
461+ can_resume = not (self ._interface .can_resume and op .isfile (hashfile_unfinished ))
462+ if can_resume and not isinstance (self , MapNode ):
446463 emptydirs (outdir , noexist_ok = True )
447464 else :
448465 logger .debug ('[%sNode] Resume - hashfile=%s' ,
449466 'Map' * int (isinstance (self , MapNode )),
450467 hashfile_unfinished )
451- if isinstance (self , MapNode ):
452- # remove old json files
453- for filename in glob (op .join (outdir , '_0x*.json' )):
454- os .remove (filename )
468+
469+ if isinstance (self , MapNode ):
470+ # remove old json files
471+ for filename in glob (op .join (outdir , '_0x*.json' )):
472+ os .remove (filename )
455473
456474 # Make sure outdir is created
457475 makedirs (outdir , exist_ok = True )
458476
459477 # Store runtime-hashfile, pre-execution report, the node and the inputs set.
460- _save_hashfile (hashfile_unfinished , hashed_inputs )
478+ _save_hashfile (hashfile_unfinished , self . _hashed_inputs )
461479 write_report (
462480 self , report_type = 'preexec' , is_mapnode = isinstance (self , MapNode ))
463481 savepkl (op .join (outdir , '_node.pklz' ), self )
@@ -485,7 +503,8 @@ def run(self, updatehash=False):
485503 os .chdir (cwd )
486504
487505 # Tear-up after success
488- shutil .move (hashfile_unfinished , hashfile )
506+ shutil .move (hashfile_unfinished ,
507+ hashfile_unfinished .replace ('_unfinished' , '' ))
489508 write_report (
490509 self , report_type = 'postexec' , is_mapnode = isinstance (self , MapNode ))
491510 logger .info ('[Node] Finished "%s".' , self .fullname )
@@ -551,8 +570,14 @@ def _get_inputs(self):
551570 # Successfully set inputs
552571 self ._got_inputs = True
553572
573+ def _update_hash (self ):
574+ for outdatedhash in glob (op .join (self .output_dir (), '_0x*.json' )):
575+ os .remove (outdatedhash )
576+ _save_hashfile (self ._hashvalue , self ._hashed_inputs )
577+
554578 def _run_interface (self , execute = True , updatehash = False ):
555579 if updatehash :
580+ self ._update_hash ()
556581 return self ._load_results ()
557582 return self ._run_command (execute )
558583
@@ -586,7 +611,6 @@ def _load_results(self):
586611 return result
587612
588613 def _run_command (self , execute , copyfiles = True ):
589-
590614 if not execute :
591615 try :
592616 result = self ._load_results ()
@@ -597,7 +621,8 @@ def _run_command(self, execute, copyfiles=True):
597621 copyfiles = False # OE: this was like this before,
598622 execute = True # I'll keep them for safety
599623 else :
600- logger .info ("[Node] Cached - collecting precomputed outputs" )
624+ logger .info ('[Node] Cached "%s" - collecting precomputed outputs' ,
625+ self .fullname )
601626 return result
602627
603628 # Run command: either execute is true or load_results failed.
@@ -1037,6 +1062,10 @@ def _set_mapnode_input(self, name, newvalue):
10371062 def _get_hashval (self ):
10381063 """Compute hash including iterfield lists."""
10391064 self ._get_inputs ()
1065+
1066+ if self ._hashvalue is not None and self ._hashed_inputs is not None :
1067+ return self ._hashed_inputs , self ._hashvalue
1068+
10401069 self ._check_iterfield ()
10411070 hashinputs = deepcopy (self ._interface .inputs )
10421071 for name in self .iterfield :
@@ -1061,7 +1090,8 @@ def _get_hashval(self):
10611090 hashobject .update (str (sorted_outputs ).encode ())
10621091 hashvalue = hashobject .hexdigest ()
10631092 hashed_inputs .append (('needed_outputs' , sorted_outputs ))
1064- return hashed_inputs , hashvalue
1093+ self ._hashed_inputs , self ._hashvalue = hashed_inputs , hashvalue
1094+ return self ._hashed_inputs , self ._hashvalue
10651095
10661096 @property
10671097 def inputs (self ):
0 commit comments