Skip to content

Commit 671ea05

Browse files
committed
[ENH] Logging - MultiProc report current tasks
When the verbosity of logs is >= INFO, a list of currently running tasks is generated and printed out.
1 parent 44b5c17 commit 671ea05

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
from multiprocessing import Process, Pool, cpu_count, pool
1313
from traceback import format_exception
1414
import sys
15+
from textwrap import indent
16+
from logging import INFO
1517

1618
from copy import deepcopy
1719
import numpy as np
18-
1920
from ... import logging
2021
from ...utils.profiler import get_system_total_memory_gb
2122
from ..engine import MapNode
@@ -126,7 +127,7 @@ def __init__(self, plugin_args=None):
126127
self.raise_insufficient = self.plugin_args.get('raise_insufficient', True)
127128

128129
# Instantiate different thread pools for non-daemon processes
129-
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
130+
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
130131
'non' * int(non_daemon), self.processors, self.memory_gb)
131132

132133
NipypePool = NonDaemonPool if non_daemon else Pool
@@ -158,7 +159,7 @@ def _submit_job(self, node, updatehash=False):
158159
run_node, (node, updatehash, self._taskid),
159160
callback=self._async_callback)
160161

161-
logger.debug('MultiProc submitted task %s (taskid=%d).',
162+
logger.debug('[MultiProc] Submitted task %s (taskid=%d).',
162163
node.fullname, self._taskid)
163164
return self._taskid
164165

@@ -214,9 +215,17 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
214215
stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
215216
self.memory_gb, free_processors, self.processors)
216217
if self._stats != stats:
217-
logger.info('Currently running %d tasks, and %d jobs ready. Free '
218-
'memory (GB): %0.2f/%0.2f, Free processors: %d/%d',
219-
*stats)
218+
tasks_list_msg = ''
219+
if logger.level <= INFO:
220+
running_tasks = [' * %s' % self.procs[jobid].fullname
221+
for _, jobid in self.pending_tasks]
222+
if running_tasks:
223+
tasks_list_msg = '\nCurrently running:\n'
224+
tasks_list_msg += '\n'.join(running_tasks)
225+
tasks_list_msg = indent(tasks_list_msg, ' ' * 21)
226+
logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free '
227+
'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s',
228+
*stats, tasks_list_msg)
220229
self._stats = stats
221230

222231
if free_memory_gb < 0.01 or free_processors == 0:

0 commit comments

Comments
 (0)