Commit 52ba824

pipeline runner: stability fixes and logging changes
1 parent c5a2008 commit 52ba824

File tree: 1 file changed (+24 −10 lines)


MC/bin/o2_dpg_workflow_runner.py (24 additions, 10 deletions)
@@ -20,6 +20,7 @@

 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

+sys.setrecursionlimit(100000)

 def setup_logger(name, log_file, level=logging.INFO):
     """To setup as many loggers as you want"""
@@ -392,7 +393,8 @@ def __init__(self, workflowfile, args, jmax=100):
         signal.signal(signal.SIGINT, self.SIGHandler)
         signal.siginterrupt(signal.SIGINT, False)
         self.nicevalues = [ os.nice(0) for tid in range(len(self.taskuniverse)) ]
-        self.internalmonitorcounter = 0
+        self.internalmonitorcounter = 0 # internal use
+        self.internalmonitorid = 0 # internal use

     def SIGHandler(self, signum, frame):
         # basically forcing shut down of all child processes
@@ -495,8 +497,16 @@ def ok_to_submit(self, tid, backfill=False):
             actionlogger.debug ('Condition check --normal-- for ' + str(tid) + ':' + str(self.idtotask[tid]) + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem))
             return (okcpu and okmem)
         else:
+            # not backfilling jobs which either take much memory or use lots of CPU anyway
+            # conditions are somewhat arbitrary and can be played with
+            if float(self.cpuperid[tid]) > 0.9*float(self.args.cpu_limit):
+                return False
+            if float(self.maxmemperid[tid])/float(self.args.cpu_limit) >= 1900:
+                return False
+
             # analyse CPU
-            okcpu = (self.curcpubooked + self.curcpubooked_backfill + float(self.cpuperid[tid]) <= softcpufactor*self.cpulimit)
+            okcpu = (self.curcpubooked_backfill + float(self.cpuperid[tid]) <= self.cpulimit)
+            okcpu = okcpu and (self.curcpubooked + self.curcpubooked_backfill + float(self.cpuperid[tid]) <= softcpufactor*self.cpulimit)
             # analyse MEM
             okmem = (self.curmembooked + self.curmembooked_backfill + float(self.maxmemperid[tid]) <= softmemfactor*self.memlimit)
             actionlogger.debug ('Condition check --backfill-- for ' + str(tid) + ':' + str(self.idtotask[tid]) + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem))
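The reworked backfill admission adds two hard vetoes (tasks that are themselves CPU- or memory-heavy are never backfilled) plus a two-tier CPU budget. A standalone sketch of that logic, with hypothetical parameter names mirroring the runner's fields and an assumed softcpufactor value:

    def ok_to_backfill(task_cpu, task_mem_mb, cpu_limit,
                       booked_cpu, booked_cpu_backfill, softcpufactor=1.5):
        # Veto tasks that are heavy on their own (thresholds arbitrary/tunable).
        if task_cpu > 0.9 * cpu_limit:
            return False
        if task_mem_mb / cpu_limit >= 1900:   # memory-per-core style heuristic
            return False
        # Backfilled work alone must fit within the nominal CPU limit ...
        okcpu = booked_cpu_backfill + task_cpu <= cpu_limit
        # ... and all booked work must fit the soft (overcommitted) limit.
        okcpu = okcpu and (booked_cpu + booked_cpu_backfill + task_cpu
                           <= softcpufactor * cpu_limit)
        return okcpu

    # Example: 8-core limit, 6 cores booked normally, nothing backfilled yet.
    print(ok_to_backfill(task_cpu=1.0, task_mem_mb=2000., cpu_limit=8,
                         booked_cpu=6.0, booked_cpu_backfill=0.0))  # -> True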
@@ -563,10 +573,11 @@ def stop_pipeline_and_exit(self, process_list):

     def monitor(self, process_list):
         self.internalmonitorcounter+=1
-        if self.internalmonitorcounter!=5:
+        if self.internalmonitorcounter % 5 != 0:
             return

-        self.internalmonitorcounter=0
+        self.internalmonitorid+=1
+
         globalCPU=0.
         globalPSS=0.
         globalCPU_backfill=0.
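Behaviourally, the old reset-to-zero counter and the new modulo test both run the expensive part of monitor() on every 5th call; the difference is that the counter now grows monotonically and the new internalmonitorid numbers each effective monitoring pass, which the metric records below embed. A toy illustration of the pattern:

    counter = 0
    monitor_id = 0

    def monitor_tick():
        # Returns the monitoring-pass id when work is done, None when skipped.
        global counter, monitor_id
        counter += 1
        if counter % 5 != 0:   # do real work only every 5th call
            return None
        monitor_id += 1        # stable, monotonic stamp for this pass
        return monitor_id

    print([monitor_tick() for _ in range(12)])
    # -> [None, None, None, None, 1, None, None, None, None, 2, None, None]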
@@ -606,12 +617,16 @@ def monitor(self, process_list):
             except Exception:
                 pass
             """
+            thispss=0
+            thisuss=0
             # MEMORY part
             try:
                 fullmem=p.memory_full_info()
-                totalPSS=totalPSS + getattr(fullmem,'pss',0) #<-- pss not available on MacOS
+                thispss=getattr(fullmem,'pss',0) #<-- pss not available on MacOS
+                totalPSS=totalPSS + thispss
                 totalSWAP=totalSWAP + fullmem.swap
-                totalUSS=totalUSS + fullmem.uss
+                thisuss=fullmem.uss
+                totalUSS=totalUSS + thisuss
             except (psutil.NoSuchProcess, psutil.AccessDenied):
                 pass
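For context, the per-process sampling relies on psutil's memory_full_info(), which reports uss, pss and swap on Linux; pss is not available on macOS, hence the defensive getattr. A minimal usage sketch:

    import psutil

    p = psutil.Process()              # the current process, as an example
    mem = p.memory_full_info()
    pss = getattr(mem, 'pss', 0)      # 0 on platforms without PSS (e.g. macOS)
    print('USS MB:', mem.uss / 1024. / 1024., 'PSS MB:', pss / 1024. / 1024.)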

@@ -624,14 +639,16 @@ def monitor(self, process_list):
                 except (psutil.NoSuchProcess, psutil.AccessDenied):
                     thiscpu = 0.
                 totalCPU = totalCPU + thiscpu
+                # thisresource = {'iter':self.internalmonitorid, 'pid': p.pid, 'cpu':thiscpu, 'uss':thisuss/1024./1024., 'pss':thispss/1024./1024.}
+                # metriclogger.info(thisresource)
             else:
                 self.pid_to_psutilsproc[p.pid] = p
                 try:
                     self.pid_to_psutilsproc[p.pid].cpu_percent()
                 except (psutil.NoSuchProcess, psutil.AccessDenied):
                     pass

-        resources_per_task[tid]={'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS/1024./1024., 'pss':totalPSS/1024./1024, 'nice':proc.nice(), 'swap':totalSWAP}
+        resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS/1024./1024., 'pss':totalPSS/1024./1024, 'nice':proc.nice(), 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']}
         metriclogger.info(resources_per_task[tid])

         for r in resources_per_task.values():
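Because every per-task record now carries 'iter', a downstream consumer can rebuild per-task time series by grouping on it. A hypothetical post-processing sketch (the metric log file name and the assumption that each line ends with the dict's repr are mine, not the runner's documented format):

    import ast

    series = {}
    with open('metrics.log') as f:           # assumed metric log file name
        for line in f:
            start = line.find('{')
            if start < 0:
                continue                     # skip non-record lines
            try:
                rec = ast.literal_eval(line[start:])
            except (ValueError, SyntaxError):
                continue
            if isinstance(rec, dict) and 'iter' in rec and 'name' in rec:
                series.setdefault(rec['name'], []).append((rec['iter'], rec['pss']))

    for name, points in sorted(series.items()):
        print(name, 'peak PSS (MB):', max(pss for _, pss in points))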
@@ -642,9 +659,6 @@ def monitor(self, process_list):
                 globalCPU_backfill+=r['cpu']
                 globalPSS_backfill+=r['pss']

-        # print ("globalCPU " + str(globalCPU) + ' in ' + str(len(process_list)) + ' tasks ' + str(self.curmembooked) + ',' + str(self.curcpubooked))
-        # print ("globalPSS " + str(globalPSS))
-
         metriclogger.info( "CPU-normal " + str(globalCPU) + " CPU-backfill " + str(globalCPU_backfill))
         if globalPSS > self.memlimit:
             metriclogger.info('*** MEMORY LIMIT PASSED !! ***')
             # --> We could use this for corrective actions such as killing jobs currently back-filling
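The trailing comment hints at a possible corrective action. A hypothetical sketch of what that could look like, terminating back-filling process trees first when the PSS limit is crossed (proc_list and is_backfill are illustrative names, not the runner's API):

    import psutil

    def kill_backfill_jobs(proc_list, is_backfill):
        # Terminate back-filling tasks (children first) to relieve memory pressure.
        for proc in proc_list:
            if not is_backfill(proc.pid):
                continue
            try:
                for child in proc.children(recursive=True):
                    child.terminate()
                proc.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass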
