|
6 | 6 | http://stackoverflow.com/a/8963618/1183453 |
7 | 7 | """ |
8 | 8 |
|
| 9 | +# Import packages |
9 | 10 | from multiprocessing import Process, Pool, cpu_count, pool |
10 | 11 | from traceback import format_exception |
11 | 12 | import sys |
12 | 13 | import numpy as np |
13 | 14 | from copy import deepcopy |
14 | 15 | from ..engine import MapNode |
15 | 16 | from ...utils.misc import str2bool |
16 | | -import datetime |
17 | 17 | import psutil |
18 | 18 | from ... import logging |
19 | 19 | import semaphore_singleton |
20 | 20 | from .base import (DistributedPluginBase, report_crash) |
21 | 21 |
|
| 22 | +# Init logger |
| 23 | +logger = logging.getLogger('workflow') |
22 | 24 |
|
23 | 25 | # Run node |
24 | 26 | def run_node(node, updatehash, runtime_profile=False): |
25 | 27 | """docstring |
26 | 28 | """ |
27 | 29 |
|
28 | 30 | # Import packages |
29 | | - try: |
30 | | - import memory_profiler |
31 | | - import datetime |
32 | | - except ImportError: |
33 | | - runtime_profile = False |
| 31 | + import datetime |
34 | 32 |
|
35 | 33 | # Init variables |
36 | 34 | result = dict(result=None, traceback=None) |
37 | 35 |
|
38 | 36 | # If we're profiling the run |
39 | 37 | if runtime_profile: |
40 | 38 | try: |
41 | | - # Init function tuple |
42 | | - proc = (node.run, (), {'updatehash' : updatehash}) |
43 | 39 | start = datetime.datetime.now() |
44 | | - mem_mb, retval = memory_profiler.memory_usage(proc=proc, retval=True, |
45 | | - include_children=True, |
46 | | - max_usage=True, interval=.9e-6) |
| 40 | + retval = node.run(updatehash=updatehash) |
47 | 41 | run_secs = (datetime.datetime.now() - start).total_seconds() |
48 | 42 | result['result'] = retval |
49 | | - result['node_memory'] = mem_mb[0]/1024.0 |
50 | 43 | result['run_seconds'] = run_secs |
51 | 44 | if hasattr(retval.runtime, 'get'): |
52 | 45 | result['cmd_memory'] = retval.runtime.get('cmd_memory') |
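The hunk above replaces memory_profiler-based sampling with plain wall-clock timing from the standard library. A minimal sketch of that timing pattern, assuming nothing beyond stdlib datetime (the helper name timed_call is illustrative, not part of the diff):

import datetime

def timed_call(fn, *args, **kwargs):
    # Time an arbitrary call the same way run_node now times
    # node.run(updatehash=updatehash).
    start = datetime.datetime.now()
    retval = fn(*args, **kwargs)
    run_secs = (datetime.datetime.now() - start).total_seconds()
    return retval, run_secs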
@@ -83,11 +76,11 @@ class NonDaemonPool(pool.Pool): |
83 | 76 | """ |
84 | 77 | Process = NonDaemonProcess |
85 | 78 |
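NonDaemonPool exists because daemonic Pool workers may not spawn child processes, which nodes that run subprocesses need to do. NonDaemonProcess is defined just above this excerpt; the sketch below reconstructs the full pattern from the Stack Overflow answer cited in the module docstring (http://stackoverflow.com/a/8963618/1183453), assuming Python 2.7 semantics to match this code's era:

from multiprocessing import Process, pool

class NonDaemonProcess(Process):
    # Make the 'daemon' attribute always read False so that
    # workers are allowed to spawn children of their own.
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NonDaemonPool(pool.Pool):
    # Pool whose workers are the non-daemonic processes above.
    Process = NonDaemonProcess

if __name__ == '__main__':
    p = NonDaemonPool(processes=2)
    print(p.map(abs, [-1, -2, -3]))  # each worker could itself fork
    p.close()
    p.join()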
|
86 | | -logger = logging.getLogger('workflow') |
87 | 79 |
|
88 | 80 | def release_lock(args): |
89 | 81 | semaphore_singleton.semaphore.release() |
90 | 82 |
|
| 83 | + |
91 | 84 | class ResourceMultiProcPlugin(DistributedPluginBase): |
92 | 85 | """Execute workflow with multiprocessing, not sending more jobs at once |
93 | 86 | than the system can support. |
|
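Finally, a hypothetical usage sketch for the plugin this module defines. The plugin string and the plugin_args keys ('n_procs', 'memory') are assumptions for illustration, not confirmed by the diff:

import nipype.pipeline.engine as pe

wf = pe.Workflow(name='example_wf')
# ... add nodes and connections here ...

# Assumed keys: 'n_procs' caps concurrent processes,
# 'memory' caps estimated memory use (GB).
wf.run(plugin='ResourceMultiProc',
       plugin_args={'n_procs': 4, 'memory': 8})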