1010 absolute_import )
1111
1212# Import packages
13+ import os
1314from multiprocessing import Process , Pool , cpu_count , pool
1415from traceback import format_exception
1516import sys
@@ -50,6 +51,8 @@ def run_node(node, updatehash, taskid):
5051 the node to run
5152 updatehash : boolean
5253 flag for updating hash
54+ taskid : int
55+ an identifier for this task
5356
5457 Returns
5558 -------
@@ -63,7 +66,7 @@ def run_node(node, updatehash, taskid):
6366 # Try and execute the node via node.run()
6467 try :
6568 result ['result' ] = node .run (updatehash = updatehash )
66- except :
69+ except : # noqa: E722, intendedly catch all here
6770 result ['traceback' ] = format_exception (* sys .exc_info ())
6871 result ['result' ] = node .result
6972
@@ -131,6 +134,10 @@ def __init__(self, plugin_args=None):
131134 self ._task_obj = {}
132135 self ._taskid = 0
133136
137+ # Cache current working directory and make sure we
138+ # change to it when workers are set up
139+ self ._cwd = os .getcwd ()
140+
134141 # Read in options or set defaults.
135142 non_daemon = self .plugin_args .get ('non_daemon' , True )
136143 maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
@@ -143,19 +150,28 @@ def __init__(self, plugin_args=None):
143150
144151 # Instantiate different thread pools for non-daemon processes
145152 logger .debug ('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
146- 'mem_gb=%0.2f)' , 'non' * int (non_daemon ), self . processors ,
147- self .memory_gb )
153+ 'mem_gb=%0.2f, cwd=%s )' , 'non' * int (non_daemon ),
154+ self .processors , self . memory_gb , self . _cwd )
148155
149156 NipypePool = NonDaemonPool if non_daemon else Pool
150157 try :
151158 self .pool = NipypePool (
152- processes = self .processors , maxtasksperchild = maxtasks )
159+ processes = self .processors ,
160+ maxtasksperchild = maxtasks ,
161+ initializer = os .chdir ,
162+ initargs = (self ._cwd ,)
163+ )
153164 except TypeError :
165+ # Python < 3.2 does not have maxtasksperchild
166+ # When maxtasksperchild is not set, initializer is not to be
167+ # called
154168 self .pool = NipypePool (processes = self .processors )
155169
156170 self ._stats = None
157171
158172 def _async_callback (self , args ):
173+ # Make sure runtime is not left at a dubious working directory
174+ os .chdir (self ._cwd )
159175 self ._taskresult [args ['taskid' ]] = args
160176
161177 def _get_result (self , taskid ):
@@ -360,7 +376,6 @@ def _sort_jobs(self, jobids, scheduler='tsort'):
360376 if scheduler == 'mem_thread' :
361377 return sorted (
362378 jobids ,
363- key =
364- lambda item : (self .procs [item ].mem_gb , self .procs [item ].n_procs )
379+ key = lambda item : (self .procs [item ].mem_gb , self .procs [item ].n_procs )
365380 )
366381 return jobids
0 commit comments